eris

- Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/eris.git
Log | Files | Refs | Archive | README | LICENSE

ingest_rir_delegations.py (6392B)

      1 #!/usr/bin/env python
      2 # Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
      3 # ingest_rir_delegations.py
      4 
      5 import csv
      6 import ipaddress
      7 import logging
      8 import time
      9 
     10 try:
     11 	import aiohttp
     12 except ImportError:
     13 	raise ImportError('Missing required \'aiohttp\' library. (pip install aiohttp)')
     14 
     15 
# Set a default elasticsearch index if one is not provided
default_index = 'eris-rir-delegations'

# Delegation data sources — the "delegated-extended" statistics file published by each RIR
delegation_db = {
	'afrinic' : 'https://ftp.afrinic.net/stats/afrinic/delegated-afrinic-extended-latest',
	'apnic'   : 'https://ftp.apnic.net/stats/apnic/delegated-apnic-extended-latest',
	'arin'    : 'https://ftp.arin.net/pub/stats/arin/delegated-arin-extended-latest',
	'lacnic'  : 'https://ftp.lacnic.net/pub/stats/lacnic/delegated-lacnic-extended-latest',
	'ripencc' : 'https://ftp.ripe.net/pub/stats/ripencc/delegated-ripencc-extended-latest'
}
     27 
     28 
     29 def construct_map() -> dict:
     30 	'''Construct the Elasticsearch index mapping for records'''
     31 
     32 	# Match on exact value or full text search
     33 	keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
     34 
     35 	# Construct the index mapping
     36 	mapping = {
     37 		'mappings': {
     38 			'properties': {
     39 				'registry'   : { 'type': 'keyword' },
     40 				'cc'         : { 'type': 'keyword' }, # ISO 3166 2-letter code
     41 				'asn'        : {
     42         			'properties': {
     43 						'start' : { 'type': 'integer' },
     44 						'end'   : { 'type': 'integer' }
     45 					}
     46 				},
     47 				'ip'         : {
     48 					'properties': {
     49 						'start' : { 'type': 'ip' },
     50 						'end'   : { 'type': 'ip' }
     51 					}
     52 				},
     53 				'date'       : { 'type': 'date'    },
     54 				'status'     : { 'type': 'keyword' },
     55 				'extensions' : keyword_mapping,
     56 				'seen'       : { 'type': 'date' }
     57 			}
     58 		}
     59 	}
     60 
     61 	return mapping
     62 
     63 
     64 async def process_data(place_holder: str = None):
     65 	'''
     66 	Read and process the delegation data.
     67 
     68 	:param place_holder: Placeholder parameter to match the process_data function signature of other ingestors.
     69 	'''
     70 
     71 	for registry, url in delegation_db.items():
     72 		try:
     73 			headers = {'Connection': 'keep-alive'} # This is required for AIOHTTP connections to LACNIC
     74 
     75 			async with aiohttp.ClientSession(headers=headers) as session:
     76 				async with session.get(url) as response:
     77 					if response.status != 200:
     78 						logging.error(f'Failed to fetch {registry} delegation data: {response.status}')
     79 						continue
     80 
     81 					csv_data   = await response.text()
     82 					rows       = [line.lower() for line in csv_data.split('\n') if line and not line.startswith('#')]
     83 					csv_reader = csv.reader(rows, delimiter='|')
     84 
     85 					del rows, csv_data # Cleanup
     86 
     87 					# Process the CSV data
     88 					for row in csv_reader:
     89 						cache = '|'.join(row) # Cache the last row for error handling
     90 
     91 						# Heuristic for the header line (not doing anything with it for now)
     92 						if len(row) == 7 and row[1] != '*':
     93 							header = {
     94 								'version'   : row[0],
     95 								'registry'  : row[1],
     96 								'serial'    : row[2],
     97 								'records'   : row[3],
     98 								'startdate' : row[4],
     99 								'enddate'   : row[5],
    100 								'UTCoffset' : row[6]
    101 							}
    102 							continue
    103 
    104 						# Heuristic for the summary lines (not doing anything with it for now)
    105 						elif row[2] != '*' and row[3] == '*':
    106 							summary = {
    107 								'registry' : row[0],
    108 								'type'     : row[2],
    109 								'count'    : row[4]
    110 							}
    111 							continue
    112 
    113 						# Record lines (this is what we want)
    114 						else:
    115 							record = {
    116 								'registry' : row[0],
    117 								'cc'       : row[1],
    118 								'type'     : row[2],
    119 								'start'    : row[3],
    120 								'value'    : row[4],
    121 								'date'     : row[5],
    122 								'status'   : row[6]
    123 							}
    124 
    125 							if len(row) == 7:
    126 								if row[7]:
    127 									record['extensions'] = row[7]
    128 
    129 							if not record['cc']:
    130 								del record['cc']
    131 							elif len(record['cc']) != 2:
    132 								raise ValueError(f'Invalid country code: {cache}')
    133 
    134 							if not record['value'].isdigit():
    135 								raise ValueError(f'Invalid value: {cache}')
    136 
    137 							if record['type'] == 'asn':
    138 								end = int(record['start']) + int(record['value']) - 1
    139 								record['asn'] = { 'start': int(record['start']), 'end': end }
    140 							elif record['type'] in ('ipv4', 'ipv6'):
    141 								try:
    142 									if record['type'] == 'ipv4':
    143 										end = ipaddress.ip_address(record['start']) + int(record['value']) - 1
    144 									elif record['type'] == 'ipv6':
    145 										end = ipaddress.ip_network(f'{record["start"]}/{record["value"]}').broadcast_address
    146 										end = end.compressed.lower()
    147 									record['ip'] = { 'start': record['start'], 'end': str(end) }
    148 								except ValueError:
    149 									raise ValueError(f'Invalid IP range: {cache}')
    150 							else:
    151 								raise ValueError(f'Invalid record type: {cache}')
    152 
    153 							del record['start'], record['value'], record['type'] # Cleanup variables no longer needed
    154 
    155 							if not record['date'] or record['date'] == '00000000':
    156 								del record['date']
    157 							else:
    158 								record['date'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.strptime(record['date'], '%Y%m%d'))
    159 
    160 							if record['status'] not in ('allocated', 'assigned', 'available', 'reserved', 'unallocated', 'unknown'):
    161 								raise ValueError(f'Invalid status: {cache}')
    162 							
    163 							# Set the seen timestamp
    164 							record['seen'] = time.strftime('%Y-%m-%dT%H:%M:%SZ')
    165 
    166 							# Let's just index the records themself (for now)
    167 							yield {'_index': default_index, '_source': record}
    168 
    169 		except Exception as e:
    170 			logging.error(f'Error processing {registry} delegation data: {e}')
    171 
    172 
    173 async def test():
    174 	'''Test the ingestion process'''
    175 
    176 	async for document in process_data():
    177 		print(document)
    178 
    179 
    180 
if __name__ == '__main__':
	import asyncio

	# Drive the async test() coroutine to completion when run as a script
	asyncio.run(test())
    185 
    186 
    187 
    188 '''
Input (raw record lines from a delegation file):
	arin|US|ipv4|76.15.132.0|1024|20070502|allocated|6c065d5b54b877781f05e7d30ebfff28
	arin|US|ipv4|76.15.136.0|2048|20070502|allocated|6c065d5b54b877781f05e7d30ebfff28
	arin|US|ipv4|76.15.144.0|4096|20070502|allocated|6c065d5b54b877781f05e7d30ebfff28
	arin|US|ipv4|76.15.160.0|8192|20070502|allocated|6c065d5b54b877781f05e7d30ebfff28

Output (document yielded for the first line; 'type', 'start', and 'value' are folded into the 'ip' range):
	{
		'registry'   : 'arin',
		'cc'         : 'us',
		'ip'         : { 'start': '76.15.132.0', 'end': '76.15.135.255' },
		'date'       : '2007-05-02T00:00:00Z',
		'status'     : 'allocated',
		'extensions' : '6c065d5b54b877781f05e7d30ebfff28',
		'seen'       : '<ingest timestamp>'
	}
    205 
    206 Notes:
    207 	Do we make this handle the database locally or load it into ram?
    208 '''