eris

- Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/eris.git

eris.py (12338B)
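
Example usage (the input file name and index name below are placeholders; see
the argument parser in main() for all available flags):

    python eris.py --masscan --index masscan-logs masscan_output.json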

#!/usr/bin/env python
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
# eris.py

import asyncio
import argparse
import logging
import logging.handlers
import os
import stat
import sys
import json
sys.dont_write_bytecode = True # Do not write __pycache__ bytecode files

try:
	from elasticsearch            import AsyncElasticsearch
	from elasticsearch.exceptions import NotFoundError
	from elasticsearch.helpers    import async_streaming_bulk
except ImportError:
	raise ImportError('Missing required \'elasticsearch\' library. (pip install elasticsearch)')


class ElasticIndexer:
	def __init__(self, args: argparse.Namespace):
		'''
		Initialize the Elasticsearch indexer.

		:param args: Parsed arguments from argparse
		'''

		self.chunk_max  = args.chunk_max * 1024 * 1024 # Convert megabytes to bytes
		self.chunk_size = args.chunk_size
		self.es_index   = args.index

		# Sniffing disabled due to an issue with the elasticsearch 8.x client (https://github.com/elastic/elasticsearch-py/issues/2005)
		es_config = {
			'hosts'                : [f'{args.host}:{args.port}'],
			#'hosts'               : [f'{args.host}:{port}' for port in ('9200',)], # Temporary alternative to sniffing
			'verify_certs'         : args.self_signed, # False when --self-signed is passed (see the argument parser below)
			'ssl_show_warn'        : args.self_signed,
			'request_timeout'      : args.timeout,
			'max_retries'          : args.retries,
			'retry_on_timeout'     : True,
			'http_compress'        : True,
			'connections_per_node' : 3 # Experiment with this value
			#'sniff_on_start'             : True,
			#'sniff_on_node_failure'      : True,
			#'min_delay_between_sniffing' : 60
		}

		if args.api_key:
			es_config['api_key'] = args.api_key # The client accepts an encoded API key string, or an (id, api_key) tuple
		else:
			es_config['basic_auth'] = (args.user, args.password)

		self.es = AsyncElasticsearch(**es_config)


	async def close_connect(self):
		'''Close the Elasticsearch connection.'''

		await self.es.close()


	async def create_index(self, map_body: dict, pipeline: str = None, replicas: int = 1, shards: int = 1):
		'''
		Create the Elasticsearch index with the defined mapping.

		:param map_body: Mapping for the index
		:param pipeline: Name of the ingest pipeline to use for the index
		:param replicas: Number of replicas for the index
		:param shards: Number of shards for the index
		'''

		if await self.es.indices.exists(index=self.es_index):
			logging.info(f'Index \'{self.es_index}\' already exists.')
			return

		mapping = map_body

		mapping['settings'] = {
			'number_of_shards'   : shards,
			'number_of_replicas' : replicas
		}

		if pipeline:
			try:
				await self.es.ingest.get_pipeline(id=pipeline)
				logging.info(f'Using ingest pipeline \'{pipeline}\' for index \'{self.es_index}\'')
				mapping['settings']['index.default_pipeline'] = pipeline
			except NotFoundError:
				raise ValueError(f'Ingest pipeline \'{pipeline}\' does not exist.')

		response = await self.es.indices.create(index=self.es_index, body=mapping)

		if response.get('acknowledged') and response.get('shards_acknowledged'):
			logging.info(f'Index \'{self.es_index}\' successfully created.')
		else:
			raise Exception(f'Failed to create index. ({response})')
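
	# A minimal sketch of the map_body dict this method accepts, as an ingestor's
	# construct_map() might return it (the field names here are hypothetical):
	#
	#   {'mappings': {'properties': {'ip': {'type': 'ip'}, 'seen': {'type': 'date'}}}}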


	async def process_data(self, file_path: str, data_generator: callable):
		'''
		Index records in chunks to Elasticsearch.

		:param file_path: Path to the file
		:param data_generator: Generator for the records to index
		'''

		count  = 0
		total  = 0
		errors = []

		try:
			async for ok, result in async_streaming_bulk(self.es, actions=data_generator(file_path), chunk_size=self.chunk_size, max_chunk_bytes=self.chunk_max, raise_on_error=False):
				action, result = result.popitem()

				if not ok:
					error_type   = result.get('error', {}).get('type',   'unknown')
					error_reason = result.get('error', {}).get('reason', 'unknown')
					logging.error('FAILED DOCUMENT:')
					logging.error(f'Error Type   : {error_type}')
					logging.error(f'Error Reason : {error_reason}')
					logging.error('Document     : ')
					logging.error(json.dumps(result, indent=2))
					input('Press Enter to continue...') # Note: blocks the event loop to allow manual review of the failed document
					errors.append(result)
					continue

				count += 1
				total += 1

				if count == self.chunk_size:
					logging.info(f'Successfully indexed {self.chunk_size:,} ({total:,} processed) records to {self.es_index} from {file_path}')
					count = 0

			if errors:
				raise Exception(f'{len(errors):,} document(s) failed to index. Check the logs above for details.')

		except Exception as e:
			raise Exception(f'Failed to index records to {self.es_index} from {file_path}') from e
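
	# The data_generator passed to process_data() must yield bulk actions consumable
	# by async_streaming_bulk, e.g. a (hypothetical) action shape:
	#   {'_index': 'eris-logs', '_source': {'ip': '203.0.113.1', 'port': 80}}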


def setup_logger(console_level: int = logging.INFO, file_level: int = None, log_file: str = 'debug.json', max_file_size: int = 10*1024*1024, backups: int = 5, ecs_format: bool = False):
	'''
	Set up the global logger for the application.

	:param console_level: Minimum level to capture logs to the console.
	:param file_level: Minimum level to capture logs to the file.
	:param log_file: File to write logs to.
	:param max_file_size: Maximum size of the log file before it is rotated.
	:param backups: Number of backup log files to keep.
	:param ecs_format: Use the Elastic Common Schema (ECS) format for logs.
	'''

	# Configure the root logger
	logger = logging.getLogger()
	logger.setLevel(logging.DEBUG) # Minimum level to capture all logs

	# Clear existing handlers
	logger.handlers = []

	# Setup console handler
	console_handler = logging.StreamHandler()
	console_handler.setLevel(console_level)
	console_formatter = logging.Formatter('%(asctime)s | %(levelname)9s | %(message)s', '%I:%M:%S')
	console_handler.setFormatter(console_formatter)
	logger.addHandler(console_handler)

	# Setup rotating file handler if file logging is enabled
	if file_level is not None:
		file_handler = logging.handlers.RotatingFileHandler(log_file, maxBytes=max_file_size, backupCount=backups)
		file_handler.setLevel(file_level)

		# Use the ECS formatter if enabled, otherwise the default format
		if ecs_format:
			try:
				from ecs_logging import StdlibFormatter
			except ImportError:
				raise ImportError('Missing required \'ecs-logging\' library. (pip install ecs-logging)')
			file_formatter = StdlibFormatter() # ECS formatter
		else:
			file_formatter = logging.Formatter('%(asctime)s | %(levelname)9s | %(message)s', '%Y-%m-%d %H:%M:%S')

		file_handler.setFormatter(file_formatter)
		logger.addHandler(file_handler)


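# Example console output produced by the console formatter above (timestamp illustrative):
#
#   03:14:15 |      INFO | Connecting to Elasticsearch at http://localhost:9200
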
async def main():
	'''Main function when running this script directly.'''

	parser = argparse.ArgumentParser(description='Elasticsearch Recon Ingestion Scripts (ERIS)')

	# General arguments
	parser.add_argument('input_path', help='Path to the input file or directory') # Required
	parser.add_argument('--watch', action='store_true', help='Create or watch a FIFO for real-time indexing')
	parser.add_argument('--log', choices=['debug', 'info', 'warning', 'error', 'critical'], help='Logging file level (default: disabled)')
	parser.add_argument('--ecs', action='store_true', default=False, help='Use the Elastic Common Schema (ECS) for logging')

	# Elasticsearch arguments
	parser.add_argument('--host', default='http://localhost', help='Elasticsearch host')
	parser.add_argument('--port', type=int, default=9200, help='Elasticsearch port')
	parser.add_argument('--user', default='elastic', help='Elasticsearch username')
	parser.add_argument('--password', default=os.getenv('ES_PASSWORD'), help='Elasticsearch password (if not provided, check environment variable ES_PASSWORD)')
	parser.add_argument('--api-key', default=os.getenv('ES_APIKEY'), help='Elasticsearch API Key for authentication (if not provided, check environment variable ES_APIKEY)')
	parser.add_argument('--self-signed', action='store_false', help='Elasticsearch is using self-signed certificates (disables certificate verification)')

	# Elasticsearch indexing arguments
	parser.add_argument('--index', help='Elasticsearch index name')
	parser.add_argument('--pipeline', help='Use an ingest pipeline for the index')
	parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index')
	parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index')

	# Performance arguments
	parser.add_argument('--chunk-size', type=int, default=5000, help='Number of records to index in a chunk')
	parser.add_argument('--chunk-max', type=int, default=10, help='Maximum size of a chunk in megabytes (default: 10)')
	parser.add_argument('--retries', type=int, default=30, help='Number of times to retry a request before failing')
	parser.add_argument('--timeout', type=int, default=60, help='Elasticsearch request timeout in seconds')

	# Ingestion arguments
	parser.add_argument('--certstream', action='store_true', help='Index Certstream records')
	parser.add_argument('--httpx', action='store_true', help='Index Httpx records')
	parser.add_argument('--masscan', action='store_true', help='Index Masscan records')
	parser.add_argument('--massdns', action='store_true', help='Index Massdns records')
	parser.add_argument('--zone', action='store_true', help='Index Zone records')
	parser.add_argument('--rir-delegations', action='store_true', help='Index RIR Delegations records')
	parser.add_argument('--rir-transfers', action='store_true', help='Index RIR Transfers records')

	args = parser.parse_args()

	if args.log:
		levels = {'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARNING, 'error': logging.ERROR, 'critical': logging.CRITICAL}
		setup_logger(file_level=levels[args.log], log_file='eris.log', ecs_format=args.ecs)
	else:
		setup_logger()

	if args.host.endswith('/'):
		args.host = args.host[:-1]

	if args.watch:
		if not os.path.exists(args.input_path):
			os.mkfifo(args.input_path)
		elif not stat.S_ISFIFO(os.stat(args.input_path).st_mode):
			raise ValueError(f'Path {args.input_path} is not a FIFO')
	elif not os.path.isdir(args.input_path) and not os.path.isfile(args.input_path):
		raise FileNotFoundError(f'Input path {args.input_path} does not exist or is not a file or directory')

	logging.info(f'Connecting to Elasticsearch at {args.host}:{args.port}')

	edx = ElasticIndexer(args)

	if args.certstream:
		from ingestors import ingest_certstream      as ingestor
	elif args.httpx:
		from ingestors import ingest_httpx           as ingestor
	elif args.masscan:
		from ingestors import ingest_masscan         as ingestor
	elif args.massdns:
		from ingestors import ingest_massdns         as ingestor
	elif args.rir_delegations:
		from ingestors import ingest_rir_delegations as ingestor
	elif args.rir_transfers:
		from ingestors import ingest_rir_transfers   as ingestor
	elif args.zone:
		from ingestors import ingest_zone            as ingestor
	else:
		raise ValueError('No ingestor specified')
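
	# Each ingestor module is expected to expose a default_index string, a
	# construct_map() function returning the index mapping, and a process_data(file_path)
	# generator yielding bulk actions; all three are used below.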

	health = await edx.es.cluster.health()
	logging.info(health)

	#await asyncio.sleep(5) # Delay to allow time for sniffing to complete (Sniffer temporarily disabled)

	if not edx.es_index:
		edx.es_index = ingestor.default_index

	map_body = ingestor.construct_map()
	await edx.create_index(map_body, args.pipeline, args.replicas, args.shards)

	if os.path.isfile(args.input_path):
		logging.info(f'Processing file: {args.input_path}')
		await edx.process_data(args.input_path, ingestor.process_data)

	elif stat.S_ISFIFO(os.stat(args.input_path).st_mode):
		logging.info(f'Watching FIFO: {args.input_path}')
		await edx.process_data(args.input_path, ingestor.process_data)

	elif os.path.isdir(args.input_path):
		count = 1
		total = len(os.listdir(args.input_path))
		logging.info(f'Processing {total:,} entries in directory: {args.input_path}')
		for file in sorted(os.listdir(args.input_path)):
			file_path = os.path.join(args.input_path, file)
			if os.path.isfile(file_path):
				logging.info(f'[{count:,}/{total:,}] Processing file: {file_path}')
				await edx.process_data(file_path, ingestor.process_data)
				count += 1
			else:
				logging.warning(f'[{count:,}/{total:,}] Skipping non-file: {file_path}')

	await edx.close_connect() # Close the Elasticsearch connection to stop "Unclosed client session" warnings


if __name__ == '__main__':
	print('')
	print('┏┓┳┓┳┏┓   Elasticsearch Recon Ingestion Scripts')
	print('┣ ┣┫┃┗┓        Developed by Acidvegas in Python')
	print('┗┛┛┗┻┗┛             https://git.acid.vegas/eris')
	print('')
	asyncio.run(main())