eris - Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/eris.git
eris.py (12338B)
#!/usr/bin/env python
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
# eris.py

import asyncio
import argparse
import json
import logging
import logging.handlers
import os
import stat
import sys

sys.dont_write_bytecode = True # FUCKOFF __pycache__

try:
    from elasticsearch import AsyncElasticsearch
    from elasticsearch.exceptions import NotFoundError
    from elasticsearch.helpers import async_streaming_bulk
except ImportError:
    raise ImportError('Missing required \'elasticsearch\' library. (pip install elasticsearch)')


class ElasticIndexer:
    def __init__(self, args: argparse.Namespace):
        '''
        Initialize the Elasticsearch indexer.

        :param args: Parsed arguments from argparse
        '''

        self.chunk_max  = args.chunk_max # Maximum chunk size in bytes
        self.chunk_size = args.chunk_size
        self.es_index   = args.index

        # Sniffing disabled due to an issue with the elasticsearch 8.x client (https://github.com/elastic/elasticsearch-py/issues/2005)
        es_config = {
            'hosts'                 : [f'{args.host}:{args.port}'],
            #'hosts'                : [f'{args.host}:{port}' for port in ('9200',)], # Temporary alternative to sniffing
            'verify_certs'          : args.self_signed,
            'ssl_show_warn'         : args.self_signed,
            'request_timeout'       : args.timeout,
            'max_retries'           : args.retries,
            'retry_on_timeout'      : True,
            'http_compress'         : True,
            'connections_per_node'  : 3 # Experiment with this value
            #'sniff_on_start'            : True,
            #'sniff_on_node_failure'     : True,
            #'min_delay_between_sniffing': 60
        }

        if args.api_key:
            es_config['api_key'] = (args.api_key, '') # Verify this is correct
        else:
            es_config['basic_auth'] = (args.user, args.password)

        self.es = AsyncElasticsearch(**es_config)


    async def close_connect(self):
        '''Close the Elasticsearch connection.'''

        await self.es.close()


    async def create_index(self, map_body: dict, pipeline: str = None, replicas: int = 1, shards: int = 1):
        '''
        Create the Elasticsearch index with the defined mapping.

        :param map_body: Mapping for the index
        :param pipeline: Name of the ingest pipeline to use for the index
        :param replicas: Number of replicas for the index
        :param shards: Number of shards for the index
        '''

        if await self.es.indices.exists(index=self.es_index):
            logging.info(f'Index \'{self.es_index}\' already exists.')
            return

        mapping = map_body

        mapping['settings'] = {
            'number_of_shards'   : shards,
            'number_of_replicas' : replicas
        }

        if pipeline:
            try:
                await self.es.ingest.get_pipeline(id=pipeline)
                logging.info(f'Using ingest pipeline \'{pipeline}\' for index \'{self.es_index}\'')
                mapping['settings']['index.default_pipeline'] = pipeline
            except NotFoundError:
                raise ValueError(f'Ingest pipeline \'{pipeline}\' does not exist.')

        response = await self.es.indices.create(index=self.es_index, body=mapping)

        if response.get('acknowledged') and response.get('shards_acknowledged'):
            logging.info(f'Index \'{self.es_index}\' successfully created.')
        else:
            raise Exception(f'Failed to create index. ({response})')


    async def process_data(self, file_path: str, data_generator: callable):
        '''
        Index records in chunks to Elasticsearch.

        :param file_path: Path to the file
        :param data_generator: Generator for the records to index
        '''

        count = 0
        total = 0
        errors = []

        try:
            async for ok, result in async_streaming_bulk(self.es, actions=data_generator(file_path), chunk_size=self.chunk_size, max_chunk_bytes=self.chunk_max, raise_on_error=False):
                action, result = result.popitem()

                if not ok:
                    error_type   = result.get('error', {}).get('type', 'unknown')
                    error_reason = result.get('error', {}).get('reason', 'unknown')
                    logging.error('FAILED DOCUMENT:')
                    logging.error(f'Error Type   : {error_type}')
                    logging.error(f'Error Reason : {error_reason}')
                    logging.error('Document     : ')
                    logging.error(json.dumps(result, indent=2))
                    input('Press Enter to continue...')
                    errors.append(result)
                    continue

                count += 1
                total += 1

                if count == self.chunk_size:
                    logging.info(f'Successfully indexed {self.chunk_size:,} ({total:,} processed) records to {self.es_index} from {file_path}')
                    count = 0

            if errors:
                raise Exception(f'{len(errors):,} document(s) failed to index. Check the logs above for details.')

        except Exception as e:
            raise Exception(f'Failed to index records to {self.es_index} from {file_path} ({e})')


def setup_logger(console_level: int = logging.INFO, file_level: int = None, log_file: str = 'debug.json', max_file_size: int = 10*1024*1024, backups: int = 5, ecs_format: bool = False):
    '''
    Setup the global logger for the application.

    :param console_level: Minimum level to capture logs to the console.
    :param file_level: Minimum level to capture logs to the file.
    :param log_file: File to write logs to.
    :param max_file_size: Maximum size of the log file before it is rotated.
    :param backups: Number of backup log files to keep.
    :param ecs_format: Use the Elastic Common Schema (ECS) format for logs.
    '''

    # Configure the root logger
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG) # Minimum level to capture all logs

    # Clear existing handlers
    logger.handlers = []

    # Setup console handler
    console_handler = logging.StreamHandler()
    console_handler.setLevel(console_level)
    console_formatter = logging.Formatter('%(asctime)s | %(levelname)9s | %(message)s', '%I:%M:%S')
    console_handler.setFormatter(console_formatter)
    logger.addHandler(console_handler)

    # Setup rotating file handler if file logging is enabled
    if file_level is not None:
        file_handler = logging.handlers.RotatingFileHandler(log_file, maxBytes=max_file_size, backupCount=backups)
        file_handler.setLevel(file_level)

        # Setup formatter to use ECS format if enabled or default format
        if ecs_format:
            try:
                from ecs_logging import StdlibFormatter
            except ImportError:
                raise ImportError('Missing required \'ecs-logging\' library. (pip install ecs-logging)')
            file_formatter = StdlibFormatter() # ECS formatter
        else:
            file_formatter = logging.Formatter('%(asctime)s | %(levelname)9s | %(message)s', '%Y-%m-%d %H:%M:%S')

        file_handler.setFormatter(file_formatter)
        logger.addHandler(file_handler)


async def main():
    '''Main function when running this script directly.'''

    parser = argparse.ArgumentParser(description='Elasticsearch Recon Ingestion Scripts (ERIS)')

    # General arguments
    parser.add_argument('input_path', help='Path to the input file or directory') # Required
    parser.add_argument('--watch', action='store_true', help='Create or watch a FIFO for real-time indexing')
    parser.add_argument('--log', choices=['debug', 'info', 'warning', 'error', 'critical'], help='Logging file level (default: disabled)')
    parser.add_argument('--ecs', action='store_true', default=False, help='Use the Elastic Common Schema (ECS) for logging')

    # Elasticsearch arguments
    parser.add_argument('--host', default='http://localhost', help='Elasticsearch host')
    parser.add_argument('--port', type=int, default=9200, help='Elasticsearch port')
    parser.add_argument('--user', default='elastic', help='Elasticsearch username')
    parser.add_argument('--password', default=os.getenv('ES_PASSWORD'), help='Elasticsearch password (if not provided, check environment variable ES_PASSWORD)')
    parser.add_argument('--api-key', default=os.getenv('ES_APIKEY'), help='Elasticsearch API Key for authentication (if not provided, check environment variable ES_APIKEY)')
    parser.add_argument('--self-signed', action='store_false', help='Elasticsearch is using self-signed certificates')

    # Elasticsearch indexing arguments
    parser.add_argument('--index', help='Elasticsearch index name')
    parser.add_argument('--pipeline', help='Use an ingest pipeline for the index')
    parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index')
    parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index')

    # Performance arguments
    parser.add_argument('--chunk-size', type=int, default=5000, help='Number of records to index in a chunk')
    parser.add_argument('--chunk-max', type=int, default=10485760, help='Maximum size of a chunk in bytes (default: 10MB)')
    parser.add_argument('--retries', type=int, default=30, help='Number of times to retry indexing a chunk before failing')
    parser.add_argument('--timeout', type=int, default=60, help='Number of seconds to wait before retrying a chunk')

    # Ingestion arguments
    parser.add_argument('--certstream', action='store_true', help='Index Certstream records')
    parser.add_argument('--httpx', action='store_true', help='Index Httpx records')
    parser.add_argument('--masscan', action='store_true', help='Index Masscan records')
    parser.add_argument('--massdns', action='store_true', help='Index Massdns records')
    parser.add_argument('--zone', action='store_true', help='Index Zone records')
    parser.add_argument('--rir-delegations', action='store_true', help='Index RIR Delegations records')
    parser.add_argument('--rir-transfers', action='store_true', help='Index RIR Transfers records')

    args = parser.parse_args()

    if args.log:
        levels = {'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARNING, 'error': logging.ERROR, 'critical': logging.CRITICAL}
        setup_logger(file_level=levels[args.log], log_file='eris.log', ecs_format=args.ecs)
    else:
        setup_logger()

    if args.host.endswith('/'):
        args.host = args.host[:-1]

    if args.watch:
        if not os.path.exists(args.input_path):
            os.mkfifo(args.input_path)
        elif not stat.S_ISFIFO(os.stat(args.input_path).st_mode):
            raise ValueError(f'Path {args.input_path} is not a FIFO')
    elif not os.path.isdir(args.input_path) and not os.path.isfile(args.input_path):
        raise FileNotFoundError(f'Input path {args.input_path} does not exist or is not a file or directory')

    logging.info(f'Connecting to Elasticsearch at {args.host}:{args.port}')

    edx = ElasticIndexer(args)

    if args.certstream:
        from ingestors import ingest_certstream as ingestor
    elif args.httpx:
        from ingestors import ingest_httpx as ingestor
    elif args.masscan:
        from ingestors import ingest_masscan as ingestor
    elif args.massdns:
        from ingestors import ingest_massdns as ingestor
    elif args.rir_delegations:
        from ingestors import ingest_rir_delegations as ingestor
    elif args.rir_transfers:
        from ingestors import ingest_rir_transfers as ingestor
    elif args.zone:
        from ingestors import ingest_zone as ingestor
    else:
        raise ValueError('No ingestor specified')

    health = await edx.es.cluster.health()
    logging.info(health)

    #await asyncio.sleep(5) # Delay to allow time for sniffing to complete (Sniffer temporarily disabled)

    if not edx.es_index:
        edx.es_index = ingestor.default_index

    map_body = ingestor.construct_map()
    await edx.create_index(map_body, args.pipeline, args.replicas, args.shards)

    if os.path.isfile(args.input_path):
        logging.info(f'Processing file: {args.input_path}')
        await edx.process_data(args.input_path, ingestor.process_data)

    elif stat.S_ISFIFO(os.stat(args.input_path).st_mode):
        logging.info(f'Watching FIFO: {args.input_path}')
        await edx.process_data(args.input_path, ingestor.process_data)

    elif os.path.isdir(args.input_path):
        count = 1
        total = len(os.listdir(args.input_path))
        logging.info(f'Processing {total:,} files in directory: {args.input_path}')
        for file in sorted(os.listdir(args.input_path)):
            file_path = os.path.join(args.input_path, file)
            if os.path.isfile(file_path):
                logging.info(f'[{count:,}/{total:,}] Processing file: {file_path}')
                await edx.process_data(file_path, ingestor.process_data)
                count += 1
            else:
                logging.warning(f'[{count:,}/{total:,}] Skipping non-file: {file_path}')

    await edx.close_connect() # Close the Elasticsearch connection to stop "Unclosed client session" warnings


if __name__ == '__main__':
    print('')
    print('┏┓┳┓┳┏┓ Elasticsearch Recon Ingestion Scripts')
    print('┣ ┣┫┃┗┓ Developed by Acidvegas in Python')
    print('┗┛┛┗┻┗┛ https://git.acid.vegas/eris')
    print('')
    asyncio.run(main())
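main() only imports an ingestor module and then relies on three attributes of it: default_index, construct_map(), and an async process_data(file_path) generator that is handed to async_streaming_bulk as the action source. Below is a minimal sketch of what such a module could look like; the module name ingest_example, the eris-example index, the field names, and the newline-delimited JSON input format are all hypothetical illustrations, not part of this repository.

# ingestors/ingest_example.py - hypothetical ingestor sketch, not shipped with ERIS

import json

# Index name used when no --index argument is given (read by main() as ingestor.default_index)
default_index = 'eris-example'


def construct_map() -> dict:
    '''Return the index mapping; eris.py adds the shard/replica settings before creating the index.'''

    return {
        'mappings': {
            'properties': {
                'ip'   : {'type': 'ip'},
                'port' : {'type': 'integer'},
                'seen' : {'type': 'date'}
            }
        }
    }


async def process_data(file_path: str):
    '''Yield one bulk action per input record for async_streaming_bulk.'''

    with open(file_path) as file:
        for line in file:
            line = line.strip()

            if not line:
                continue

            record = json.loads(line) # Assumes newline-delimited JSON input, purely for illustration

            yield {'_index': default_index, '_source': record}

Wired into main() with an extra flag and a matching elif branch, such a module would be driven the same way as the bundled ingestors, e.g. python eris.py --masscan --index masscan-data scan.json (the index name and file name here are placeholders).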