eris

- Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/-c.git
Log | Files | Refs | Archive | README | LICENSE

commit b6fb68ba3a060d715b966f163cfefc5a75531b61
parent 31287a99d23c196dc20b6c1889de02a25e3f969d
Author: acidvegas <acid.vegas@acid.vegas>
Date: Tue, 5 Mar 2024 16:47:11 -0500

Asyncronous developed mon ERIS is complete, need to refactor ingestion helpers before pushing this as the main version

Diffstat:
MREADME.md | 1+
Masync_dev/eris.py | 130++++++++++++++++++++++++++++++++-----------------------------------------------
Masync_dev/ingestors/ingest_certs.py | 9+++++++--
Aasync_dev/ingestors/ingest_masscan.py | 149+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Dasync_dev/ingestors/ingest_masscan_async.py | 142-------------------------------------------------------------------------------

5 files changed, 209 insertions(+), 222 deletions(-)

diff --git a/README.md b/README.md
@@ -86,6 +86,7 @@ Create & add a geoip pipeline and use the following in your index mappings:
 ## Roadmap
 - Implement [async elasticsearch](https://elasticsearch-py.readthedocs.io/en/v8.12.1/async.html) into the code.
 - WHOIS database ingestion scripts
+- Dynamically update the batch metrics when the sniffer adds or removes nodes
 
 ___
 
diff --git a/async_dev/eris.py b/async_dev/eris.py
@@ -2,11 +2,11 @@
 # Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
 # eris.py [asyncronous developement]
 
+import asyncio
 import argparse
 import logging
 import os
 import stat
-import time
 import sys
 
 sys.dont_write_bytecode = True
@@ -31,41 +31,44 @@ class ElasticIndexer:
         '''
 
         self.chunk_size = args.chunk_size
-        self.chunk_threads = args.chunk_threads
-        self.dry_run = args.dry_run
+        self.es = None
         self.es_index = args.index
+        
+        self.es_config = {
+            'hosts': [f'{args.host}:{args.port}'],
+            'verify_certs': args.self_signed,
+            'ssl_show_warn': args.self_signed,
+            'request_timeout': args.timeout,
+            'max_retries': args.retries,
+            'retry_on_timeout': True,
+            'sniff_on_start': True, # Is this problematic? 
+            'sniff_on_node_failure': True,
+            'min_delay_between_sniffing': 60 # Add config option for this?
+        }
 
-        if not args.dry_run:
-            es_config = {
-                'hosts': [f'{args.host}:{args.port}'],
-                'verify_certs': args.self_signed,
-                'ssl_show_warn': args.self_signed,
-                'request_timeout': args.timeout,
-                'max_retries': args.retries,
-                'retry_on_timeout': True,
-                'sniff_on_start': True, # Is this problematic? 
-                'sniff_on_node_failure': True,
-                'min_delay_between_sniffing': 60 # Add config option for this?
-            }
-
-            if args.api_key:
-                es_config['api_key'] = (args.key, '') # Verify this is correct
-            else:
-                es_config['basic_auth'] = (args.user, args.password)
-                
-            # Patching the Elasticsearch client to fix a bug with sniffing (https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960)
-            import sniff_patch
-            self.es = sniff_patch.init_elasticsearch(**es_config)
+        if args.api_key:
+            self.es_config['api_key'] = (args.api_key, '') # Verify this is correct
+        else:
+            self.es_config['basic_auth'] = (args.user, args.password)
+            
+        
+    async def initialize(self):
+        '''Initialize the Elasticsearch client.'''
+
+        # Patching the Elasticsearch client to fix a bug with sniffing (https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960)
+        import sniff_patch
+        self.es = sniff_patch.init_elasticsearch(**self.es_config)
 
-            # Remove the above and uncomment the below if the bug is fixed in the Elasticsearch client:
-            #self.es = AsyncElasticsearch(**es_config)
+        # Remove the above and uncomment the below if the bug is fixed in the Elasticsearch client:
+        #self.es = AsyncElasticsearch(**es_config)
 
 
-    async def create_index(self, map_body: dict, pipeline: str = '', replicas: int = 1, shards: int = 1, ):
+    async def create_index(self, map_body: dict, pipeline: str = '', replicas: int = 1, shards: int = 1):
         '''
         Create the Elasticsearch index with the defined mapping.
         
-        :param pipline: Name of the ingest pipeline to use for the index
+        :param map_body: Mapping for the index
+        :param pipeline: Name of the ingest pipeline to use for the index
         :param replicas: Number of replicas for the index
         :param shards: Number of shards for the index
         '''
@@ -112,7 +115,7 @@ class ElasticIndexer:
         return number_of_nodes
 
 
-    async def async_bulk_index_data(self, file_path: str, index_name: str, data_generator: callable):
+    async def process_data(self, file_path: str, data_generator: callable):
         '''
         Index records in chunks to Elasticsearch.
 
@@ -124,11 +127,11 @@ class ElasticIndexer:
         count = 0
         total = 0
         
-        async for ok, result in async_streaming_bulk(self.es, index_name=self.es_index, actions=data_generator(file_path), chunk_size=self.chunk_size):
+        async for ok, result in async_streaming_bulk(self.es, actions=data_generator(file_path), chunk_size=self.chunk_size):
             action, result = result.popitem()
 
             if not ok:
-                logging.error(f'Failed to index document ({result["_id"]}) to {index_name} from {file_path} ({result})')
+                logging.error(f'Failed to index document ({result["_id"]}) to {self.es_index} from {file_path} ({result})')
                 input('Press Enter to continue...') # Debugging (will possibly remove this since we have retries enabled)
                 continue
 
@@ -139,41 +142,16 @@ class ElasticIndexer:
                 logging.info(f'Successfully indexed {self.chunk_size:,} ({total:,} processed) records to {self.es_index} from {file_path}')
                 count = 0
 
-        logging.info(f'Finished indexing {self.total:,} records to {self.es_index} from {file_path}')
-
-
-    async def process_file(self, file_path: str, ingest_function: callable):
-        '''
-        Read and index records in batches to Elasticsearch.
-
-        :param file_path: Path to the file
-        :param batch_size: Number of records to index per batch
-        :param ingest_function: Function to process the file
-        '''
-
-        count = 0
-
-        async for processed in ingest_function(file_path):
-            if not processed:
-                break
-
-            if self.dry_run:
-                print(processed)
-                continue
-
-            count += 1
-
-            yield {'_index': self.es_index, '_source': processed}
+        logging.info(f'Finished indexing {total:,} records to {self.es_index} from {file_path}')
 
 
-def main():
+async def main():
     '''Main function when running this script directly.'''
 
     parser = argparse.ArgumentParser(description='Index data into Elasticsearch.')
     
     # General arguments
     parser.add_argument('input_path', help='Path to the input file or directory') # Required
-    parser.add_argument('--dry-run', action='store_true', help='Dry run (do not index records to Elasticsearch)')
     parser.add_argument('--watch', action='store_true', help='Create or watch a FIFO for real-time indexing')
     
     # Elasticsearch arguments
@@ -192,7 +170,6 @@ def main():
     
     # Performance arguments
     parser.add_argument('--chunk-size', type=int, default=50000, help='Number of records to index in a chunk')
-    parser.add_argument('--chunk-threads', type=int, default=3, help='Number of threads to use when indexing in chunks')
     parser.add_argument('--retries', type=int, default=60, help='Number of times to retry indexing a chunk before failing')
     parser.add_argument('--timeout', type=int, default=30, help='Number of seconds to wait before retrying a chunk')
 
@@ -214,6 +191,7 @@ def main():
         raise FileNotFoundError(f'Input path {args.input_path} does not exist or is not a file or directory')
 
     edx = ElasticIndexer(args)
+    await edx.initialize() # Initialize the Elasticsearch client asyncronously
 
     if args.cert:
         from ingestors import ingest_certs   as ingestor
@@ -225,32 +203,28 @@ def main():
         from ingestors import ingest_massdns as ingestor
     elif args.zone:
         from ingestors import ingest_zone    as ingestor
-
-    batch_size = 0
     
-    if not args.dry_run:
-        print(edx.get_cluster_health())
+    health = await edx.get_cluster_health()
+    print(health)
 
-        time.sleep(3) # Delay to allow time for sniffing to complete
-        
-        nodes = edx.get_cluster_size()
-        logging.info(f'Connected to {nodes:,} Elasticsearch node(s)')
-
-        if not edx.es_index:
-            edx.es_index = ingestor.default_index
+    await asyncio.sleep(5) # Delay to allow time for sniffing to complete
+    
+    nodes = await edx.get_cluster_size()
+    logging.info(f'Connected to {nodes:,} Elasticsearch node(s)')
 
-        map_body = ingestor.construct_map()
-        edx.create_index(map_body, args.pipeline, args.replicas, args.shards)
+    if not edx.es_index:
+        edx.es_index = ingestor.default_index
 
-        batch_size = int(nodes * (args.chunk_size * args.chunk_threads))
+    map_body = ingestor.construct_map()
+    await edx.create_index(map_body, args.pipeline, args.replicas, args.shards)
     
     if os.path.isfile(args.input_path):
         logging.info(f'Processing file: {args.input_path}')
-        edx.process_file(args.input_path, batch_size, ingestor.process_file)
+        await edx.process_data(args.input_path, ingestor.process_data)
 
     elif stat.S_ISFIFO(os.stat(args.input_path).st_mode):
         logging.info(f'Watching FIFO: {args.input_path}')
-        edx.process_file(args.input_path, batch_size, ingestor.process_file)
+        await edx.process_data(args.input_path, ingestor.process_data)
 
     elif os.path.isdir(args.input_path):
         count = 1
@@ -260,7 +234,7 @@ def main():
             file_path = os.path.join(args.input_path, file)
             if os.path.isfile(file_path):
                 logging.info(f'[{count:,}/{total:,}] Processing file: {file_path}')
-                edx.process_file(file_path, batch_size, ingestor.process_file)
+                await edx.process_data(file_path, ingestor.process_data)
                 count += 1
             else:
                 logging.warning(f'[{count:,}/{total:,}] Skipping non-file: {file_path}')
@@ -268,4 +242,4 @@ def main():
 
 
 if __name__ == '__main__':
-    main()
-\ No newline at end of file
+    asyncio.run(main())
+\ No newline at end of file
diff --git a/async_dev/ingestors/ingest_certs.py b/async_dev/ingestors/ingest_certs.py
@@ -91,8 +91,12 @@ def construct_map() -> dict:
     return mapping
 
 
-async def process():
-    '''Read and process Certsream records live from the Websocket stream.'''
+async def process_data(file_path: str = None):
+    '''
+    Read and process Certsream records live from the Websocket stream.
+    
+    :param file_path: Path to the Certstream log file (unused, placeholder for consistency with other ingestors)
+    '''
 
     while True:
         try:
@@ -105,6 +109,7 @@ async def process():
                         record = json.loads(line)
                     except json.decoder.JSONDecodeError:
                         logging.error(f'Failed to parse JSON record from Certstream! ({line})')
+                        input('Press Enter to continue...') # Pause the script to allow the user to read the error message
                         continue
 
                     yield record
diff --git a/async_dev/ingestors/ingest_masscan.py b/async_dev/ingestors/ingest_masscan.py
@@ -0,0 +1,148 @@
+#!/usr/bin/env python
+# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
+# ingest_masscan.py [asyncronous developement]
+
+'''
+apt-get install iptables masscan libpcap-dev screen
+setcap 'CAP_NET_RAW+eip CAP_NET_ADMIN+eip' /bin/masscan
+/sbin/iptables -A INPUT -p tcp --dport 61010 -j DROP
+printf "0.0.0.0/8\n10.0.0.0/8\n100.64.0.0/10\n127.0.0.0/8\n169.254.0.0/16\n172.16.0.0/12\n192.0.0.0/24\n192.0.2.0/24\n192.31.196.0/24\n192.52.193.0/24\n192.88.99.0/24\n192.168.0.0/16\n192.175.48.0/24\n198.18.0.0/15\n198.51.100.0/24\n203.0.113.0/24\n224.0.0.0/3\n255.255.255.255/32"  > exclude.conf
+screen -S scan
+masscan 0.0.0.0/0 -p21,22,23 --banners --http-user-agent "USER_AGENT" --source-port 61010 --open-only --rate 30000 --excludefile exclude.conf -oJ output.json
+masscan 0.0.0.0/0 -p21,22,23 --banners --http-user-agent "USER_AGENT" --source-port 61000-65503 --open-only --rate 30000 --excludefile exclude.conf -oJ output_new.json --shard $i/$TOTAL
+
+Note: The above iptables rule is not persistent and will be removed on reboot.
+'''
+
+import json
+import logging
+import re
+import time
+
+try:
+    import aiofiles
+except ImportError:
+    raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
+
+default_index = 'masscan-logs'
+
+def construct_map() -> dict:
+    '''Construct the Elasticsearch index mapping for Masscan records.'''
+
+    keyword_mapping = { 'type': 'text',  'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
+
+    mapping = {
+        'mappings': {
+            'properties': {
+                'ip':      { 'type': 'ip' },
+                'port':    { 'type': 'integer' },
+                'proto':   { 'type': 'keyword' },
+                'service': { 'type': 'keyword' },
+                'banner':  keyword_mapping,
+                'ref_id':  { 'type': 'keyword' },
+                'seen':    { 'type': 'date' }
+                #'geoip':   {
+                #    'properties': {
+                #        'city_name':        keyword_mapping,
+                #        'continent_name':   keyword_mapping,
+                #        'country_iso_code': keyword_mapping,
+                #        'country_name':     keyword_mapping,
+                #        'location':         { 'type': 'geo_point' },
+                #        'region_iso_code':  keyword_mapping,
+                #        'region_name':      keyword_mapping,
+                #    }
+                #}
+            }
+        }
+    }
+
+    return mapping
+
+
+async def process_data(file_path: str):
+    '''
+    Read and process Masscan records from the log file.
+
+    :param file_path: Path to the Masscan log file
+    '''
+
+    async with aiofiles.open(file_path, mode='r') as input_file:
+        async for line in input_file:
+            line = line.strip()
+
+            if not line or not line.startswith('{'):
+                continue
+
+            if line.endswith(','):
+                line = line[:-1]
+
+            try:
+                record = json.loads(line)
+            except json.decoder.JSONDecodeError:
+                # In rare cases, the JSON record may be incomplete or malformed:
+                #   {   "ip": "51.161.12.223",   "timestamp": "1707628302", "ports": [ {"port": 22, "proto": "tcp", "service": {"name": "ssh", "banner":
+                #   {   "ip": "83.66.211.246",   "timestamp": "1706557002"
+                logging.error(f'Failed to parse JSON record! ({line})')
+                input('Press Enter to continue...') # Pause for review & debugging (Will remove pausing in production, still investigating the cause of this issue.)
+                continue
+
+            if len(record['ports']) > 1:
+                logging.warning(f'Multiple ports found for record! ({record})')
+                input('Press Enter to continue...') # Pause for review (Will remove pausing in production, still investigating if you ever seen more than one port in a record.)
+
+            for port_info in record['ports']:
+                struct = {
+                    'ip'    : record['ip'],
+                    'port'  : port_info['port'],
+                    'proto' : port_info['proto'],
+                    'seen'  : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(int(record['timestamp']))),
+                }
+
+                if 'service' in port_info:
+                    if 'name' in port_info['service']:
+                        if (service_name := port_info['service']['name']) not in ('unknown',''):
+                            struct['service'] = service_name
+
+                    if 'banner' in port_info['service']:
+                        banner = ' '.join(port_info['service']['banner'].split()) # Remove extra whitespace
+                        if banner:
+                            match = re.search(r'\(Ref\.Id: (.*?)\)', banner)
+                            if match:
+                                struct['ref_id'] = match.group(1)
+                            else:
+                                struct['banner'] = banner
+
+                yield {'_index': default_index, '_source': struct}
+ 
+    return None # EOF
+
+
+
+'''
+Example record:
+{
+    "ip": "43.134.51.142",
+    "timestamp": "1705255468", # Convert to ZULU BABY
+    "ports": [ # We will create a record for each port opened
+        {
+            "port": 22,
+            "proto": "tcp",
+            "service": { # This field is optional
+                "name": "ssh",
+                "banner": "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4"
+            }
+        }
+    ]
+}
+
+Will be indexed as:
+{
+    "ip": "43.134.51.142",
+    "port": 22,
+    "proto": "tcp",
+    "service": "ssh",
+    "banner": "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4",
+    "seen": "2021-10-08T02:04:28Z",
+    "ref_id": "?sKfOvsC4M4a2W8PaC4zF?" # TCP RST Payload, Might be useful..
+}
+'''
+\ No newline at end of file
diff --git a/async_dev/ingestors/ingest_masscan_async.py b/async_dev/ingestors/ingest_masscan_async.py
@@ -1,141 +0,0 @@
-#!/usr/bin/env python
-# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
-# ingest_masscan.py [asyncronous developement]
-
-'''
-apt-get install iptables masscan libpcap-dev screen
-setcap 'CAP_NET_RAW+eip CAP_NET_ADMIN+eip' /bin/masscan
-/sbin/iptables -A INPUT -p tcp --dport 61010 -j DROP
-printf "0.0.0.0/8\n10.0.0.0/8\n100.64.0.0/10\n127.0.0.0/8\n169.254.0.0/16\n172.16.0.0/12\n192.0.0.0/24\n192.0.2.0/24\n192.31.196.0/24\n192.52.193.0/24\n192.88.99.0/24\n192.168.0.0/16\n192.175.48.0/24\n198.18.0.0/15\n198.51.100.0/24\n203.0.113.0/24\n224.0.0.0/3\n255.255.255.255/32"  > exclude.conf
-screen -S scan
-masscan 0.0.0.0/0 -p21,22,23 --banners --http-user-agent "USER_AGENT" --source-port 61010 --open-only --rate 30000 --excludefile exclude.conf -oJ output.json
-masscan 0.0.0.0/0 -p21,22,23 --banners --http-user-agent "USER_AGENT" --source-port 61000-65503 --open-only --rate 30000 --excludefile exclude.conf -oJ output_new.json --shard $i/$TOTAL
-
-Note: The above iptables rule is not persistent and will be removed on reboot.
-'''
-
-import json
-import logging
-import re
-import time
-
-try:
-    import aiofiles
-except ImportError:
-    raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
-
-default_index = 'masscan-logs'
-
-def construct_map() -> dict:
-    '''Construct the Elasticsearch index mapping for Masscan records.'''
-
-    keyword_mapping = { 'type': 'text',  'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
-
-    mapping = {
-        'mappings': {
-            'properties': {
-                'ip':      { 'type': 'ip' },
-                'port':    { 'type': 'integer' },
-                'proto':   { 'type': 'keyword' },
-                'service': { 'type': 'keyword' },
-                'banner':  keyword_mapping,
-                'ref_id':  { 'type': 'keyword' },
-                'seen':    { 'type': 'date' }
-                #'geoip':   {
-                #    'properties': {
-                #        'city_name':        keyword_mapping,
-                #        'continent_name':   keyword_mapping,
-                #        'country_iso_code': keyword_mapping,
-                #        'country_name':     keyword_mapping,
-                #        'location':         { 'type': 'geo_point' },
-                #        'region_iso_code':  keyword_mapping,
-                #        'region_name':      keyword_mapping,
-                #    }
-                #}
-            }
-        }
-    }
-
-    return mapping
-
-
-async def process_file(file_path: str):
-    '''
-    Read and process Masscan records from the log file.
-
-    :param file_path: Path to the Masscan log file
-    '''
-
-    async with aiofiles.open(file_path, mode='r') as input_file:
-        async for line in input_file:
-            line = line.strip()
-
-            if not line or not line.startswith('{'):
-                continue
-
-            if line.endswith(','):
-                line = line[:-1]
-
-            try:
-                record = json.loads(line)
-            except json.decoder.JSONDecodeError:
-                logging.error(f'Failed to parse JSON record! ({line})')
-                input('Press Enter to continue...') # Debugging
-                continue
-
-            for port_info in record['ports']:
-                struct = {
-                    'ip': record['ip'],
-                    'port': port_info['port'],
-                    'proto': port_info['proto'],
-                    'seen': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(int(record['timestamp']))),
-                }
-
-                if 'service' in port_info:
-                    if 'name' in port_info['service']:
-                        if port_info['service']['name'] != 'unknown':
-                            struct['service'] = port_info['service']['name']
-
-                    if 'banner' in port_info['service']:
-                        banner = ' '.join(port_info['service']['banner'].split()) # Remove extra whitespace
-                        if banner:
-                            match = re.search(r'\(Ref\.Id: (.*?)\)', banner)
-                            if match:
-                                struct['ref_id'] = match.group(1)
-                            else:
-                                struct['banner'] = banner
-
-                yield {'_index': default_index, '_source': struct}
- 
-    return None # EOF
-
-
-
-'''
-Example record:
-{
-    "ip": "43.134.51.142",
-    "timestamp": "1705255468", # Convert to ZULU BABY
-    "ports": [ # We will create a record for each port opened
-        {
-            "port": 22,
-            "proto": "tcp",
-            "service": { # This field is optional
-                "name": "ssh",
-                "banner": "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4"
-            }
-        }
-    ]
-}
-
-Will be indexed as:
-{
-    "ip": "43.134.51.142",
-    "port": 22,
-    "proto": "tcp",
-    "service": "ssh",
-    "banner": "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4",
-    "seen": "2021-10-08T02:04:28Z",
-    "ref_id": "?sKfOvsC4M4a2W8PaC4zF?" # TCP RST Payload, Might be useful..
-}
-'''
-\ No newline at end of file