eris

- Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/-c.git
Log | Files | Refs | Archive | README | LICENSE

commit c105db705dc01459488f1d9d90a55924cbf789a1
parent 88e0dbfea8c471c3d578dd628eaa7cc74a7b7853
Author: acidvegas <acid.vegas@acid.vegas>
Date: Sat, 27 Jan 2024 04:28:30 -0500

Updated README, copied over consistencies across the ingestors, docstring updates to reflect on new arguments

Diffstat:
MREADME.md | 48++++++++++++++++++++++++++++--------------------
Mingestors/ingest_httpx.py | 229++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------
Mingestors/ingest_masscan.py | 48++++++++++++++++++++++++++++++++----------------
Mingestors/ingest_massdns.py | 53++++++++++++++++++++++++++++++++++-------------------
Mingestors/ingest_zone.py | 208+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------------------
Mingestors/sniff_patch.py | 56+++++++++++++++++++++++++++++++++++---------------------

6 files changed, 455 insertions(+), 187 deletions(-)

diff --git a/README.md b/README.md
@@ -13,6 +13,10 @@ python ingest_XXXX.py [options] <input>
 ```
 **Note:** The `<input>` can be a file or a directory of files, depending on the ingestion script.
 
+## Operations
+This ingestion suite will use the built in node sniffer, so by connecting to a single node, you can load balance across the entire cluster.
+It is good to know how much nodes you have in the cluster to determine how to fine tune the arguments for the best performance, based on your environment.
+
 ###
 ###### General arguments
 | Argument          | Description                                                    |
@@ -22,28 +26,32 @@ python ingest_XXXX.py [options] <input>
 | `--watch`         | Watch the input file for new lines and index them in real time |
 
 ###### Elasticsearch arguments
-| Argument          | Description                                                                          | Default       |
-|-------------------|--------------------------------------------------------------------------------------|---------------|
-| `--host`          | Elasticsearch host *(Will sniff for other nodes in the cluster)*                     | `localhost`   |
-| `--port`          | Elasticsearch port                                                                   | `9200`        |
-| `--user`          | Elasticsearch username                                                               | `elastic`     |
-| `--password`      | Elasticsearch password *(if not provided, check environment variable `ES_PASSWORD`)* |               |
-| `--api-key`       | Elasticsearch API Key for authentication                                             |               |
-| `--self-signed`   | Elastic search instance is using a self-signed certificate                           | `true`        |
-| `--index`         | Elasticsearch index name                                                             | `masscan-logs`|
-| `--shards`        | Number of shards for the index                                                       | `1`           |
-| `--replicas`      | Number of replicas for the index                                                     | `1`           |
+| Argument          | Description                                            | Default        |
+|-------------------|--------------------------------------------------------|----------------|
+| `--host`          | Elasticsearch host                                     | `localhost`    |
+| `--port`          | Elasticsearch port                                     | `9200`         |
+| `--user`          | Elasticsearch username                                 | `elastic`      |
+| `--password`      | Elasticsearch password                                 | `$ES_PASSWORD` |
+| `--api-key`       | Elasticsearch API Key for authentication               |                |
+| `--self-signed`   | Elasticsearch connection with aself-signed certificate |                |
+
+###### Elasticsearch indexing arguments
+| Argument          | Description                      | Default             |
+|-------------------|----------------------------------|---------------------|
+| `--index`         | Elasticsearch index name         | Depends on ingestor |
+| `--shards`        | Number of shards for the index   | `1`                 |
+| `--replicas`      | Number of replicas for the index | `1`                 |
 
 ###### Performance arguments
-| Argument          | Description                                                                          | Default       |
-|-------------------|--------------------------------------------------------------------------------------|---------------|
-| `--batch-max`     | Maximum size in MB of a batch                                                        | `10`          |
-| `--batch-size`    | Number of records to index in a batch                                                | `5000`        |
-| `--batch-threads` | Number of threads to use when indexing in batches                                    | `2`           |
-| `--retries`       | Number of times to retry indexing a batch before failing                             | `10`          |
-| `--timeout`       | Number of seconds to wait before retrying a batch                                    | `30`          |
-
-**NOTE:** Using `--batch-threads` as 4 and `--batch-size` as 10000 with 3 nodes would process 120,000 records before indexing 40,000 per node.
+| Argument          | Description                                              | Default |
+|-------------------|----------------------------------------------------------|---------|
+| `--batch-max`     | Maximum size in MB of a batch                            | `10`    |
+| `--batch-size`    | Number of records to index in a batch                    | `5000`  |
+| `--batch-threads` | Number of threads to use when indexing in batches        | `2`     |
+| `--retries`       | Number of times to retry indexing a batch before failing | `10`    |
+| `--timeout`       | Number of seconds to wait before retrying a batch        | `30`    |
+
+Using `--batch-threads` as 4 and `--batch-size` as 10000 with 3 nodes would process 120,000 records before indexing 40,000 per node. Take these kind of metrics int account when consider how much records you want to process at once and the memory limitations of your environment, aswell as the networking constraint it may have ono your node(s), depending on the size of your cluster.
 
 ___
 
diff --git a/ingestors/ingest_httpx.py b/ingestors/ingest_httpx.py
@@ -15,7 +15,7 @@ import os
 import time
 
 try:
-    from elasticsearch import Elasticsearch, helpers, ConnectionError, TransportError
+    from elasticsearch import Elasticsearch, helpers
 except ImportError:
     raise ImportError('Missing required \'elasticsearch\' library. (pip install elasticsearch)')
 
@@ -25,11 +25,11 @@ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(level
 
 
 class ElasticIndexer:
-    def __init__(self, es_host: str, es_port: str, es_user: str, es_password: str, es_api_key: str, es_index: str, dry_run: bool = False, self_signed: bool = False):
+    def __init__(self, es_host: str, es_port: int, es_user: str, es_password: str, es_api_key: str, es_index: str, dry_run: bool = False, self_signed: bool = False, retries: int = 10, timeout: int = 30):
         '''
         Initialize the Elastic Search indexer.
 
-        :param es_host: Elasticsearch host
+        :param es_host: Elasticsearch host(s)
         :param es_port: Elasticsearch port
         :param es_user: Elasticsearch username
         :param es_password: Elasticsearch password
@@ -37,6 +37,8 @@ class ElasticIndexer:
         :param es_index: Elasticsearch index name
         :param dry_run: If True, do not initialize Elasticsearch client
         :param self_signed: If True, do not verify SSL certificates
+        :param retries: Number of times to retry indexing a batch before failing
+        :param timeout: Number of seconds to wait before retrying a batch
         '''
 
         self.dry_run = dry_run
@@ -44,19 +46,53 @@ class ElasticIndexer:
         self.es_index = es_index
 
         if not dry_run:
+            es_config = {
+                'hosts': [f'{es_host}:{es_port}'],
+                'verify_certs': self_signed,
+                'ssl_show_warn': self_signed,
+                'request_timeout': timeout,
+                'max_retries': retries,
+                'retry_on_timeout': True,
+                'sniff_on_start': False,
+                'sniff_on_node_failure': True,
+                'min_delay_between_sniffing': 60
+            }
 
             if es_api_key:
-                self.es = Elasticsearch([f'{es_host}:{es_port}'], headers={'Authorization': f'ApiKey {es_api_key}'}, verify_certs=self_signed, ssl_show_warn=self_signed)
+                es_config['headers'] = {'Authorization': f'ApiKey {es_api_key}'}
             else:
-                self.es = Elasticsearch([f'{es_host}:{es_port}'], basic_auth=(es_user, es_password), verify_certs=self_signed, ssl_show_warn=self_signed)
+                es_config['basic_auth'] = (es_user, es_password)
+                
+            # Patching the Elasticsearch client to fix a bug with sniffing (https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960)
+            import sniff_patch
+            self.es = sniff_patch.init_elasticsearch(**es_config)
+
+            # Remove the above and uncomment the below if the bug is fixed in the Elasticsearch client:
+            #self.es = Elasticsearch(**es_config)
+
+
+    def get_cluster_health(self) -> dict:
+        '''Get the health of the Elasticsearch cluster.'''
+
+        return self.es.cluster.health()
+    
+
+    def get_cluster_size(self) -> int:
+        '''Get the number of nodes in the Elasticsearch cluster.'''
+
+        cluster_stats = self.es.cluster.stats()
+        number_of_nodes = cluster_stats['nodes']['count']['total']
 
+        return number_of_nodes
 
-    def process_file(self, file_path: str, batch_size: int):
+
+    def process_file(self, file_path: str, watch: bool = False, chunk: dict = {}):
         '''
         Read and index HTTPX records in batches to Elasticsearch, handling large volumes efficiently.
 
         :param file_path: Path to the HTTPX log file
-        :param batch_size: Number of records to process before indexing
+        :param watch: If True, watch the file for new lines and index them in real time
+        :param chunk: Chunking configuration
 
         Example record:
         {
@@ -110,7 +146,7 @@ class ElasticIndexer:
         records = []
 
         with open(file_path, 'r') as file:
-            for line in file:
+            for line in (file := follow(file) if watch else file):
                 line = line.strip()
 
                 if not line:
@@ -129,97 +165,172 @@ class ElasticIndexer:
                     record = {'_index': self.es_index, '_source': record}
                     records.append(record)
                     count += 1
-                    if len(records) >= batch_size:
-                         while True:
-                            try:
-                                success, _ = helpers.bulk(self.es, records)
-                            except (ConnectionError, TransportError) as e:
-                                logging.error(f'Failed to index records to Elasticsearch. ({e})')
-                                time.sleep(60)
-                            else:
-                                logging.info(f'Successfully indexed {success:,} ({count:,}) records to {self.es_index} from {file_path}')
-                                records = []
-                                break
+                    if len(records) >= chunk['batch']:
+                            self.bulk_index(records, file_path, chunk, count)
+                            records = []
 
         if records:
-             while True:
-                try:
-                    success, _ = helpers.bulk(self.es, records)
-                except (ConnectionError, TransportError) as e:
-                    logging.error(f'Failed to index records to Elasticsearch. ({e})')
-                    time.sleep(60)
-                else:
-                    logging.info(f'Successfully indexed {success:,} ({count:,}) records to {self.es_index} from {file_path}')
+            self.bulk_index(records, file_path, chunk, count)
+
+    def bulk_index(self, documents: list, file_path: str, chunk: dict, count: int):
+        '''
+        Index a batch of documents to Elasticsearch.
+        
+        :param documents: List of documents to index
+        :param file_path: Path to the file being indexed
+        :param count: Total number of records processed
+        '''
+        
+        remaining_documents = documents
+
+        parallel_bulk_config = {
+            'client': self.es,
+            'chunk_size': chunk['size'],
+            'max_chunk_bytes': chunk['max_size'] * 1024 * 1024, # MB
+            'thread_count': chunk['threads'],
+            'queue_size': 2
+        }
+
+        while remaining_documents:
+            failed_documents = []
+
+            try:
+                for success, response in helpers.parallel_bulk(actions=remaining_documents, **parallel_bulk_config):
+                    if not success:
+                        failed_documents.append(response)
+
+                if not failed_documents:
+                    ingested = parallel_bulk_config['chunk_size'] * parallel_bulk_config['thread_count']
+                    logging.info(f'Successfully indexed {ingested:,} ({count:,} processed) records to {self.es_index} from {file_path}')
                     break
 
+                else:
+                    logging.warning(f'Failed to index {len(failed_documents):,} failed documents! Retrying...')
+                    remaining_documents = failed_documents
+            except Exception as e:
+                logging.error(f'Failed to index documents! ({e})')
+                time.sleep(30)
+
+
+    def follow(file) -> str:
+        '''
+        Generator function that yields new lines in a file in real time.
+        
+        :param file: File object to read from
+        '''
+
+        file.seek(0,2) # Go to the end of the file
+
+        while True:
+            line = file.readline()
+
+            if not line:
+                time.sleep(0.1)
+                continue
+
+            yield line
+
 
 def main():
     '''Main function when running this script directly.'''
 
     parser = argparse.ArgumentParser(description='Index data into Elasticsearch.')
-    parser.add_argument('input_path', help='Path to the input file or directory')
-
+    
     # General arguments
+    parser.add_argument('input_path', help='Path to the input file or directory') # Required
     parser.add_argument('--dry-run', action='store_true', help='Dry run (do not index records to Elasticsearch)')
-    parser.add_argument('--batch_size', type=int, default=50000, help='Number of records to index in a batch')
-
-    # Elasticsearch connection arguments
+    parser.add_argument('--watch', action='store_true', help='Watch the input file for new lines and index them in real time')
+    
+    # Elasticsearch arguments
     parser.add_argument('--host', default='localhost', help='Elasticsearch host')
     parser.add_argument('--port', type=int, default=9200, help='Elasticsearch port')
     parser.add_argument('--user', default='elastic', help='Elasticsearch username')
     parser.add_argument('--password', default=os.getenv('ES_PASSWORD'), help='Elasticsearch password (if not provided, check environment variable ES_PASSWORD)')
-    parser.add_argument('--api-key', help='Elasticsearch API Key for authentication')    
+    parser.add_argument('--api-key', help='Elasticsearch API Key for authentication')
     parser.add_argument('--self-signed', action='store_false', help='Elasticsearch is using self-signed certificates')
 
     # Elasticsearch indexing arguments
-    parser.add_argument('--index', default='zone-files', help='Elasticsearch index name')
+    parser.add_argument('--index', default='httpx-logs', help='Elasticsearch index name')
     parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index')
     parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index')
 
+    # Performance arguments
+    parser.add_argument('--batch-max', type=int, default=10, help='Maximum size in MB of a batch')
+    parser.add_argument('--batch-size', type=int, default=5000, help='Number of records to index in a batch')
+    parser.add_argument('--batch-threads', type=int, default=2, help='Number of threads to use when indexing in batches')
+    parser.add_argument('--retries', type=int, default=10, help='Number of times to retry indexing a batch before failing')
+    parser.add_argument('--timeout', type=int, default=30, help='Number of seconds to wait before retrying a batch')
+
     args = parser.parse_args()
 
-    if not os.path.exists(args.input_path):
-        raise FileNotFoundError(f'Input file {args.input_path} does not exist')
+    # Argument validation
+    if not os.path.isdir(args.input_path) and not os.path.isfile(args.input_path):
+        raise FileNotFoundError(f'Input path {args.input_path} does not exist or is not a file or directory')
 
     if not args.dry_run:
         if args.batch_size < 1:
             raise ValueError('Batch size must be greater than 0')
+        
+        elif args.retries < 1:
+            raise ValueError('Number of retries must be greater than 0')
+        
+        elif args.timeout < 5:
+            raise ValueError('Timeout must be greater than 4')
+        
+        elif args.batch_max < 1:
+            raise ValueError('Batch max size must be greater than 0')
+        
+        elif args.batch_threads < 1:
+            raise ValueError('Batch threads must be greater than 0')
 
-        if not args.host:
+        elif not args.host:
             raise ValueError('Missing required Elasticsearch argument: host')
-        
-        if not args.api_key and (not args.user or not args.password):
+
+        elif not args.api_key and (not args.user or not args.password):
             raise ValueError('Missing required Elasticsearch argument: either user and password or apikey')
-        
-        if args.shards < 1:
+
+        elif args.shards < 1:
             raise ValueError('Number of shards must be greater than 0')
-        
-        if args.replicas < 1:
+
+        elif args.replicas < 0:
             raise ValueError('Number of replicas must be greater than 0')
+
+    edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed, args.retries, args.timeout)
+    
+    if not args.dry_run:
+        print(edx.get_cluster_health())
         
-        logging.info(f'Connecting to Elasticsearch at {args.host}:{args.port}...')
+        time.sleep(3) # Delay to allow time for sniffing to complete
 
-    edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed)
+        nodes = edx.get_cluster_size()
+
+        logging.info(f'Connected to {nodes:,} Elasticsearch node(s)')
 
-    if not args.dry_run:
         edx.create_index(args.shards, args.replicas) # Create the index if it does not exist
 
+        chunk = {
+            'size': args.batch_size,
+            'max_size': args.batch_max * 1024 * 1024, # MB
+            'threads': args.batch_threads
+        }
+        
+        chunk['batch'] = nodes * (chunk['size'] * chunk['threads'])
+    else:
+        chunk = {} # Ugly hack to get this working...
+
     if os.path.isfile(args.input_path):
         logging.info(f'Processing file: {args.input_path}')
-        edx.process_file(args.input_path, args.batch_size)
+        edx.process_file(args.input_path, args.watch, chunk)
 
     elif os.path.isdir(args.input_path):
-        logging.info(f'Processing files in directory: {args.input_path}')
+        count = 1
+        total = len(os.listdir(args.input_path))
+        logging.info(f'Processing {total:,} files in directory: {args.input_path}')
         for file in sorted(os.listdir(args.input_path)):
             file_path = os.path.join(args.input_path, file)
             if os.path.isfile(file_path):
-                logging.info(f'Processing file: {file_path}')
-                edx.process_file(file_path, args.batch_size)
-
-    else:
-        raise ValueError(f'Input path {args.input_path} is not a file or directory')
-
-
-
-if __name__ == '__main__':
-    main()
-\ No newline at end of file
+                logging.info(f'[{count:,}/{total:,}] Processing file: {file_path}')
+                edx.process_file(file_path, args.watch, chunk)
+                count += 1
+            else:
+                logging.warning(f'[{count:,}/{total:,}] Skipping non-file: {file_path}')
+\ No newline at end of file
diff --git a/ingestors/ingest_masscan.py b/ingestors/ingest_masscan.py
@@ -60,7 +60,7 @@ class ElasticIndexer:
                 'request_timeout': timeout,
                 'max_retries': retries,
                 'retry_on_timeout': True,
-                'sniff_on_start': True,
+                'sniff_on_start': False,
                 'sniff_on_node_failure': True,
                 'min_delay_between_sniffing': 60
             }
@@ -69,12 +69,22 @@ class ElasticIndexer:
                 es_config['headers'] = {'Authorization': f'ApiKey {es_api_key}'}
             else:
                 es_config['basic_auth'] = (es_user, es_password)
+                
+            # Patching the Elasticsearch client to fix a bug with sniffing (https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960)
+            import sniff_patch
+            self.es = sniff_patch.init_elasticsearch(**es_config)
 
-            self.es = Elasticsearch(**es_config)
+            # Remove the above and uncomment the below if the bug is fixed in the Elasticsearch client:
+            #self.es = Elasticsearch(**es_config)
 
 
     def create_index(self, shards: int = 1, replicas: int = 1):
-        '''Create the Elasticsearch index with the defined mapping.'''
+        '''
+        Create the Elasticsearch index with the defined mapping.
+        
+        :param shards: Number of shards for the index
+        :param replicas: Number of replicas for the index
+        '''
 
         mapping = {
             'settings': {
@@ -106,6 +116,12 @@ class ElasticIndexer:
             logging.warning(f'Index \'{self.es_index}\' already exists.')
 
 
+    def get_cluster_health(self) -> dict:
+        '''Get the health of the Elasticsearch cluster.'''
+
+        return self.es.cluster.health()
+    
+
     def get_cluster_size(self) -> int:
         '''Get the number of nodes in the Elasticsearch cluster.'''
 
@@ -120,8 +136,8 @@ class ElasticIndexer:
         Read and index Masscan records in batches to Elasticsearch, handling large volumes efficiently.
 
         :param file_path: Path to the Masscan log file
-        :param batch_size: Number of records to process before indexing
-        :param watch: If True, input file will be watched for new lines and indexed in real time
+        :param watch: If True, input file will be watched for new lines and indexed in real time\
+        :param chunk: Chunk configuration for indexing in batches
         
         Example record:
         {
@@ -207,6 +223,7 @@ class ElasticIndexer:
         :param file_path: Path to the file being indexed
         :param count: Total number of records processed
         '''
+
         remaining_documents = documents
 
         parallel_bulk_config = {
@@ -261,9 +278,9 @@ def main():
     '''Main function when running this script directly.'''
 
     parser = argparse.ArgumentParser(description='Index data into Elasticsearch.')
-    parser.add_argument('input_path', help='Path to the input file or directory')
-
+    
     # General arguments
+    parser.add_argument('input_path', help='Path to the input file or directory') # Required
     parser.add_argument('--dry-run', action='store_true', help='Dry run (do not index records to Elasticsearch)')
     parser.add_argument('--watch', action='store_true', help='Watch the input file for new lines and index them in real time')
     
@@ -280,22 +297,18 @@ def main():
     parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index')
     parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index')
 
-    # Batch arguments (for fine tuning performance)
+    # Performance arguments
     parser.add_argument('--batch-max', type=int, default=10, help='Maximum size in MB of a batch')
     parser.add_argument('--batch-size', type=int, default=5000, help='Number of records to index in a batch')
     parser.add_argument('--batch-threads', type=int, default=2, help='Number of threads to use when indexing in batches')
-    # NOTE: Using --batch-threads as 4 and --batch-size as 10000 means we will process 40,000 records per-node before indexing, so 3 nodes would process 120,000 records before indexing
-
-    # Elasticsearch retry arguments
     parser.add_argument('--retries', type=int, default=10, help='Number of times to retry indexing a batch before failing')
     parser.add_argument('--timeout', type=int, default=30, help='Number of seconds to wait before retrying a batch')
 
     args = parser.parse_args()
 
-    if not os.path.exists(args.input_path):
-        raise FileNotFoundError(f'Input file {args.input_path} does not exist')
-    elif not os.path.isdir(args.input_path) and not os.path.isfile(args.input_path):
-        raise ValueError(f'Input path {args.input_path} is not a file or directory')
+    # Argument validation
+    if not os.path.isdir(args.input_path) and not os.path.isfile(args.input_path):
+        raise FileNotFoundError(f'Input path {args.input_path} does not exist or is not a file or directory')
 
     if not args.dry_run:
         if args.batch_size < 1:
@@ -328,16 +341,19 @@ def main():
     edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed, args.retries, args.timeout)
     
     if not args.dry_run:
+        print(edx.get_cluster_health())
+        
         time.sleep(3) # Delay to allow time for sniffing to complete
 
         nodes = edx.get_cluster_size()
+
         logging.info(f'Connected to {nodes:,} Elasticsearch node(s)')
 
         edx.create_index(args.shards, args.replicas) # Create the index if it does not exist
 
         chunk = {
             'size': args.batch_size,
-            'max_size': args.batch_max * 1024 * 1024,
+            'max_size': args.batch_max * 1024 * 1024, # MB
             'threads': args.batch_threads
         }
         
diff --git a/ingestors/ingest_massdns.py b/ingestors/ingest_massdns.py
@@ -15,7 +15,6 @@ import time
 
 try:
     from elasticsearch import Elasticsearch, helpers
-    import sniff_patch
 except ImportError:
     raise ImportError('Missing required \'elasticsearch\' library. (pip install elasticsearch)')
 
@@ -62,13 +61,22 @@ class ElasticIndexer:
                 es_config['headers'] = {'Authorization': f'ApiKey {es_api_key}'}
             else:
                 es_config['basic_auth'] = (es_user, es_password)
+                
+            # Patching the Elasticsearch client to fix a bug with sniffing (https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960)
+            import sniff_patch
+            self.es = sniff_patch.init_elasticsearch(**es_config)
 
+            # Remove the above and uncomment the below if the bug is fixed in the Elasticsearch client:
             #self.es = Elasticsearch(**es_config)
-            self.es = sniff_patch.init_elasticsearch(**es_config)
 
 
     def create_index(self, shards: int = 1, replicas: int = 1):
-        '''Create the Elasticsearch index with the defined mapping.'''
+        '''
+        Create the Elasticsearch index with the defined mapping.
+        
+        :param shards: Number of shards for the index
+        :param replicas: Number of replicas for the index
+        '''
 
         mapping = {
             'settings': {
@@ -97,6 +105,12 @@ class ElasticIndexer:
             logging.warning(f'Index \'{self.es_index}\' already exists.')
 
 
+    def get_cluster_health(self) -> dict:
+        '''Get the health of the Elasticsearch cluster.'''
+
+        return self.es.cluster.health()
+    
+
     def get_cluster_size(self) -> int:
         '''Get the number of nodes in the Elasticsearch cluster.'''
 
@@ -106,13 +120,13 @@ class ElasticIndexer:
         return number_of_nodes
 
 
-
     def process_file(self, file_path: str, watch: bool = False, chunk: dict = {}):
         '''
         Read and index PTR records in batches to Elasticsearch, handling large volumes efficiently.
 
-        :param file_path: Path to the PTR log file
-        :param batch_size: Number of records to process before indexing
+        :param file_path: Path to the Masscan log file
+        :param watch: If True, input file will be watched for new lines and indexed in real time\
+        :param chunk: Chunk configuration for indexing in batches
 
         Example PTR record:
         0.6.229.47.in-addr.arpa. PTR 047-229-006-000.res.spectrum.com.
@@ -181,14 +195,16 @@ class ElasticIndexer:
         
         :param documents: List of documents to index
         :param file_path: Path to the file being indexed
+        :param chunk: Chunk configuration
         :param count: Total number of records processed
         '''
+        
         remaining_documents = documents
 
         parallel_bulk_config = {
             'client': self.es,
             'chunk_size': chunk['size'],
-            'max_chunk_bytes': chunk['max_size'] * 1024 * 1024, # MB
+            'max_chunk_bytes': chunk['max_size'],
             'thread_count': chunk['threads'],
             'queue_size': 2
         }
@@ -237,9 +253,9 @@ def main():
     '''Main function when running this script directly.'''
 
     parser = argparse.ArgumentParser(description='Index data into Elasticsearch.')
-    parser.add_argument('input_path', help='Path to the input file or directory')
-
+    
     # General arguments
+    parser.add_argument('input_path', help='Path to the input file or directory') # Required
     parser.add_argument('--dry-run', action='store_true', help='Dry run (do not index records to Elasticsearch)')
     parser.add_argument('--watch', action='store_true', help='Watch the input file for new lines and index them in real time')
     
@@ -256,22 +272,18 @@ def main():
     parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index')
     parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index')
 
-    # Batch arguments (for fine tuning performance)
+    # Performance arguments
     parser.add_argument('--batch-max', type=int, default=10, help='Maximum size in MB of a batch')
     parser.add_argument('--batch-size', type=int, default=5000, help='Number of records to index in a batch')
     parser.add_argument('--batch-threads', type=int, default=2, help='Number of threads to use when indexing in batches')
-    # NOTE: Using --batch-threads as 4 and --batch-size as 10000 means we will process 40,000 records per-node before indexing, so 3 nodes would process 120,000 records before indexing
-
-    # Elasticsearch retry arguments
     parser.add_argument('--retries', type=int, default=10, help='Number of times to retry indexing a batch before failing')
     parser.add_argument('--timeout', type=int, default=30, help='Number of seconds to wait before retrying a batch')
 
     args = parser.parse_args()
 
-    if not os.path.exists(args.input_path):
-        raise FileNotFoundError(f'Input file {args.input_path} does not exist')
-    elif not os.path.isdir(args.input_path) and not os.path.isfile(args.input_path):
-        raise ValueError(f'Input path {args.input_path} is not a file or directory')
+    # Argument validation
+    if not os.path.isdir(args.input_path) and not os.path.isfile(args.input_path):
+        raise FileNotFoundError(f'Input path {args.input_path} does not exist or is not a file or directory')
 
     if not args.dry_run:
         if args.batch_size < 1:
@@ -304,16 +316,19 @@ def main():
     edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed, args.retries, args.timeout)
     
     if not args.dry_run:
-        time.sleep(5) # Delay to allow time for sniffing to complete
+        print(edx.get_cluster_health())
+
+        time.sleep(3) # Delay to allow time for sniffing to complete
 
         nodes = edx.get_cluster_size()
+
         logging.info(f'Connected to {nodes:,} Elasticsearch node(s)')
 
         edx.create_index(args.shards, args.replicas) # Create the index if it does not exist
 
         chunk = {
             'size': args.batch_size,
-            'max_size': args.batch_max * 1024 * 1024,
+            'max_size': args.batch_max * 1024 * 1024, # MB
             'threads': args.batch_threads
         }
         
diff --git a/ingestors/ingest_zone.py b/ingestors/ingest_zone.py
@@ -39,11 +39,11 @@ record_types = ('a','aaaa','caa','cdnskey','cds','cname','dnskey','ds','mx','nap
 
 
 class ElasticIndexer:
-    def __init__(self, es_host: str, es_port: str, es_user: str, es_password: str, es_api_key: str, es_index: str, dry_run: bool = False, self_signed: bool = False):
+    def __init__(self, es_host: str, es_port: int, es_user: str, es_password: str, es_api_key: str, es_index: str, dry_run: bool = False, self_signed: bool = False, retries: int = 10, timeout: int = 30):
         '''
         Initialize the Elastic Search indexer.
 
-        :param es_host: Elasticsearch host
+        :param es_host: Elasticsearch host(s)
         :param es_port: Elasticsearch port
         :param es_user: Elasticsearch username
         :param es_password: Elasticsearch password
@@ -51,6 +51,8 @@ class ElasticIndexer:
         :param es_index: Elasticsearch index name
         :param dry_run: If True, do not initialize Elasticsearch client
         :param self_signed: If True, do not verify SSL certificates
+        :param retries: Number of times to retry indexing a batch before failing
+        :param timeout: Number of seconds to wait before retrying a batch
         '''
 
         self.dry_run = dry_run
@@ -58,15 +60,38 @@ class ElasticIndexer:
         self.es_index = es_index
 
         if not dry_run:
+            es_config = {
+                'hosts': [f'{es_host}:{es_port}'],
+                'verify_certs': self_signed,
+                'ssl_show_warn': self_signed,
+                'request_timeout': timeout,
+                'max_retries': retries,
+                'retry_on_timeout': True,
+                'sniff_on_start': False,
+                'sniff_on_node_failure': True,
+                'min_delay_between_sniffing': 60
+            }
 
             if es_api_key:
-                self.es = Elasticsearch([f'{es_host}:{es_port}'], headers={'Authorization': f'ApiKey {es_api_key}'}, verify_certs=self_signed, ssl_show_warn=self_signed)
+                es_config['headers'] = {'Authorization': f'ApiKey {es_api_key}'}
             else:
-                self.es = Elasticsearch([f'{es_host}:{es_port}'], basic_auth=(es_user, es_password), verify_certs=self_signed, ssl_show_warn=self_signed)
+                es_config['basic_auth'] = (es_user, es_password)
+                
+            # Patching the Elasticsearch client to fix a bug with sniffing (https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960)
+            import sniff_patch
+            self.es = sniff_patch.init_elasticsearch(**es_config)
+
+            # Remove the above and uncomment the below if the bug is fixed in the Elasticsearch client:
+            #self.es = Elasticsearch(**es_config)
 
 
     def create_index(self, shards: int = 1, replicas: int = 1):
-        '''Create the Elasticsearch index with the defined mapping.'''
+        '''
+        Create the Elasticsearch index with the defined mapping.
+        
+        :param shards: Number of shards for the index
+        :param replicas: Number of replicas for the index
+        '''
 
         mapping = {
             'settings': {
@@ -111,12 +136,28 @@ class ElasticIndexer:
             logging.warning(f'Index \'{self.es_index}\' already exists.')
 
 
-    def process_file(self, file_path: str, batch_size: int):
+    def get_cluster_health(self) -> dict:
+        '''Get the health of the Elasticsearch cluster.'''
+
+        return self.es.cluster.health()
+    
+
+    def get_cluster_size(self) -> int:
+        '''Get the number of nodes in the Elasticsearch cluster.'''
+
+        cluster_stats = self.es.cluster.stats()
+        number_of_nodes = cluster_stats['nodes']['count']['total']
+
+        return number_of_nodes
+
+
+    def process_file(self, file_path: str, watch: bool = False, chunk: dict = {}):
         '''
         Read and index DNS records in batches to Elasticsearch, handling large volumes efficiently.
 
-        :param file_path: Path to the DNS zone file
-        :param batch_size: Number of records to process before indexing
+        :param file_path: Path to the Masscan log file
+        :param watch: If True, input file will be watched for new lines and indexed in real time\
+        :param chunk: Chunk configuration for indexing in batches
 
         Example record:
         0so9l9nrl425q3tf7dkv1nmv2r3is6vm.vegas. 3600    in  nsec3   1 1 100 332539EE7F95C32A 10MHUKG4FHIAVEFDOTF6NKU5KFCB2J3A NS DS RRSIG
@@ -192,17 +233,9 @@ class ElasticIndexer:
                             struct = {'_index': self.es_index, '_source': source}
                             records.append(struct)
                             count += 1
-                            if len(records) >= batch_size:
-                                while True:
-                                    try:
-                                        success, _ = helpers.bulk(self.es, records)
-                                    except (ConnectionError, TransportError) as e:
-                                        logging.error(f'Failed to index records to Elasticsearch. ({e})')
-                                        time.sleep(60)
-                                    else:
-                                        logging.info(f'Successfully indexed {success:,} ({count:,}) records to {self.es_index} from {file_path}')
-                                        records = []
-                                        break
+                            if len(records) >= chunk['batch']:
+                                self.bulk_index(records, file_path, chunk, count)
+                                records = []
 
                     last_domain = domain
 
@@ -214,28 +247,60 @@ class ElasticIndexer:
                 domain_records[domain][record_type].append({'ttl': ttl, 'data': data})
 
         if records:
-            while True:
-                try:
-                    success, _ = helpers.bulk(self.es, records)
-                except (ConnectionError, TransportError) as e:
-                    logging.error(f'Failed to index records to Elasticsearch. ({e})')
-                    time.sleep(60)
-                else:
-                    logging.info(f'Successfully indexed {success:,} ({count:,}) records to {self.es_index} from {file_path}')
+            self.bulk_index(records, file_path, chunk, count)
+
+
+    def bulk_index(self, documents: list, file_path: str, chunk: dict, count: int):
+        '''
+        Index a batch of documents to Elasticsearch.
+        
+        :param documents: List of documents to index
+        :param file_path: Path to the file being indexed
+        :param count: Total number of records processed
+        '''
+
+        remaining_documents = documents
+
+        parallel_bulk_config = {
+            'client': self.es,
+            'chunk_size': chunk['size'],
+            'max_chunk_bytes': chunk['max_size'] * 1024 * 1024, # MB
+            'thread_count': chunk['threads'],
+            'queue_size': 2
+        }
+
+        while remaining_documents:
+            failed_documents = []
+
+            try:
+                for success, response in helpers.parallel_bulk(actions=remaining_documents, **parallel_bulk_config):
+                    if not success:
+                        failed_documents.append(response)
+
+                if not failed_documents:
+                    ingested = parallel_bulk_config['chunk_size'] * parallel_bulk_config['thread_count']
+                    logging.info(f'Successfully indexed {ingested:,} ({count:,} processed) records to {self.es_index} from {file_path}')
                     break
 
+                else:
+                    logging.warning(f'Failed to index {len(failed_documents):,} failed documents! Retrying...')
+                    remaining_documents = failed_documents
+            except Exception as e:
+                logging.error(f'Failed to index documents! ({e})')
+                time.sleep(30)
+
 
 def main():
     '''Main function when running this script directly.'''
 
     parser = argparse.ArgumentParser(description='Index data into Elasticsearch.')
-    parser.add_argument('input_path', help='Path to the input file or directory')
-
+    
     # General arguments
+    parser.add_argument('input_path', help='Path to the input file or directory') # Required
     parser.add_argument('--dry-run', action='store_true', help='Dry run (do not index records to Elasticsearch)')
-    parser.add_argument('--batch_size', type=int, default=50000, help='Number of records to index in a batch')
-
-    # Elasticsearch connection arguments
+    parser.add_argument('--watch', action='store_true', help='Watch the input file for new lines and index them in real time')
+    
+    # Elasticsearch arguments
     parser.add_argument('--host', default='localhost', help='Elasticsearch host')
     parser.add_argument('--port', type=int, default=9200, help='Elasticsearch port')
     parser.add_argument('--user', default='elastic', help='Elasticsearch username')
@@ -245,51 +310,90 @@ def main():
 
     # Elasticsearch indexing arguments
     parser.add_argument('--index', default='dns-zones', help='Elasticsearch index name')
-    parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index') # This depends on your cluster configuration
-    parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index') # This depends on your cluster configuration
+    parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index')
+    parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index')
+
+    # Performance arguments
+    parser.add_argument('--batch-max', type=int, default=10, help='Maximum size in MB of a batch')
+    parser.add_argument('--batch-size', type=int, default=5000, help='Number of records to index in a batch')
+    parser.add_argument('--batch-threads', type=int, default=2, help='Number of threads to use when indexing in batches')
+    parser.add_argument('--retries', type=int, default=10, help='Number of times to retry indexing a batch before failing')
+    parser.add_argument('--timeout', type=int, default=30, help='Number of seconds to wait before retrying a batch')
 
     args = parser.parse_args()
 
-    if not os.path.exists(args.input_path):
-        raise FileNotFoundError(f'Input file {args.input_path} does not exist')
+    # Argument validation
+    if not os.path.isdir(args.input_path) and not os.path.isfile(args.input_path):
+        raise FileNotFoundError(f'Input path {args.input_path} does not exist or is not a file or directory')
 
     if not args.dry_run:
         if args.batch_size < 1:
             raise ValueError('Batch size must be greater than 0')
+        
+        elif args.retries < 1:
+            raise ValueError('Number of retries must be greater than 0')
+        
+        elif args.timeout < 5:
+            raise ValueError('Timeout must be greater than 4')
+        
+        elif args.batch_max < 1:
+            raise ValueError('Batch max size must be greater than 0')
+        
+        elif args.batch_threads < 1:
+            raise ValueError('Batch threads must be greater than 0')
 
-        if not args.host:
+        elif not args.host:
             raise ValueError('Missing required Elasticsearch argument: host')
-        
-        if not args.api_key and (not args.user or not args.password):
+
+        elif not args.api_key and (not args.user or not args.password):
             raise ValueError('Missing required Elasticsearch argument: either user and password or apikey')
-        
-        if args.shards < 1:
+
+        elif args.shards < 1:
             raise ValueError('Number of shards must be greater than 0')
-        
-        if args.replicas < 1:
+
+        elif args.replicas < 0:
             raise ValueError('Number of replicas must be greater than 0')
+
+    edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed, args.retries, args.timeout)
+    
+    if not args.dry_run:
+        print(edx.get_cluster_health())
         
-        logging.info(f'Connecting to Elasticsearch at {args.host}:{args.port}...')
+        time.sleep(3) # Delay to allow time for sniffing to complete
 
-    edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed)
+        nodes = edx.get_cluster_size()
+
+        logging.info(f'Connected to {nodes:,} Elasticsearch node(s)')
 
-    if not args.dry_run:
         edx.create_index(args.shards, args.replicas) # Create the index if it does not exist
 
+        chunk = {
+            'size': args.batch_size,
+            'max_size': args.batch_max * 1024 * 1024, # MB
+            'threads': args.batch_threads
+        }
+        
+        chunk['batch'] = nodes * (chunk['size'] * chunk['threads'])
+    else:
+        chunk = {} # Ugly hack to get this working...
+
     if os.path.isfile(args.input_path):
         logging.info(f'Processing file: {args.input_path}')
-        edx.process_file(args.input_path, args.batch_size)
+        edx.process_file(args.input_path, args.watch, chunk)
 
     elif os.path.isdir(args.input_path):
-        logging.info(f'Processing files in directory: {args.input_path}')
+        count = 1
+        total = len(os.listdir(args.input_path))
+        logging.info(f'Processing {total:,} files in directory: {args.input_path}')
         for file in sorted(os.listdir(args.input_path)):
             file_path = os.path.join(args.input_path, file)
             if os.path.isfile(file_path):
-                logging.info(f'Processing file: {file_path}')
-                edx.process_file(file_path, args.batch_size)
+                logging.info(f'[{count:,}/{total:,}] Processing file: {file_path}')
+                edx.process_file(file_path, args.watch, chunk)
+                count += 1
+            else:
+                logging.warning(f'[{count:,}/{total:,}] Skipping non-file: {file_path}')
 
-    else:
-        raise ValueError(f'Input path {args.input_path} is not a file or directory')
 
 
 if __name__ == '__main__':
diff --git a/ingestors/sniff_patch.py b/ingestors/sniff_patch.py
@@ -1,26 +1,41 @@
-# sniff_* patch for elastic 8 clients
-# Call init_elasticsearch() with normal Elasticsearch params
-# Only needed if you use sniff_* options and only works with basic auth, feel free to edit to your needs.
+#!/usr/bin/env python
+# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
+
+# Note:
+#   This is a patch for the elasticsearch 8.x client to fix the sniff_* options.
+#   This patch is only needed if you use the sniff_* options and only works with basic auth.
+#   Call init_elasticsearch() with normal Elasticsearch params.
+#
+# Source:
+#   - https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960
 
 import base64
+
 import elasticsearch._sync.client as client
 from elasticsearch.exceptions import SerializationError, ConnectionError
 
 
 def init_elasticsearch(*args, **kwargs):
+    '''
+    Initialize the Elasticsearch client with the sniff patch.
+    
+    :param args: Elasticsearch positional arguments.
+    :param kwargs: Elasticsearch keyword arguments.
+    '''
     client.default_sniff_callback = _override_sniff_callback(kwargs['basic_auth'])
+
     return client.Elasticsearch(*args, **kwargs)
 
-  
+
 def _override_sniff_callback(basic_auth):
-    """
+    '''
     Taken from https://github.com/elastic/elasticsearch-py/blob/8.8/elasticsearch/_sync/client/_base.py#L166
     Completely unmodified except for adding the auth header to the elastic request.
     Allows us to continue using the sniff_* options while this is broken in the library.
 
     TODO: Remove this when this issue is patched:
-    https://github.com/elastic/elasticsearch-py/issues/2005
-    """
+        - https://github.com/elastic/elasticsearch-py/issues/2005
+    '''
     auth_str = base64.b64encode(':'.join(basic_auth).encode()).decode()
     sniffed_node_callback = client._base._default_sniffed_node_callback
 
@@ -28,14 +43,13 @@ def _override_sniff_callback(basic_auth):
         for _ in transport.node_pool.all():
             try:
                 meta, node_infos = transport.perform_request(
-                    "GET",
-                    "/_nodes/_all/http",
-                    headers={
-                        "accept": "application/vnd.elasticsearch+json; compatible-with=8",
-                        # This auth header is missing in 8.x releases of the client, and causes 401s
-                        "authorization": f"Basic {auth_str}"
+                    'GET',
+                    '/_nodes/_all/http',
+                    headers = {
+                        'accept': 'application/vnd.elasticsearch+json; compatible-with=8',
+                        'authorization': f'Basic {auth_str}' # This auth header is missing in 8.x releases of the client, and causes 401s
                     },
-                    request_timeout=(
+                    request_timeout = (
                         sniff_options.sniff_timeout
                         if not sniff_options.is_initial_sniff
                         else None
@@ -48,19 +62,19 @@ def _override_sniff_callback(basic_auth):
                 continue
 
             node_configs = []
-            for node_info in node_infos.get("nodes", {}).values():
-                address = node_info.get("http", {}).get("publish_address")
-                if not address or ":" not in address:
+            for node_info in node_infos.get('nodes', {}).values():
+                address = node_info.get('http', {}).get('publish_address')
+                if not address or ':' not in address:
                     continue
 
-                if "/" in address:
+                if '/' in address:
                     # Support 7.x host/ip:port behavior where http.publish_host has been set.
-                    fqdn, ipaddress = address.split("/", 1)
+                    fqdn, ipaddress = address.split('/', 1)
                     host = fqdn
-                    _, port_str = ipaddress.rsplit(":", 1)
+                    _, port_str = ipaddress.rsplit(':', 1)
                     port = int(port_str)
                 else:
-                    host, port_str = address.rsplit(":", 1)
+                    host, port_str = address.rsplit(':', 1)
                     port = int(port_str)
 
                 assert sniffed_node_callback is not None