eris

Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/eris.git
Log | Files | Refs | Archive | README | LICENSE

commit c05c48f3fe35d9920327bfe41bd46b428e23610b
parent 2ff7ddc86940ac5e62cacd5fa5d15673464025c9
Author: acidvegas <acid.vegas@acid.vegas>
Date: Mon, 4 Mar 2024 17:44:09 -0500

Started asynchronous implementation of bulk streaming data, altered ERIS defaults, etc

Diffstat:
MREADME.md | 24++++++++++++++++++++++--
Aasync_dev/eris.py | 269+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Aasync_dev/ingestors/ingest_masscan_async.py | 137+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Aasync_dev/sniff_patch.py | 97+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Meris.py | 8++++----
Mhelpers/elastictop.py | 114+++++++++++++++++++++++++++++++++++++++----------------------------------------
Mhelpers/es_index_dump | 6+++---
Dhelpers/sniff_patch_async.py | 96-------------------------------------------------------------------------------
Mingestors/ingest_httpx.py | 18+++++++++---------
Mingestors/ingest_masscan.py | 55++++++++++++++++++++++++++++---------------------------
Mingestors/ingest_massdns.py | 2+-

11 files changed, 626 insertions(+), 200 deletions(-)

diff --git a/README.md b/README.md
@@ -56,16 +56,36 @@ python eris.py [options] <input>
 | `--massdns`       | Index massdns records  |
 | `--zone`          | Index zone DNS records |
 
-Using `--batch-threads` as 4 and `--batch-size` as 10000 with 3 nodes would process 120,000 records before indexing 40,000 per node. Take these kind of metrics int account when consider how much records you want to process at once and the memory limitations of your environment, aswell as the networking constraint it may have ono your node(s), depending on the size of your cluster.
+Using `--batch-threads` as 4 and `--batch-size` as 10000 with 3 nodes would process 120,000 records before indexing 40,000 per node. Take these kinds of metrics into account when deciding how many records to process at once, along with the memory limitations of your environment and the networking constraints it may place on your node(s), depending on the size of your cluster.
 
-## Operations
 This ingestion suite will use the built-in node sniffer, so by connecting to a single node, you can load balance across the entire cluster.
 It is good to know how many nodes you have in the cluster so you can fine-tune the arguments for the best performance in your environment.
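+
+For example, the following points ERIS at a single node and lets sniffing discover the rest of the cluster (host, credentials, and file name are placeholders; the password is read from the `ES_PASSWORD` environment variable):
+
+```shell
+python eris.py --host 10.0.0.1 --user elastic --masscan masscan.json
+```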
 
+## GeoIP Pipeline
+Create and attach a GeoIP ingest pipeline; documents it enriches will carry the following fields, which your index mappings should account for:
+
+```json
+"geoip": {
+    "city_name": "City",
+    "continent_name": "Continent",
+    "country_iso_code": "CC",
+    "country_name": "Country",
+    "location": {
+        "lat": 0.0000,
+        "lon": 0.0000
+    },
+    "region_iso_code": "RR",
+    "region_name": "Region"
+}
+```
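+
+A minimal pipeline definition might look like the following sketch (it assumes the source address lives in an `ip` field; `PUT` it to `_ingest/pipeline/<name>` and reference that name with `--pipeline`):
+
+```json
+{
+    "description": "Enrich documents with GeoIP data based on the ip field",
+    "processors": [
+        {
+            "geoip": {
+                "field": "ip",
+                "target_field": "geoip"
+            }
+        }
+    ]
+}
+```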
+
 ## Changelog
 - The `--watch` feature now uses a FIFO to do live ingestion.
 - Isolated eris.py into its own file and separated the ingestion agents into their own modules.
 
+## Roadmap
+- Implement [async elasticsearch](https://elasticsearch-py.readthedocs.io/en/v8.12.1/async.html) into the code.
+
 ___
 
 ###### Mirrors for this repository: [acid.vegas](https://git.acid.vegas/eris) • [SuperNETs](https://git.supernets.org/acidvegas/eris) • [GitHub](https://github.com/acidvegas/eris) • [GitLab](https://gitlab.com/acidvegas/eris) • [Codeberg](https://codeberg.org/acidvegas/eris)
 \ No newline at end of file
diff --git a/async_dev/eris.py b/async_dev/eris.py
@@ -0,0 +1,268 @@
+#!/usr/bin/env python
+# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
+# eris.py [asynchronous development]
+
+import argparse
+import asyncio
+import logging
+import os
+import stat
+import time
+import sys
+
+sys.dont_write_bytecode = True
+
+try:
+    from elasticsearch import AsyncElasticsearch
+    from elasticsearch.exceptions import NotFoundError
+    from elasticsearch.helpers import async_streaming_bulk
+except ImportError:
+    raise ImportError('Missing required \'elasticsearch\' library. (pip install elasticsearch)')
+
+# Setting up logging
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%m/%d %I:%M:%S')
+
+
+class ElasticIndexer:
+    def __init__(self, args: argparse.Namespace):
+        '''
+        Initialize the Elasticsearch indexer.
+
+        :param args: Parsed arguments from argparse
+        '''
+
+        self.chunk_size = args.chunk_size
+        self.chunk_threads = args.chunk_threads
+        self.dry_run = args.dry_run
+        self.es_index = args.index
+
+        if not args.dry_run:
+            es_config = {
+                'hosts': [f'{args.host}:{args.port}'],
+                'verify_certs': args.self_signed,
+                'ssl_show_warn': args.self_signed,
+                'request_timeout': args.timeout,
+                'max_retries': args.retries,
+                'retry_on_timeout': True,
+                'sniff_on_start': True, # Is this problematic? 
+                'sniff_on_node_failure': True,
+                'min_delay_between_sniffing': 60 # Add config option for this?
+            }
+
+            if args.api_key:
+                es_config['api_key'] = (args.api_key, '') # Verify this is correct
+            else:
+                es_config['basic_auth'] = (args.user, args.password)
+                
+            # Patching the Elasticsearch client to fix a bug with sniffing (https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960)
+            import sniff_patch
+            self.es = sniff_patch.init_elasticsearch_async(**es_config)
+
+            # Remove the above and uncomment the below if the bug is fixed in the Elasticsearch client:
+            #self.es = AsyncElasticsearch(**es_config)
+
+
+    async def create_index(self, map_body: dict, pipeline: str = '', replicas: int = 1, shards: int = 1):
+        '''
+        Create the Elasticsearch index with the defined mapping.
+
+        :param map_body: Dictionary containing the index mapping
+        :param pipeline: Name of the ingest pipeline to use for the index
+        :param replicas: Number of replicas for the index
+        :param shards: Number of shards for the index
+        '''
+
+        if await self.es.indices.exists(index=self.es_index):
+            logging.info(f'Index \'{self.es_index}\' already exists.')
+            return
+
+        mapping = map_body
+
+        mapping['settings'] = {
+            'number_of_shards': shards,
+            'number_of_replicas': replicas
+        }
+
+        if pipeline:
+            try:
+                await self.es.ingest.get_pipeline(id=pipeline)
+                logging.info(f'Using ingest pipeline \'{pipeline}\' for index \'{self.es_index}\'')
+                mapping['settings']['index.default_pipeline'] = pipeline
+            except NotFoundError:
+                raise ValueError(f'Ingest pipeline \'{pipeline}\' does not exist.')
+
+        response = await self.es.indices.create(index=self.es_index, body=mapping)
+
+        if response.get('acknowledged') and response.get('shards_acknowledged'):
+            logging.info(f'Index \'{self.es_index}\' successfully created.')
+        else:
+            raise Exception(f'Failed to create index. ({response})')
+
+
+    async def get_cluster_health(self) -> dict:
+        '''Get the health of the Elasticsearch cluster.'''
+
+        return await self.es.cluster.health()
+    
+
+    async def get_cluster_size(self) -> int:
+        '''Get the number of nodes in the Elasticsearch cluster.'''
+
+        cluster_stats = await self.es.cluster.stats()
+        number_of_nodes = cluster_stats['nodes']['count']['total']
+
+        return number_of_nodes
+
+
+    async def async_bulk_index_data(self, file_path: str, index_name: str, data_generator: callable):
+        '''
+        Index records in chunks to Elasticsearch.
+
+        :param file_path: Path to the file
+        :param index_name: Name of the index
+        :param data_generator: Generator for the records to index
+        '''
+
+        count = 0
+        total = 0
+        
+        async for ok, result in async_streaming_bulk(self.es, actions=data_generator(file_path), chunk_size=self.chunk_size, raise_on_error=False): # Actions already carry their target index; raise_on_error=False so failures are logged below instead of aborting the stream
+            action, result = result.popitem()
+
+            if not ok:
+                logging.error(f'Failed to index document ({result["_id"]}) to {index_name} from {file_path} ({result})')
+                input('Press Enter to continue...') # Debugging (will possibly remove this since we have retries enabled)
+                continue
+
+            count += 1
+            total += 1
+
+            if count == self.chunk_size:
+                logging.info(f'Successfully indexed {self.chunk_size:,} ({total:,} processed) records to {self.es_index} from {file_path}')
+                count = 0
+
+        logging.info(f'Finished indexing {total:,} records to {self.es_index} from {file_path}')
+
+
+    async def process_file(self, file_path: str, ingest_function: callable):
+        '''
+        Read records from a file with the given ingest function and yield them as indexing actions.
+
+        :param file_path: Path to the file
+        :param ingest_function: Function to process the file
+        '''
+
+        count = 0
+
+        async for processed in ingest_function(file_path):
+            if not processed:
+                break
+
+            if self.dry_run:
+                print(processed)
+                continue
+
+            count += 1
+
+            yield {'_index': self.es_index, '_source': processed}
+
+
+async def main():
+    '''Main function when running this script directly.'''
+
+    parser = argparse.ArgumentParser(description='Index data into Elasticsearch.')
+    
+    # General arguments
+    parser.add_argument('input_path', help='Path to the input file or directory') # Required
+    parser.add_argument('--dry-run', action='store_true', help='Dry run (do not index records to Elasticsearch)')
+    parser.add_argument('--watch', action='store_true', help='Create or watch a FIFO for real-time indexing')
+    
+    # Elasticsearch arguments
+    parser.add_argument('--host', default='localhost', help='Elasticsearch host')
+    parser.add_argument('--port', type=int, default=9200, help='Elasticsearch port')
+    parser.add_argument('--user', default='elastic', help='Elasticsearch username')
+    parser.add_argument('--password', default=os.getenv('ES_PASSWORD'), help='Elasticsearch password (if not provided, check environment variable ES_PASSWORD)')
+    parser.add_argument('--api-key', default=os.getenv('ES_APIKEY'), help='Elasticsearch API Key for authentication (if not provided, check environment variable ES_APIKEY)')
+    parser.add_argument('--self-signed', action='store_false', help='Elasticsearch is using self-signed certificates')
+
+    # Elasticsearch indexing arguments
+    parser.add_argument('--index', help='Elasticsearch index name')
+    parser.add_argument('--pipeline', help='Use an ingest pipeline for the index')
+    parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index')
+    parser.add_argument('--shards', type=int, default=3, help='Number of shards for the index')
+    
+    # Performance arguments
+    parser.add_argument('--chunk-size', type=int, default=50000, help='Number of records to index in a chunk')
+    parser.add_argument('--chunk-threads', type=int, default=3, help='Number of threads to use when indexing in chunks')
+    parser.add_argument('--retries', type=int, default=60, help='Number of times to retry indexing a chunk before failing')
+    parser.add_argument('--timeout', type=int, default=30, help='Number of seconds to wait before retrying a chunk')
+
+    # Ingestion arguments
+    parser.add_argument('--httpx', action='store_true', help='Index Httpx records')
+    parser.add_argument('--masscan', action='store_true', help='Index Masscan records')
+    parser.add_argument('--massdns', action='store_true', help='Index Massdns records')
+    parser.add_argument('--zone', action='store_true', help='Index Zone records')
+
+    args = parser.parse_args()
+
+    if args.watch:
+        if not os.path.exists(args.input_path):
+            os.mkfifo(args.input_path)
+        elif not stat.S_ISFIFO(os.stat(args.input_path).st_mode):
+            raise ValueError(f'Path {args.input_path} is not a FIFO')
+    elif not os.path.isdir(args.input_path) and not os.path.isfile(args.input_path):
+        raise FileNotFoundError(f'Input path {args.input_path} does not exist or is not a file or directory')
+
+    edx = ElasticIndexer(args)
+
+    if args.httpx:
+        from ingestors import ingest_httpx   as ingestor
+    elif args.masscan:
+        from ingestors import ingest_masscan as ingestor
+    elif args.massdns:
+        from ingestors import ingest_massdns as ingestor
+    elif args.zone:
+        from ingestors import ingest_zone    as ingestor
+
+    async def index_file(file_path: str):
+        '''Feed a single file through the ingestor; dry runs only print the records.'''
+        if args.dry_run:
+            async for _ in edx.process_file(file_path, ingestor.process_file):
+                pass
+        else:
+            await edx.async_bulk_index_data(file_path, edx.es_index, lambda path: edx.process_file(path, ingestor.process_file))
+
+    if not args.dry_run:
+        print(await edx.get_cluster_health())
+
+        await asyncio.sleep(3) # Delay to allow time for sniffing to complete
+
+        nodes = await edx.get_cluster_size()
+        logging.info(f'Connected to {nodes:,} Elasticsearch node(s)')
+
+        if not edx.es_index:
+            edx.es_index = ingestor.default_index
+
+        map_body = ingestor.construct_map()
+        await edx.create_index(map_body, args.pipeline, args.replicas, args.shards)
+
+    if os.path.isfile(args.input_path):
+        logging.info(f'Processing file: {args.input_path}')
+        await index_file(args.input_path)
+
+    elif stat.S_ISFIFO(os.stat(args.input_path).st_mode):
+        logging.info(f'Watching FIFO: {args.input_path}')
+        await index_file(args.input_path)
+
+    elif os.path.isdir(args.input_path):
+        count = 1
+        total = len(os.listdir(args.input_path))
+        logging.info(f'Processing {total:,} files in directory: {args.input_path}')
+        for file in sorted(os.listdir(args.input_path)):
+            file_path = os.path.join(args.input_path, file)
+            if os.path.isfile(file_path):
+                logging.info(f'[{count:,}/{total:,}] Processing file: {file_path}')
+                await index_file(file_path)
+                count += 1
+            else:
+                logging.warning(f'[{count:,}/{total:,}] Skipping non-file: {file_path}')
+
+
+
+if __name__ == '__main__':
+    asyncio.run(main())
\ No newline at end of file
diff --git a/async_dev/ingestors/ingest_masscan_async.py b/async_dev/ingestors/ingest_masscan_async.py
@@ -0,0 +1,136 @@
+#!/usr/bin/env python
+# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
+# ingest_masscan.py [asynchronous development]
+
+'''
+apt-get install iptables masscan libpcap-dev screen
+setcap 'CAP_NET_RAW+eip CAP_NET_ADMIN+eip' /bin/masscan
+/sbin/iptables -A INPUT -p tcp --dport 61010 -j DROP
+printf "0.0.0.0/8\n10.0.0.0/8\n100.64.0.0/10\n127.0.0.0/8\n169.254.0.0/16\n172.16.0.0/12\n192.0.0.0/24\n192.0.2.0/24\n192.31.196.0/24\n192.52.193.0/24\n192.88.99.0/24\n192.168.0.0/16\n192.175.48.0/24\n198.18.0.0/15\n198.51.100.0/24\n203.0.113.0/24\n224.0.0.0/3\n255.255.255.255/32"  > exclude.conf
+screen -S scan
+masscan 0.0.0.0/0 -p21,22,23 --banners --http-user-agent "USER_AGENT" --source-port 61010 --open-only --rate 30000 --excludefile exclude.conf -oJ output.json
+masscan 0.0.0.0/0 -p21,22,23 --banners --http-user-agent "USER_AGENT" --source-port 61000-65503 --open-only --rate 30000 --excludefile exclude.conf -oJ output_new.json --shard $i/$TOTAL
+
+Note: The above iptables rule is not persistent and will be removed on reboot.
+'''
+
+import json
+import logging
+import re
+import time
+
+default_index = 'masscan-logs'
+
+def construct_map() -> dict:
+    '''Construct the Elasticsearch index mapping for Masscan records.'''
+
+    keyword_mapping = { 'type': 'text',  'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
+
+    mapping = {
+        'mappings': {
+            'properties': {
+                'ip':      { 'type': 'ip' },
+                'port':    { 'type': 'integer' },
+                'proto':   { 'type': 'keyword' },
+                'service': { 'type': 'keyword' },
+                'banner':  keyword_mapping,
+                'ref_id':  { 'type': 'keyword' },
+                'seen':    { 'type': 'date' }
+                #'geoip':   {
+                #    'properties': {
+                #        'city_name':        keyword_mapping,
+                #        'continent_name':   keyword_mapping,
+                #        'country_iso_code': keyword_mapping,
+                #        'country_name':     keyword_mapping,
+                #        'location':         { 'type': 'geo_point' },
+                #        'region_iso_code':  keyword_mapping,
+                #        'region_name':      keyword_mapping,
+                #    }
+                #}
+            }
+        }
+    }
+
+    return mapping
+
+
+async def process_file(file_path: str):
+    '''
+    Read and process Masscan records from the log file.
+
+    :param file_path: Path to the Masscan log file
+    '''
+
+    with open(file_path, 'r') as file:
+        for line in file:
+            line = line.strip()
+
+            if not line or not line.startswith('{'):
+                continue
+
+            if line.endswith(','):
+                line = line[:-1]
+
+            try:
+                record = json.loads(line)
+            except json.decoder.JSONDecodeError:
+                logging.error(f'Failed to parse JSON record! ({line})')
+                input('Press Enter to continue...') # Debugging
+                continue
+
+            for port_info in record['ports']:
+                struct = {
+                    'ip': record['ip'],
+                    'port': port_info['port'],
+                    'proto': port_info['proto'],
+                    'seen': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(int(record['timestamp']))),
+                }
+
+                if 'service' in port_info:
+                    if 'name' in port_info['service']:
+                        if port_info['service']['name'] != 'unknown':
+                            struct['service'] = port_info['service']['name']
+
+                    if 'banner' in port_info['service']:
+                        banner = ' '.join(port_info['service']['banner'].split()) # Remove extra whitespace
+                        if banner:
+                            match = re.search(r'\(Ref\.Id: (.*?)\)', banner)
+                            if match:
+                                struct['ref_id'] = match.group(1)
+                            else:
+                                struct['banner'] = banner
+
+                yield struct # ElasticIndexer.process_file wraps this with the target index
+ 
+    return # EOF (async generators may not return a value)
+
+
+
+'''
+Example record:
+{
+    "ip": "43.134.51.142",
+    "timestamp": "1705255468", # Convert to ZULU BABY
+    "ports": [ # We will create a record for each port opened
+        {
+            "port": 22,
+            "proto": "tcp",
+            "service": { # This field is optional
+                "name": "ssh",
+                "banner": "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4"
+            }
+        }
+    ]
+}
+
+Will be indexed as:
+{
+    "ip": "43.134.51.142",
+    "port": 22,
+    "proto": "tcp",
+    "service": "ssh",
+    "banner": "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4",
+    "seen": "2021-10-08T02:04:28Z",
+    "ref_id": "?sKfOvsC4M4a2W8PaC4zF?" # TCP RST Payload, Might be useful..
+}
+'''
+\ No newline at end of file
diff --git a/async_dev/sniff_patch.py b/async_dev/sniff_patch.py
@@ -0,0 +1,96 @@
+#!/usr/bin/env python
+# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
+# sniff_patch.py [asynchronous development]
+
+# Note:
+#   This is a patch for the elasticsearch 8.x client to fix the sniff_* options.
+#   This patch is only needed if you use the sniff_* options and only works with basic auth.
+#   Call init_elasticsearch_async() with normal AsyncElasticsearch params.
+#
+# Source:
+#   - https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960
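+#
+# Example usage (sketch; host and credentials are placeholders):
+#   import sniff_patch
+#   es = sniff_patch.init_elasticsearch_async(hosts=['https://127.0.0.1:9200'], basic_auth=('elastic', 'password'), sniff_on_start=True)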
+
+import base64
+
+import elasticsearch._async.client as async_client
+from elasticsearch.exceptions import SerializationError, ConnectionError
+
+
+def init_elasticsearch_async(*args, **kwargs):
+    '''
+    Initialize the AsyncElasticsearch client with the sniff patch.
+    (Plain function: it only installs the sniff callback and constructs the client, so nothing here needs awaiting.)
+
+    :param args: AsyncElasticsearch positional arguments.
+    :param kwargs: AsyncElasticsearch keyword arguments.
+    '''
+    async_client.default_sniff_callback = _override_async_sniff_callback(kwargs['basic_auth'])
+
+    return async_client.AsyncElasticsearch(*args, **kwargs)
+
+
+def _override_async_sniff_callback(basic_auth):
+    '''
+    Taken from https://github.com/elastic/elasticsearch-py/blob/8.8/elasticsearch/_sync/client/_base.py#L166
+    Completely unmodified except for adding the auth header to the elastic request.
+    Allows us to continue using the sniff_* options while this is broken in the library.
+
+    TODO: Remove this when this issue is patched:
+        - https://github.com/elastic/elasticsearch-py/issues/2005
+    '''
+    auth_str = base64.b64encode(':'.join(basic_auth).encode()).decode()
+    sniffed_node_callback = async_client._base._default_sniffed_node_callback
+
+    async def modified_async_sniff_callback(transport, sniff_options):
+        for _ in transport.node_pool.all():
+            try:
+                meta, node_infos = await transport.perform_request(
+                    'GET',
+                    '/_nodes/_all/http',
+                    headers={
+                        'accept': 'application/vnd.elasticsearch+json; compatible-with=8',
+                        'authorization': f'Basic {auth_str}'  # This auth header is missing in 8.x releases of the client, and causes 401s
+                    },
+                    request_timeout=(
+                        sniff_options.sniff_timeout
+                        if not sniff_options.is_initial_sniff
+                        else None
+                    ),
+                )
+            except (SerializationError, ConnectionError):
+                continue
+
+            if not 200 <= meta.status <= 299:
+                continue
+
+            node_configs = []
+            for node_info in node_infos.get('nodes', {}).values():
+                address = node_info.get('http', {}).get('publish_address')
+                if not address or ':' not in address:
+                    continue
+
+                if '/' in address:
+                    # Support 7.x host/ip:port behavior where http.publish_host has been set.
+                    fqdn, ipaddress = address.split('/', 1)
+                    host = fqdn
+                    _, port_str = ipaddress.rsplit(':', 1)
+                    port = int(port_str)
+                else:
+                    host, port_str = address.rsplit(':', 1)
+                    port = int(port_str)
+
+                assert sniffed_node_callback is not None
+                sniffed_node = await sniffed_node_callback(
+                    node_info, meta.node.replace(host=host, port=port)
+                )
+                if sniffed_node is None:
+                    continue
+
+                # Use the node which was able to make the request as a base.
+                node_configs.append(sniffed_node)
+
+            if node_configs:
+                return node_configs
+
+        return []
+
+    return modified_async_sniff_callback
+\ No newline at end of file
diff --git a/eris.py b/eris.py
@@ -206,13 +206,13 @@ def main():
     parser.add_argument('--index', help='Elasticsearch index name')
     parser.add_argument('--pipeline', help='Use an ingest pipeline for the index')
     parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index')
-    parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index')
+    parser.add_argument('--shards', type=int, default=3, help='Number of shards for the index')
     
     # Performance arguments
     parser.add_argument('--chunk-max', type=int, default=10, help='Maximum size in MB of a chunk')
-    parser.add_argument('--chunk-size', type=int, default=5000, help='Number of records to index in a chunk')
-    parser.add_argument('--chunk-threads', type=int, default=2, help='Number of threads to use when indexing in chunks')
-    parser.add_argument('--retries', type=int, default=10, help='Number of times to retry indexing a chunk before failing')
+    parser.add_argument('--chunk-size', type=int, default=50000, help='Number of records to index in a chunk')
+    parser.add_argument('--chunk-threads', type=int, default=3, help='Number of threads to use when indexing in chunks')
+    parser.add_argument('--retries', type=int, default=60, help='Number of times to retry indexing a chunk before failing')
     parser.add_argument('--timeout', type=int, default=30, help='Number of seconds to wait before retrying a chunk')
 
     # Ingestion arguments
diff --git a/helpers/elastictop.py b/helpers/elastictop.py
@@ -22,7 +22,6 @@ def bytes_to_human_readable(num_bytes):
         num_bytes /= 1024.0
     return f"{num_bytes:.1f}YB"
 
-
 def main():
     '''Main function when running this script directly.'''
 
@@ -43,63 +42,62 @@ def main():
     }
     es = Elasticsearch(**es_config)
 
-    stats = es.cluster.stats()
-
-    name = stats['cluster_name']
-    status = stats['status']
-    indices = {
-        'total': stats['indices']['count'],
-        'shards': stats['indices']['shards']['total'],
-        'docs': stats['indices']['docs']['count'],
-        'size': bytes_to_human_readable(stats['indices']['store']['size_in_bytes'])
-    }
-    nodes = {
-        'total': stats['_nodes']['total'],
-        'successful': stats['_nodes']['successful'],
-        'failed': stats['_nodes']['failed']
-    }
-
-    if status == 'green':
-        print(f'Cluster   {name} (\033[92m{status}\033[0m)')
-    elif status == 'yellow':
-        print(f'Cluster   {name} (\033[93m{status}\033[0m)')
-    elif status == 'red':
-        print(f'Cluster   {name} (\033[91m{status}\033[0m)')
-    
-    print('')
-    print(f'Nodes     {nodes["total"]} Total, {nodes["successful"]} Successful, {nodes["failed"]} Failed')
-
-    nodes_info = es.nodes.info()
-    # Loop through each node and print details
-    for node_id, node_info in nodes_info['nodes'].items():
-        node_name = node_info['name']
-        transport_address = node_info['transport_address']
-        #node_stats = es.nodes.stats(node_id=node_id)
-        version = node_info['version']
-        memory = bytes_to_human_readable(int(node_info['settings']['node']['attr']['ml']['machine_memory']))
-        print(f"          {node_name.ljust(7)} | Host: {transport_address.rjust(21)} | Version: {version.ljust(7)} | Processors: {node_info['os']['available_processors']} | Memory: {memory}")
-
-    indices_stats = es.cat.indices(format="json")
-
-    #print('          |')
-    print('')
-    
-    print(f'Indices   {indices["total"]:,} Total {indices["shards"]:,}, Shards')
-    for index in indices_stats:
-        index_name = index['index']
-        document_count = f'{int(index['docs.count']):,}'
-        store_size = index['store.size']
-        number_of_shards = int(index['pri'])  # primary shards
-        number_of_replicas = int(index['rep'])  # replicas
-
-        if index_name.startswith('.') or document_count == '0':
-            continue
-
-        print(f"          {index_name.ljust(15)} | Documents: {document_count.rjust(15)} | {store_size.rjust(7)} [Shards: {number_of_shards:,}, Replicas: {number_of_replicas:,}]")
-
-    dox = f'{indices["docs"]:,}'
-    print('')
-    print(f'Total {dox.rjust(48)} {indices["size"].rjust(9)}')
+    while True:
+        os.system('clear')
+
+        stats = es.cluster.stats()
+
+        name = stats['cluster_name']
+        status = stats['status']
+        indices = {
+            'total': stats['indices']['count'],
+            'shards': stats['indices']['shards']['total'],
+            'docs': stats['indices']['docs']['count'],
+            'size': bytes_to_human_readable(stats['indices']['store']['size_in_bytes'])
+        }
+        nodes = {
+            'total': stats['_nodes']['total'],
+            'successful': stats['_nodes']['successful'],
+            'failed': stats['_nodes']['failed']
+        }
+
+        if status == 'green':
+            print(f'Cluster   {name} (\033[92m{status}\033[0m)')
+        elif status == 'yellow':
+            print(f'Cluster   {name} (\033[93m{status}\033[0m)')
+        elif status == 'red':
+            print(f'Cluster   {name} (\033[91m{status}\033[0m)')
+        
+        print(f'\nNodes     {nodes["total"]} Total, {nodes["successful"]} Successful, {nodes["failed"]} Failed')
+
+        nodes_info = es.nodes.info()
+
+        for node_id, node_info in nodes_info['nodes'].items():
+            node_name = node_info['name']
+            transport_address = node_info['transport_address']
+            #node_stats = es.nodes.stats(node_id=node_id)
+            version = node_info['version']
+            memory = bytes_to_human_readable(int(node_info['settings']['node']['attr']['ml']['machine_memory']))
+            print(f"          {node_name.ljust(7)} | Host: {transport_address.rjust(21)} | Version: {version.ljust(7)} | Processors: {node_info['os']['available_processors']} | Memory: {memory}")
+
+        indices_stats = es.cat.indices(format="json")
+        
+        print(f'\nIndices   {indices["total"]:,} Total, {indices["shards"]:,} Shards')
+        for index in indices_stats:
+            index_name = index['index']
+            document_count = f'{int(index["docs.count"]):,}'
+            store_size = index['store.size']
+            number_of_shards = int(index['pri'])  # primary shards
+            number_of_replicas = int(index['rep'])  # replicas
+
+            if index_name.startswith('.') or document_count == '0':
+                continue
+
+            print(f"          {index_name.ljust(15)} | Documents: {document_count.rjust(15)} | {store_size.rjust(7)} [Shards: {number_of_shards:,}, Replicas: {number_of_replicas:,}]")
+
+        dox = f'{indices["docs"]:,}'
+
+        print(f'\nTotal {dox.rjust(48)} {indices["size"].rjust(9)}')
 
 if __name__ == '__main__':
     main()
diff --git a/helpers/es_index_dump b/helpers/es_index_dump
@@ -11,13 +11,13 @@ BATCH_SIZE=10000
 ES_HOST="https://elastic.change.me:9200"
 ES_INDEX="juicy_booties"
 
-SCROLL_ID=$(curl -s -XGET "$ES_HOST/$ES_INDEX/_search?scroll=1m" -H 'Content-Type: application/json' -d'{ "size": $BATCH_SIZE, "query": { "match_all": {} } }' | jq -r '._scroll_id')
+SCROLL_ID=$(curl -s -XGET "$ES_HOST/$ES_INDEX/_search?scroll=1m" -H 'Content-Type: application/json' -d"{ \"size\": $BATCH_SIZE, \"query\": { \"match_all\": {} } }" | jq -r '._scroll_id')
 
 count=0
 
 while true; do
-	RESPONSE=$(curl -s -XGET "$ES_HOST/_search/scroll" -H 'Content-Type: application/json' -d'{"scroll": "1m", "scroll_id": "'$SCROLL_ID'"}')
-	
+	RESPONSE=$(curl -s -XGET "$ES_HOST/_search/scroll" -H 'Content-Type: application/json' -d"{\"scroll\": \"1m\", \"scroll_id\": \"$SCROLL_ID\"}")
+
 	HITS=$(echo $RESPONSE | jq -c '.hits.hits[]')
 
 	if [ -z "$HITS" ] || [ "$HITS" = "null" ]; then
diff --git a/helpers/sniff_patch_async.py b/helpers/sniff_patch_async.py
@@ -1,95 +0,0 @@
-#!/usr/bin/env python
-# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
-# sniff_patch.py
-
-# Note:
-#   This is a patch for the elasticsearch 8.x client to fix the sniff_* options.
-#   This patch is only needed if you use the sniff_* options and only works with basic auth.
-#   Call init_elasticsearch() with normal Elasticsearch params.
-#
-# Source:
-#   - https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960
-
-import base64
-
-import elasticsearch._async.client as async_client
-from elasticsearch.exceptions import SerializationError, ConnectionError
-
-
-async def init_elasticsearch_async(*args, **kwargs):
-    '''
-    Initialize the Async Elasticsearch client with the sniff patch.
-    
-    :param args: Async Elasticsearch positional arguments.
-    :param kwargs: Async Elasticsearch keyword arguments.
-    '''
-    async_client.default_sniff_callback = _override_async_sniff_callback(kwargs['basic_auth'])
-
-    return async_client.AsyncElasticsearch(*args, **kwargs)
-
-
-def _override_async_sniff_callback(basic_auth):
-    '''
-    Taken from https://github.com/elastic/elasticsearch-py/blob/8.8/elasticsearch/_sync/client/_base.py#L166
-    Completely unmodified except for adding the auth header to the elastic request.
-    Allows us to continue using the sniff_* options while this is broken in the library.
-
-    TODO: Remove this when this issue is patched:
-        - https://github.com/elastic/elasticsearch-py/issues/2005
-    '''
-    auth_str = base64.b64encode(':'.join(basic_auth).encode()).decode()
-    sniffed_node_callback = async_client._base._default_sniffed_node_callback
-
-    async def modified_async_sniff_callback(transport, sniff_options):
-        for _ in transport.node_pool.all():
-            try:
-                meta, node_infos = await transport.perform_request(
-                    'GET',
-                    '/_nodes/_all/http',
-                    headers={
-                        'accept': 'application/vnd.elasticsearch+json; compatible-with=8',
-                        'authorization': f'Basic {auth_str}'  # Authorization header
-                    },
-                    request_timeout=(
-                        sniff_options.sniff_timeout
-                        if not sniff_options.is_initial_sniff
-                        else None
-                    ),
-                )
-            except (SerializationError, ConnectionError):
-                continue
-
-            if not 200 <= meta.status <= 299:
-                continue
-
-            node_configs = []
-            for node_info in node_infos.get('nodes', {}).values():
-                address = node_info.get('http', {}).get('publish_address')
-                if not address or ':' not in address:
-                    continue
-
-                # Processing address for host and port
-                if '/' in address:
-                    fqdn, ipaddress = address.split('/', 1)
-                    host = fqdn
-                    _, port_str = ipaddress.rsplit(':', 1)
-                    port = int(port_str)
-                else:
-                    host, port_str = address.rsplit(':', 1)
-                    port = int(port_str)
-
-                assert sniffed_node_callback is not None
-                sniffed_node = await sniffed_node_callback(
-                    node_info, meta.node.replace(host=host, port=port)
-                )
-                if sniffed_node is None:
-                    continue
-
-                node_configs.append(sniffed_node)
-
-            if node_configs:
-                return node_configs
-
-        return []
-
-    return modified_async_sniff_callback
-\ No newline at end of file
diff --git a/ingestors/ingest_httpx.py b/ingestors/ingest_httpx.py
@@ -53,7 +53,7 @@ def process_file(file_path: str):
 Example record:
 {
     "timestamp":"2024-01-14T13:08:15.117348474-05:00", # Rename to seen and remove milliseconds and offset
-    "hash": {
+    "hash": { # Do we need all of these ?
         "body_md5":"4ae9394eb98233b482508cbda3b33a66",
         "body_mmh3":"-4111954",
         "body_sha256":"89e06e8374353469c65adb227b158b265641b424fba7ddb2c67eef0c4c1280d3",
@@ -64,19 +64,19 @@ Example record:
         "header_simhash":"10962523587435277678"
     },
     "port":"443",
-    "url":"https://supernets.org",
+    "url":"https://supernets.org", # Remove this and only use the input field as "domain" maybe
     "input":"supernets.org", # rename to domain
     "title":"SuperNETs",
     "scheme":"https",
     "webserver":"nginx",
     "body_preview":"SUPERNETS Home About Contact Donate Docs Network IRC Git Invidious Jitsi LibreX Mastodon Matrix Sup",
     "content_type":"text/html",
-    "method":"GET",
+    "method":"GET", # Do we need this ?
     "host":"51.89.151.158",
     "path":"/",
     "favicon":"-674048714",
     "favicon_path":"/i/favicon.png",
-    "time":"592.907689ms",
+    "time":"592.907689ms", # Do we need this ?
     "a":[
         "6.150.220.23"
     ],
@@ -85,12 +85,12 @@ Example record:
         "HSTS",
         "Nginx"
     ],
-    "words":436,
-    "lines":79,
-    "status_code":200,
+    "words":436, # Do we need this ?
+    "lines":79, # Do we need this ?
+    "status_code":200, 
     "content_length":4597,
-    "failed":false,
-    "knowledgebase":{
+    "failed":false, # Do we need this ?
+    "knowledgebase":{ # Do we need this ?
         "PageType":"nonerror",
         "pHash":0
     }
diff --git a/ingestors/ingest_masscan.py b/ingestors/ingest_masscan.py
@@ -2,6 +2,18 @@
 # Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
 # ingest_masscan.py
 
+'''
+apt-get install iptables masscan libpcap-dev screen
+setcap 'CAP_NET_RAW+eip CAP_NET_ADMIN+eip' /bin/masscan
+/sbin/iptables -A INPUT -p tcp --dport 61010 -j DROP
+printf "0.0.0.0/8\n10.0.0.0/8\n100.64.0.0/10\n127.0.0.0/8\n169.254.0.0/16\n172.16.0.0/12\n192.0.0.0/24\n192.0.2.0/24\n192.31.196.0/24\n192.52.193.0/24\n192.88.99.0/24\n192.168.0.0/16\n192.175.48.0/24\n198.18.0.0/15\n198.51.100.0/24\n203.0.113.0/24\n224.0.0.0/3\n255.255.255.255/32"  > exclude.conf
+screen -S scan
+masscan 0.0.0.0/0 -p21,22,23 --banners --http-user-agent "USER_AGENT" --source-port 61010 --open-only --rate 30000 --excludefile exclude.conf -oJ output.json
+masscan 0.0.0.0/0 -p21,22,23 --banners --http-user-agent "USER_AGENT" --source-port 61000-65503 --open-only --rate 30000 --excludefile exclude.conf -oJ output_new.json --shard $i/$TOTAL
+
+Note: The above iptables rule is not persistent and will be removed on reboot.
+'''
+
 import json
 import logging
 import re
@@ -23,18 +35,18 @@ def construct_map() -> dict:
                 'service': { 'type': 'keyword' },
                 'banner':  keyword_mapping,
                 'ref_id':  { 'type': 'keyword' },
-                'seen':    { 'type': 'date' },
-                'geoip':   {
-                    'properties': {
-                        'city_name':        keyword_mapping,
-                        'continent_name':   keyword_mapping,
-                        'country_iso_code': keyword_mapping,
-                        'country_name':     keyword_mapping,
-                        'location':         { 'type': 'geo_point' },
-                        'region_iso_code':  keyword_mapping,
-                        'region_name':      keyword_mapping,
-                    }
-                }
+                'seen':    { 'type': 'date' }
+                #'geoip':   {
+                #    'properties': {
+                #        'city_name':        keyword_mapping,
+                #        'continent_name':   keyword_mapping,
+                #        'country_iso_code': keyword_mapping,
+                #        'country_name':     keyword_mapping,
+                #        'location':         { 'type': 'geo_point' },
+                #        'region_iso_code':  keyword_mapping,
+                #        'region_name':      keyword_mapping,
+                #    }
+                #}
             }
         }
     }
@@ -56,6 +68,9 @@ def process_file(file_path: str):
             if not line or not line.startswith('{'):
                 continue
 
+            if line.endswith(','):
+                line = line[:-1]
+
             try:
                 record = json.loads(line)
             except json.decoder.JSONDecodeError:
@@ -116,20 +131,6 @@ Will be indexed as:
     "service": "ssh",
     "banner": "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4",
     "seen": "2021-10-08T02:04:28Z",
-    "ref_id": "?sKfOvsC4M4a2W8PaC4zF?", # TCP RST Payload, Might be useful..
-
-    # GeoIP ingestion pipeline fields
-    "geoip": {
-        "city_name": "City",
-        "continent_name": "Continent",
-        "country_iso_code": "CC",
-        "country_name": "Country",
-        "location": {
-            "lat": 0.0000,
-            "lon": 0.0000
-        },
-        "region_iso_code": "RR",
-        "region_name": "Region"
-    }
+    "ref_id": "?sKfOvsC4M4a2W8PaC4zF?" # TCP RST Payload (Do we need this?)
 }
 '''
 \ No newline at end of file
diff --git a/ingestors/ingest_massdns.py b/ingestors/ingest_massdns.py
@@ -56,7 +56,7 @@ def process_file(file_path: str):
             # Let's not index the PTR record if it's the same as the in-addr.arpa domain
             if data == name:
                 continue
-
+                    
             ip = '.'.join(name.replace('.in-addr.arpa', '').split('.')[::-1])
             
             struct = {