eris - Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/eris.git
ingest_rir_delegations.py (6392B)
#!/usr/bin/env python
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
# ingest_rir_delegations.py

import csv
import ipaddress
import logging
import time

try:
	import aiohttp
except ImportError:
	raise ImportError('Missing required \'aiohttp\' library. (pip install aiohttp)')


# Set a default elasticsearch index if one is not provided
default_index = 'eris-rir-delegations'

# Delegation data sources
delegation_db = {
	'afrinic' : 'https://ftp.afrinic.net/stats/afrinic/delegated-afrinic-extended-latest',
	'apnic'   : 'https://ftp.apnic.net/stats/apnic/delegated-apnic-extended-latest',
	'arin'    : 'https://ftp.arin.net/pub/stats/arin/delegated-arin-extended-latest',
	'lacnic'  : 'https://ftp.lacnic.net/pub/stats/lacnic/delegated-lacnic-extended-latest',
	'ripencc' : 'https://ftp.ripe.net/pub/stats/ripencc/delegated-ripencc-extended-latest'
}


def construct_map() -> dict:
	'''Construct the Elasticsearch index mapping for records'''

	# Match on exact value or full text search
	keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }

	# Construct the index mapping
	mapping = {
		'mappings': {
			'properties': {
				'registry'   : { 'type': 'keyword' },
				'cc'         : { 'type': 'keyword' }, # ISO 3166 2-letter code
				'asn'        : {
					'properties': {
						'start' : { 'type': 'integer' },
						'end'   : { 'type': 'integer' }
					}
				},
				'ip'         : {
					'properties': {
						'start' : { 'type': 'ip' },
						'end'   : { 'type': 'ip' }
					}
				},
				'date'       : { 'type': 'date' },
				'status'     : { 'type': 'keyword' },
				'extensions' : keyword_mapping,
				'seen'       : { 'type': 'date' }
			}
		}
	}

	return mapping


async def process_data(place_holder: str = None):
	'''
	Read and process the delegation data.

	:param place_holder: Placeholder parameter to match the process_data function signature of other ingestors.
	'''

	for registry, url in delegation_db.items():
		try:
			headers = {'Connection': 'keep-alive'} # This is required for AIOHTTP connections to LACNIC

			async with aiohttp.ClientSession(headers=headers) as session:
				async with session.get(url) as response:
					if response.status != 200:
						logging.error(f'Failed to fetch {registry} delegation data: {response.status}')
						continue

					csv_data   = await response.text()
					rows       = [line.lower() for line in csv_data.split('\n') if line and not line.startswith('#')]
					csv_reader = csv.reader(rows, delimiter='|')

					del rows, csv_data # Cleanup

					# Process the CSV data
					for row in csv_reader:
						cache = '|'.join(row) # Cache the last row for error handling

						# Heuristic for the header line (not doing anything with it for now)
						if len(row) == 7 and row[1] != '*':
							header = {
								'version'   : row[0],
								'registry'  : row[1],
								'serial'    : row[2],
								'records'   : row[3],
								'startdate' : row[4],
								'enddate'   : row[5],
								'UTCoffset' : row[6]
							}
							continue

						# Heuristic for the summary lines (not doing anything with it for now)
						elif row[2] != '*' and row[3] == '*':
							summary = {
								'registry' : row[0],
								'type'     : row[2],
								'count'    : row[4]
							}
							continue

						# Record lines (this is what we want)
						else:
							record = {
								'registry' : row[0],
								'cc'       : row[1],
								'type'     : row[2],
								'start'    : row[3],
								'value'    : row[4],
								'date'     : row[5],
								'status'   : row[6]
							}

							if len(row) > 7 and row[7]: # Extended format carries an extra opaque-id/extensions field
								record['extensions'] = row[7]

							if not record['cc']:
								del record['cc']
							elif len(record['cc']) != 2:
								raise ValueError(f'Invalid country code: {cache}')

							if not record['value'].isdigit():
								raise ValueError(f'Invalid value: {cache}')

							if record['type'] == 'asn':
								end = int(record['start']) + int(record['value']) - 1
								record['asn'] = { 'start': int(record['start']), 'end': end }
							elif record['type'] in ('ipv4', 'ipv6'):
								try:
									if record['type'] == 'ipv4':
										end = ipaddress.ip_address(record['start']) + int(record['value']) - 1
									elif record['type'] == 'ipv6':
										end = ipaddress.ip_network(f'{record["start"]}/{record["value"]}').broadcast_address
										end = end.compressed.lower()
									record['ip'] = { 'start': record['start'], 'end': str(end) }
								except ValueError:
									raise ValueError(f'Invalid IP range: {cache}')
							else:
								raise ValueError(f'Invalid record type: {cache}')

							del record['start'], record['value'], record['type'] # Cleanup variables no longer needed

							if not record['date'] or record['date'] == '00000000':
								del record['date']
							else:
								record['date'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.strptime(record['date'], '%Y%m%d'))

							if record['status'] not in ('allocated', 'assigned', 'available', 'reserved', 'unallocated', 'unknown'):
								raise ValueError(f'Invalid status: {cache}')

							# Set the seen timestamp (UTC)
							record['seen'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())

							# Let's just index the records themselves (for now)
							yield {'_index': default_index, '_source': record}

		except Exception as e:
			logging.error(f'Error processing {registry} delegation data: {e}')


async def test():
	'''Test the ingestion process'''

	async for document in process_data():
		print(document)



if __name__ == '__main__':
	import asyncio

	asyncio.run(test())



'''
Input (RIR delegation file):

arin|US|ipv4|76.15.132.0|1024|20070502|allocated|6c065d5b54b877781f05e7d30ebfff28
arin|US|ipv4|76.15.136.0|2048|20070502|allocated|6c065d5b54b877781f05e7d30ebfff28
arin|US|ipv4|76.15.144.0|4096|20070502|allocated|6c065d5b54b877781f05e7d30ebfff28
arin|US|ipv4|76.15.160.0|8192|20070502|allocated|6c065d5b54b877781f05e7d30ebfff28

Output (Elasticsearch record):
{
	'registry'   : 'arin',
	'cc'         : 'us',
	'ip'         : { 'start': '76.15.132.0', 'end': '76.15.135.255' },
	'date'       : '2007-05-02T00:00:00Z',
	'status'     : 'allocated',
	'extensions' : '6c065d5b54b877781f05e7d30ebfff28',
	'seen'       : '<timestamp of ingestion>'
}

Notes:
Do we make this handle the database locally or load it into RAM?
'''
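
This module only defines construct_map() and process_data(); nothing in the file itself talks to Elasticsearch, that wiring is presumably done by the project's main ingestion handler. As a rough standalone sketch (not part of this repository), the yielded actions could be pushed with the official async Elasticsearch client and its bulk helper. The cluster URL, API key, and the elasticsearch[async] dependency below are assumptions for illustration only:

#!/usr/bin/env python
# bulk_example.py - hypothetical usage sketch, not part of the eris repository
# Assumes: pip install elasticsearch[async]  (an 8.x client)

import asyncio

from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk

import ingest_rir_delegations as ingestor


async def main():
	# Placeholder connection details, swap in your own cluster and credentials
	client = AsyncElasticsearch('https://localhost:9200', api_key='CHANGE_ME')

	# Create the index with the mapping from construct_map() if it does not exist yet
	if not await client.indices.exists(index=ingestor.default_index):
		await client.indices.create(index=ingestor.default_index, mappings=ingestor.construct_map()['mappings'])

	# process_data() is an async generator of {'_index': ..., '_source': ...} actions,
	# which is the shape the bulk helper expects
	indexed, errors = await async_bulk(client, ingestor.process_data(), raise_on_error=False)
	print(f'Indexed {indexed} records ({len(errors)} errors)')

	await client.close()


if __name__ == '__main__':
	asyncio.run(main())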