eris

Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/eris.git

commit a4b89e6e5a2a539a0b065384c05545ec02a16487
parent ed547a27f4f6f0b3b5481968faf09fed0d5bfede
Author: acidvegas <acid.vegas@acid.vegas>
Date: Tue, 5 Mar 2024 22:19:11 -0500

Asynchronous refactorization pushed as main version 💯

Diffstat:
D async_dev/eris.py | 249 -------------------------------------------------------------------------------
D async_dev/ingestors/ingest_httpx.py | 104 -------------------------------------------------------------------------------
D async_dev/ingestors/ingest_masscan.py | 149 -------------------------------------------------------------------------------
D async_dev/ingestors/ingest_massdns.py | 93 -------------------------------------------------------------------------------
D async_dev/ingestors/ingest_zone.py | 138 -------------------------------------------------------------------------------
D async_dev/sniff_patch.py | 97 -------------------------------------------------------------------------------
M eris.py | 209 ++++++++++++++++++++++++++++++++-----------------------------------------------
R async_dev/ingestors/ingest_certs.py -> ingestors/ingest_certs.py | 0
M ingestors/ingest_httpx.py | 13 +++++++++----
M ingestors/ingest_masscan.py | 36 ++++++++++++++++++++++++------------
M ingestors/ingest_massdns.py | 13 +++++++++----
M ingestors/ingest_zone.py | 15 ++++++++++-----
C eris.py -> old/eris.py | 0
R async_dev/ingestors/__init__.py -> old/ingestors/__init__.py | 0
C ingestors/ingest_httpx.py -> old/ingestors/ingest_httpx.py | 0
C ingestors/ingest_masscan.py -> old/ingestors/ingest_masscan.py | 0
C ingestors/ingest_massdns.py -> old/ingestors/ingest_massdns.py | 0
C ingestors/ingest_zone.py -> old/ingestors/ingest_zone.py | 0
C sniff_patch.py -> old/sniff_patch.py | 0
M sniff_patch.py | 36 ++++++++++++++++++------------------

20 files changed, 155 insertions(+), 997 deletions(-)

diff --git a/async_dev/eris.py b/async_dev/eris.py
@@ -1,248 +0,0 @@
-#!/usr/bin/env python
-# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
-# eris.py
-
-import asyncio
-import argparse
-import logging
-import os
-import stat
-import sys
-
-sys.dont_write_bytecode = True
-
-try:
-    # This is commented out because there is a bug with the elasticsearch library that requires a patch (see initialize() method below)
-    #from elasticsearch import AsyncElasticsearch 
-    from elasticsearch.exceptions import NotFoundError
-    from elasticsearch.helpers import async_streaming_bulk
-except ImportError:
-    raise ImportError('Missing required \'elasticsearch\' library. (pip install elasticsearch)')
-
-# Setting up logging
-logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%m/%d %I:%M:%S')
-
-
-class ElasticIndexer:
-    def __init__(self, args: argparse.Namespace):
-        '''
-        Initialize the Elastic Search indexer.
-
-        :param args: Parsed arguments from argparse
-        '''
-
-        self.chunk_max  = args.chunk_max * 1024 * 1024 # MB
-        self.chunk_size = args.chunk_size
-        self.es = None
-        self.es_index = args.index
-
-        self.es_config = {
-            'hosts': [f'{args.host}:{args.port}'],
-            'verify_certs': args.self_signed,
-            'ssl_show_warn': args.self_signed,
-            'request_timeout': args.timeout,
-            'max_retries': args.retries,
-            'retry_on_timeout': True,
-            'sniff_on_start': True, # Is this problematic? 
-            'sniff_on_node_failure': True,
-            'min_delay_between_sniffing': 60 # Add config option for this?
-        }
-
-        if args.api_key:
-            self.es_config['api_key'] = (args.api_key, '') # Verify this is correct
-        else:
-            self.es_config['basic_auth'] = (args.user, args.password)
-            
-        
-    async def initialize(self):
-        '''Initialize the Elasticsearch client.'''
-
-        # Patching the Elasticsearch client to fix a bug with sniffing (https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960)
-        import sniff_patch
-        self.es = sniff_patch.init_elasticsearch(**self.es_config)
-
-        # Remove the above and uncomment the below if the bug is fixed in the Elasticsearch client:
-        #self.es = AsyncElasticsearch(**es_config)
-
-
-    async def create_index(self, map_body: dict, pipeline: str = '', replicas: int = 1, shards: int = 1):
-        '''
-        Create the Elasticsearch index with the defined mapping.
-        
-        :param map_body: Mapping for the index
-        :param pipeline: Name of the ingest pipeline to use for the index
-        :param replicas: Number of replicas for the index
-        :param shards: Number of shards for the index
-        '''
-
-        if await self.es.indices.exists(index=self.es_index):
-            logging.info(f'Index \'{self.es_index}\' already exists.')
-            return
-
-        mapping = map_body
-
-        mapping['settings'] = {
-            'number_of_shards': shards,
-            'number_of_replicas': replicas
-        }
-
-        if pipeline:
-            try:
-                await self.es.ingest.get_pipeline(id=pipeline)
-                logging.info(f'Using ingest pipeline \'{pipeline}\' for index \'{self.es_index}\'')
-                mapping['settings']['index.default_pipeline'] = pipeline
-            except NotFoundError:
-                raise ValueError(f'Ingest pipeline \'{pipeline}\' does not exist.')
-
-        response = await self.es.indices.create(index=self.es_index, body=mapping)
-
-        if response.get('acknowledged') and response.get('shards_acknowledged'):
-            logging.info(f'Index \'{self.es_index}\' successfully created.')
-        else:
-            raise Exception(f'Failed to create index. ({response})')
-
-
-    async def get_cluster_health(self) -> dict:
-        '''Get the health of the Elasticsearch cluster.'''
-
-        return await self.es.cluster.health()
-    
-
-    async def get_cluster_size(self) -> int:
-        '''Get the number of nodes in the Elasticsearch cluster.'''
-
-        cluster_stats = await self.es.cluster.stats()
-        number_of_nodes = cluster_stats['nodes']['count']['total']
-
-        return number_of_nodes
-
-
-    async def process_data(self, file_path: str, data_generator: callable):
-        '''
-        Index records in chunks to Elasticsearch.
-
-        :param file_path: Path to the file
-        :param index_name: Name of the index
-        :param data_generator: Generator for the records to index
-        '''
-
-        count = 0
-        total = 0
-        
-        async for ok, result in async_streaming_bulk(self.es, actions=data_generator(file_path), chunk_size=self.chunk_size, max_chunk_bytes=self.chunk_max):
-            action, result = result.popitem()
-
-            if not ok:
-                logging.error(f'Failed to index document ({result["_id"]}) to {self.es_index} from {file_path} ({result})')
-                input('Press Enter to continue...') # Debugging (will possibly remove this since we have retries enabled)
-                continue
-
-            count += 1
-            total += 1
-
-            if count == self.chunk_size:
-                logging.info(f'Successfully indexed {self.chunk_size:,} ({total:,} processed) records to {self.es_index} from {file_path}')
-                count = 0
-
-        logging.info(f'Finished indexing {total:,} records to {self.es_index} from {file_path}')
-
-
-async def main():
-    '''Main function when running this script directly.'''
-
-    parser = argparse.ArgumentParser(description='Index data into Elasticsearch.')
-    
-    # General arguments
-    parser.add_argument('input_path', help='Path to the input file or directory') # Required
-    parser.add_argument('--watch', action='store_true', help='Create or watch a FIFO for real-time indexing')
-    
-    # Elasticsearch arguments
-    parser.add_argument('--host', default='http://localhost/', help='Elasticsearch host')
-    parser.add_argument('--port', type=int, default=9200, help='Elasticsearch port')
-    parser.add_argument('--user', default='elastic', help='Elasticsearch username')
-    parser.add_argument('--password', default=os.getenv('ES_PASSWORD'), help='Elasticsearch password (if not provided, check environment variable ES_PASSWORD)')
-    parser.add_argument('--api-key', default=os.getenv('ES_APIKEY'), help='Elasticsearch API Key for authentication (if not provided, check environment variable ES_APIKEY)')
-    parser.add_argument('--self-signed', action='store_false', help='Elasticsearch is using self-signed certificates')
-
-    # Elasticsearch indexing arguments
-    parser.add_argument('--index', help='Elasticsearch index name')
-    parser.add_argument('--pipeline', help='Use an ingest pipeline for the index')
-    parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index')
-    parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index')
-    
-    # Performance arguments
-    parser.add_argument('--chunk-size', type=int, default=50000, help='Number of records to index in a chunk')
-    parser.add_argument('--chunk-max', type=int, default=100, help='Maximum size of a chunk in MB')
-    parser.add_argument('--retries', type=int, default=100, help='Number of times to retry indexing a chunk before failing')
-    parser.add_argument('--timeout', type=int, default=60, help='Number of seconds to wait before retrying a chunk')
-
-    # Ingestion arguments
-    parser.add_argument('--cert', action='store_true', help='Index Certstream records')
-    parser.add_argument('--httpx', action='store_true', help='Index Httpx records')
-    parser.add_argument('--masscan', action='store_true', help='Index Masscan records')
-    parser.add_argument('--massdns', action='store_true', help='Index Massdns records')
-    parser.add_argument('--zone', action='store_true', help='Index Zone records')
-
-    args = parser.parse_args()
-
-    if args.watch:
-        if not os.path.exists(args.input_path):
-            os.mkfifo(args.input_path)
-        elif os.path.exists(args.input_path) and stat.S_ISFIFO(os.stat(args.input_path).st_mode):
-            raise ValueError(f'Path {args.input_path} is not a FIFO')
-    elif not os.path.isdir(args.input_path) and not os.path.isfile(args.input_path):
-        raise FileNotFoundError(f'Input path {args.input_path} does not exist or is not a file or directory')
-
-    edx = ElasticIndexer(args)
-    await edx.initialize() # Initialize the Elasticsearch client asynchronously
-
-    if args.cert:
-        from ingestors import ingest_certs   as ingestor
-    if args.httpx:
-        from ingestors import ingest_httpx   as ingestor
-    elif args.masscan:
-        from ingestors import ingest_masscan as ingestor
-    elif args.massdns:
-        from ingestors import ingest_massdns as ingestor
-    elif args.zone:
-        from ingestors import ingest_zone    as ingestor
-    
-    health = await edx.get_cluster_health()
-    print(health)
-
-    await asyncio.sleep(5) # Delay to allow time for sniffing to complete
-    
-    nodes = await edx.get_cluster_size()
-    logging.info(f'Connected to {nodes:,} Elasticsearch node(s)')
-
-    if not edx.es_index:
-        edx.es_index = ingestor.default_index
-
-    map_body = ingestor.construct_map()
-    await edx.create_index(map_body, args.pipeline, args.replicas, args.shards)
-    
-    if os.path.isfile(args.input_path):
-        logging.info(f'Processing file: {args.input_path}')
-        await edx.process_data(args.input_path, ingestor.process_data)
-
-    elif stat.S_ISFIFO(os.stat(args.input_path).st_mode):
-        logging.info(f'Watching FIFO: {args.input_path}')
-        await edx.process_data(args.input_path, ingestor.process_data)
-
-    elif os.path.isdir(args.input_path):
-        count = 1
-        total = len(os.listdir(args.input_path))
-        logging.info(f'Processing {total:,} files in directory: {args.input_path}')
-        for file in sorted(os.listdir(args.input_path)):
-            file_path = os.path.join(args.input_path, file)
-            if os.path.isfile(file_path):
-                logging.info(f'[{count:,}/{total:,}] Processing file: {file_path}')
-                await edx.process_data(file_path, ingestor.process_data)
-                count += 1
-            else:
-                logging.warning(f'[{count:,}/{total:,}] Skipping non-file: {file_path}')
-
-
-
-if __name__ == '__main__':
-    asyncio.run(main())
-\ No newline at end of file
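
The refactor above replaces the threaded helpers.parallel_bulk flow with elasticsearch.helpers.async_streaming_bulk driven by async generators. A minimal, self-contained sketch of that pattern follows; the host, index name, and sample documents are placeholders rather than values from this repository:

    import asyncio

    from elasticsearch import AsyncElasticsearch
    from elasticsearch.helpers import async_streaming_bulk

    async def sample_actions():
        '''Yield documents pre-wrapped with their target index, the way the ingestors do.'''
        for i in range(3):
            yield {'_index': 'example-logs', '_source': {'ip': f'192.0.2.{i}', 'port': 22}}

    async def main():
        es = AsyncElasticsearch(hosts=['http://localhost:9200'])  # placeholder host
        async for ok, result in async_streaming_bulk(es, actions=sample_actions(), chunk_size=500):
            if not ok:
                print(f'Failed action: {result}')
        await es.close()

    if __name__ == '__main__':
        asyncio.run(main())

Unlike parallel_bulk, async_streaming_bulk yields one (ok, result) pair per action, which is why process_data() above counts documents individually instead of batching them itself.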
diff --git a/async_dev/ingestors/ingest_httpx.py b/async_dev/ingestors/ingest_httpx.py
@@ -1,103 +0,0 @@
-#!/usr/bin/env python
-# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
-# ingest_httpx.py
-
-import json
-
-try:
-    import aiofiles
-except ImportError:
-    raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
-
-default_index = 'httpx-logs'
-
-def construct_map() -> dict:
-    '''Construct the Elasticsearch index mapping for HTTPX records.'''
-
-    keyword_mapping = { 'type': 'text',  'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
-
-    mapping = {
-        'mappings': {
-            'properties': {
-                'change': 'me'
-            }
-        }
-    }
-
-    return mapping
-
-
-async def process_data(file_path: str):
-    '''
-    Read and process HTTPX records from the log file.
-
-    :param file_path: Path to the HTTPX log file
-    '''
-
-    async with aiofiles.open(file_path, mode='r') as input_file:
-        async for line in input_file:
-            line = line.strip()
-
-            if not line:
-                continue
-
-            record = json.loads(line)
-
-            record['seen'] = record.pop('timestamp').split('.')[0] + 'Z' # Hacky solution to maintain ISO 8601 format without milliseconds or offsets
-            record['domain'] = record.pop('input')
-
-            del record['failed'], record['knowledgebase'], record['time']
-
-            yield {'_index': default_index, '_source': record}
-
-    return None # EOF
-
-
-  
-'''
-Example record:
-{
-    "timestamp":"2024-01-14T13:08:15.117348474-05:00", # Rename to seen and remove milliseconds and offset
-    "hash": { # Do we need all of these ?
-        "body_md5":"4ae9394eb98233b482508cbda3b33a66",
-        "body_mmh3":"-4111954",
-        "body_sha256":"89e06e8374353469c65adb227b158b265641b424fba7ddb2c67eef0c4c1280d3",
-        "body_simhash":"9814303593401624250",
-        "header_md5":"980366deb2b2fb5df2ad861fc63e79ce",
-        "header_mmh3":"-813072798",
-        "header_sha256":"39aea75ad548e38b635421861641ad1919ed3b103b17a33c41e7ad46516f736d",
-        "header_simhash":"10962523587435277678"
-    },
-    "port":"443",
-    "url":"https://supernets.org", # Remove this and only use the input field as "domain" maybe
-    "input":"supernets.org", # rename to domain
-    "title":"SuperNETs",
-    "scheme":"https",
-    "webserver":"nginx",
-    "body_preview":"SUPERNETS Home About Contact Donate Docs Network IRC Git Invidious Jitsi LibreX Mastodon Matrix Sup",
-    "content_type":"text/html",
-    "method":"GET", # Do we need this ?
-    "host":"51.89.151.158",
-    "path":"/",
-    "favicon":"-674048714",
-    "favicon_path":"/i/favicon.png",
-    "time":"592.907689ms", # Do we need this ?
-    "a":[
-        "6.150.220.23"
-    ],
-    "tech":[
-        "Bootstrap:4.0.0",
-        "HSTS",
-        "Nginx"
-    ],
-    "words":436, # Do we need this ?
-    "lines":79, # Do we need this ?
-    "status_code":200, 
-    "content_length":4597,
-    "failed":false, # Do we need this ?
-    "knowledgebase":{ # Do we need this ?
-        "PageType":"nonerror",
-        "pHash":0
-    }
-}
-'''
-\ No newline at end of file
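
The HTTPX ingestor normalizes the nanosecond-precision, offset-bearing timestamp by splitting on the first '.' and appending 'Z'. A small sketch of that transformation, using the timestamp from the example record above; note it discards the UTC offset rather than converting it:

    timestamp = '2024-01-14T13:08:15.117348474-05:00'  # from the example record above

    # Keep everything before the first '.' and tag it as Zulu, matching the ingestor's 'seen' field
    seen = timestamp.split('.')[0] + 'Z'
    print(seen)  # -> 2024-01-14T13:08:15Z (the -05:00 offset is dropped, not converted to UTC)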
diff --git a/async_dev/ingestors/ingest_masscan.py b/async_dev/ingestors/ingest_masscan.py
@@ -1,148 +0,0 @@
-#!/usr/bin/env python
-# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
-# ingest_masscan.py
-
-'''
-apt-get install iptables masscan libpcap-dev screen
-setcap 'CAP_NET_RAW+eip CAP_NET_ADMIN+eip' /bin/masscan
-/sbin/iptables -A INPUT -p tcp --dport 61010 -j DROP
-printf "0.0.0.0/8\n10.0.0.0/8\n100.64.0.0/10\n127.0.0.0/8\n169.254.0.0/16\n172.16.0.0/12\n192.0.0.0/24\n192.0.2.0/24\n192.31.196.0/24\n192.52.193.0/24\n192.88.99.0/24\n192.168.0.0/16\n192.175.48.0/24\n198.18.0.0/15\n198.51.100.0/24\n203.0.113.0/24\n224.0.0.0/3\n255.255.255.255/32"  > exclude.conf
-screen -S scan
-masscan 0.0.0.0/0 -p21,22,23 --banners --http-user-agent "USER_AGENT" --source-port 61010 --open-only --rate 30000 --excludefile exclude.conf -oJ output.json
-masscan 0.0.0.0/0 -p21,22,23 --banners --http-user-agent "USER_AGENT" --source-port 61000-65503 --open-only --rate 30000 --excludefile exclude.conf -oJ output_new.json --shard $i/$TOTAL
-
-Note: The above iptables rule is not persistent and will be removed on reboot.
-'''
-
-import json
-import logging
-import re
-import time
-
-try:
-    import aiofiles
-except ImportError:
-    raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
-
-default_index = 'masscan-logs'
-
-def construct_map() -> dict:
-    '''Construct the Elasticsearch index mapping for Masscan records.'''
-
-    keyword_mapping = { 'type': 'text',  'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
-
-    mapping = {
-        'mappings': {
-            'properties': {
-                'ip':      { 'type': 'ip' },
-                'port':    { 'type': 'integer' },
-                'proto':   { 'type': 'keyword' },
-                'service': { 'type': 'keyword' },
-                'banner':  keyword_mapping,
-                'ref_id':  { 'type': 'keyword' },
-                'seen':    { 'type': 'date' }
-                #'geoip':   {
-                #    'properties': {
-                #        'city_name':        keyword_mapping,
-                #        'continent_name':   keyword_mapping,
-                #        'country_iso_code': keyword_mapping,
-                #        'country_name':     keyword_mapping,
-                #        'location':         { 'type': 'geo_point' },
-                #        'region_iso_code':  keyword_mapping,
-                #        'region_name':      keyword_mapping,
-                #    }
-                #}
-            }
-        }
-    }
-
-    return mapping
-
-
-async def process_data(file_path: str):
-    '''
-    Read and process Masscan records from the log file.
-
-    :param file_path: Path to the Masscan log file
-    '''
-
-    async with aiofiles.open(file_path, mode='r') as input_file:
-        async for line in input_file:
-            line = line.strip()
-
-            if not line or not line.startswith('{'):
-                continue
-
-            if line.endswith(','):
-                line = line[:-1]
-
-            try:
-                record = json.loads(line)
-            except json.decoder.JSONDecodeError:
-                # In rare cases, the JSON record may be incomplete or malformed:
-                #   {   "ip": "51.161.12.223",   "timestamp": "1707628302", "ports": [ {"port": 22, "proto": "tcp", "service": {"name": "ssh", "banner":
-                #   {   "ip": "83.66.211.246",   "timestamp": "1706557002"
-                logging.error(f'Failed to parse JSON record! ({line})')
-                input('Press Enter to continue...') # Pause for review & debugging (Will remove pausing in production, still investigating the cause of this issue.)
-                continue
-
-            if len(record['ports']) > 1:
-                logging.warning(f'Multiple ports found for record! ({record})')
-                input('Press Enter to continue...') # Pause for review (Will remove pausing in production, still investigating whether more than one port ever appears in a record.)
-
-            for port_info in record['ports']:
-                struct = {
-                    'ip'    : record['ip'],
-                    'port'  : port_info['port'],
-                    'proto' : port_info['proto'],
-                    'seen'  : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(int(record['timestamp']))),
-                }
-
-                if 'service' in port_info:
-                    if 'name' in port_info['service']:
-                        if (service_name := port_info['service']['name']) not in ('unknown',''):
-                            struct['service'] = service_name
-
-                    if 'banner' in port_info['service']:
-                        banner = ' '.join(port_info['service']['banner'].split()) # Remove extra whitespace
-                        if banner:
-                            match = re.search(r'\(Ref\.Id: (.*?)\)', banner)
-                            if match:
-                                struct['ref_id'] = match.group(1)
-                            else:
-                                struct['banner'] = banner
-
-                yield {'_index': default_index, '_source': struct}
- 
-    return None # EOF
-
-
-
-'''
-Example record:
-{
-    "ip": "43.134.51.142",
-    "timestamp": "1705255468", # Convert to ZULU BABY
-    "ports": [ # We will create a record for each port opened
-        {
-            "port": 22,
-            "proto": "tcp",
-            "service": { # This field is optional
-                "name": "ssh",
-                "banner": "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4"
-            }
-        }
-    ]
-}
-
-Will be indexed as:
-{
-    "ip": "43.134.51.142",
-    "port": 22,
-    "proto": "tcp",
-    "service": "ssh",
-    "banner": "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4",
-    "seen": "2021-10-08T02:04:28Z",
-    "ref_id": "?sKfOvsC4M4a2W8PaC4zF?" # TCP RST Payload, Might be useful..
-}
-'''
-\ No newline at end of file
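
To illustrate how one Masscan line becomes one document per open port, here is a standalone sketch of the same flattening, epoch-to-Zulu conversion, and Ref.Id extraction; the input line and the Ref.Id value are made-up samples:

    import json
    import re
    import time

    line = '{"ip": "43.134.51.142", "timestamp": "1705255468", "ports": [{"port": 22, "proto": "tcp", "service": {"name": "ssh", "banner": "SSH-2.0-OpenSSH_8.9p1 (Ref.Id: example123)"}}]}'
    record = json.loads(line)

    for port_info in record['ports']:
        struct = {
            'ip'    : record['ip'],
            'port'  : port_info['port'],
            'proto' : port_info['proto'],
            # Epoch seconds from Masscan -> ISO 8601 Zulu timestamp for the 'seen' field
            'seen'  : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(int(record['timestamp']))),
        }
        banner = ' '.join(port_info.get('service', {}).get('banner', '').split())
        if (match := re.search(r'\(Ref\.Id: (.*?)\)', banner)):
            struct['ref_id'] = match.group(1)  # TCP RST payload marker instead of a real banner
        elif banner:
            struct['banner'] = banner
        print(struct)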
diff --git a/async_dev/ingestors/ingest_massdns.py b/async_dev/ingestors/ingest_massdns.py
@@ -1,92 +0,0 @@
-#!/usr/bin/env python
-# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
-# ingest_massdns.py
-
-import time
-
-try:
-    import aiofiles
-except ImportError:
-    raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
-
-default_index = 'ptr-records'
-
-def construct_map() -> dict:
-    '''Construct the Elasticsearch index mapping for MassDNS records'''
-
-    keyword_mapping = { 'type': 'text',  'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
-
-    mapping = {
-    'mappings': {
-            'properties': {
-                'ip':     { 'type': 'ip' },
-                'name':   { 'type': 'keyword' },
-                'record': keyword_mapping,
-                'seen':   { 'type': 'date' }
-            }
-        }
-    }
-
-    return mapping
-
-
-async def process_data(file_path: str):
-    '''
-    Read and process Massdns records from the log file.
-
-    :param file_path: Path to the Massdns log file
-    '''
-
-    async with aiofiles.open(file_path, mode='r') as input_file:
-        async for line in input_file:
-            line = line.strip()
-
-            if not line:
-                continue
-
-            parts = line.split()
-
-            if len(parts) < 3:
-                raise ValueError(f'Invalid PTR record: {line}')
-            
-            name, record_type, data = parts[0].rstrip('.'), parts[1], ' '.join(parts[2:]).rstrip('.')
-
-            if record_type != 'PTR':
-                continue
-
-                #if record_type == 'CNAME':
-                #    if data.endswith('.in-addr.arpa'):
-                #        continue
-
-            # Let's not index the PTR record if it's the same as the in-addr.arpa domain
-            if data == name:
-                continue
-                    
-            ip = '.'.join(name.replace('.in-addr.arpa', '').split('.')[::-1])
-            
-            struct = {
-                'ip': ip,
-                'record': data,
-                'seen': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
-            }
-
-            yield {'_index': default_index, '_source': struct}
-    
-    return None # EOF
-
-
-'''
-Example PTR record:
-0.6.229.47.in-addr.arpa. PTR 047-229-006-000.res.spectrum.com.
-0.6.228.75.in-addr.arpa. PTR 0.sub-75-228-6.myvzw.com.
-0.6.207.73.in-addr.arpa. PTR c-73-207-6-0.hsd1.ga.comcast.net.
-0.6.212.173.in-addr.arpa. PTR 173-212-6-0.cpe.surry.net.
-0.6.201.133.in-addr.arpa. PTR flh2-133-201-6-0.tky.mesh.ad.jp.
-
-Will be indexed as:
-{
-    "ip": "47.229.6.0",
-    "record": "047-229-006-000.res.spectrum.com.",
-    "seen": "2021-06-30T18:31:00Z"
-}
-'''
-\ No newline at end of file
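
The PTR ingestor recovers the IP address by stripping the .in-addr.arpa suffix and reversing the remaining octets. A short sketch using the first example record above:

    name = '0.6.229.47.in-addr.arpa'              # PTR owner name, trailing dot already stripped
    data = '047-229-006-000.res.spectrum.com'     # PTR target from the same example

    # Drop the .in-addr.arpa suffix and reverse the octet order to get the forward IP
    ip = '.'.join(name.replace('.in-addr.arpa', '').split('.')[::-1])
    print({'ip': ip, 'record': data})             # -> {'ip': '47.229.6.0', 'record': '047-229-006-000.res.spectrum.com'}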
diff --git a/async_dev/ingestors/ingest_zone.py b/async_dev/ingestors/ingest_zone.py
@@ -1,137 +0,0 @@
-#!/usr/bin/env python
-# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
-# ingest_zone.py
-
-import time
-
-try:
-    import aiofiles
-except ImportError:
-    raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
-
-default_index = 'dns-zones'
-record_types = ('a','aaaa','caa','cdnskey','cds','cname','dnskey','ds','mx','naptr','ns','nsec','nsec3','nsec3param','ptr','rrsig','rp','sshfp','soa','srv','txt','type65534')
-
-def construct_map() -> dict:
-    '''Construct the Elasticsearch index mapping for zone file records.'''
-
-    keyword_mapping = { 'type': 'text',  'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
-
-    mapping = {
-        'mappings': {
-            'properties': {
-                'domain':  keyword_mapping,
-                'records': { 'properties': {} },
-                'seen':    {'type': 'date'}
-            }
-        }
-    }
-
-    # Add record types to mapping dynamically to not clutter the code
-    for item in record_types:
-        if item in ('a','aaaa'):
-            mapping['mappings']['properties']['records']['properties'][item] = {
-                'properties': {
-                    'data': { 'type': 'ip' },
-                    'ttl':  { 'type': 'integer' }
-                }
-            }
-        else:
-            mapping['mappings']['properties']['records']['properties'][item] = {
-                'properties': {
-                'data': keyword_mapping,
-                'ttl':  { 'type': 'integer' }
-                }
-            }
-
-    return mapping
-
-
-async def process_data(file_path: str):
-    '''
-    Read and process zone file records.
-
-    :param file_path: Path to the zone file
-    '''
-
-    domain_records = {}
-    last_domain = None
-
-    async with aiofiles.open(file_path, mode='r') as input_file:
-        async for line in input_file:
-            line = line.strip()
-
-            if not line or line.startswith(';'):
-                continue
-
-            parts = line.split()
-
-            if len(parts) < 5:
-                raise ValueError(f'Invalid line: {line}')
-
-            domain, ttl, record_class, record_type, data = parts[0].rstrip('.').lower(), parts[1], parts[2].lower(), parts[3].lower(), ' '.join(parts[4:])
-
-            if not ttl.isdigit():
-                raise ValueError(f'Invalid TTL: {ttl} with line: {line}')
-            
-            ttl = int(ttl)
-
-            if record_class != 'in':
-                raise ValueError(f'Unsupported record class: {record_class} with line: {line}') # Anomaly (Doubtful any CHAOS/HESIOD records will be found)
-
-            # We do not want to collide with our current mapping (Again, this is an anomaly)
-            if record_type not in record_types:
-                raise ValueError(f'Unsupported record type: {record_type} with line: {line}')
-
-            # Little tidying up for specific record types
-            if record_type == 'nsec':
-                data = ' '.join([data.split()[0].rstrip('.'), *data.split()[1:]])
-            elif record_type == 'soa':
-                    data = ' '.join([part.rstrip('.') if '.' in part else part for part in data.split()])
-            elif data.endswith('.'):
-                data = data.rstrip('.')
-
-            if domain != last_domain:
-                if last_domain:
-                    struct = {'domain': last_domain, 'records': domain_records[last_domain], 'seen': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())}
-                    
-                    del domain_records[last_domain]
-
-                    yield {'_index': default_index, '_source': struct}
-
-                last_domain = domain
-
-                domain_records[domain] = {}
-
-            if record_type not in domain_records[domain]:
-                domain_records[domain][record_type] = []
-
-            domain_records[domain][record_type].append({'ttl': ttl, 'data': data})
-
-    return None # EOF
-
-
-
-'''
-Example record:
-0so9l9nrl425q3tf7dkv1nmv2r3is6vm.vegas. 3600    in  nsec3   1 1 100 332539EE7F95C32A 10MHUKG4FHIAVEFDOTF6NKU5KFCB2J3A NS DS RRSIG
-0so9l9nrl425q3tf7dkv1nmv2r3is6vm.vegas. 3600    in  rrsig   NSEC3 8 2 3600 20240122151947 20240101141947 4125 vegas. hzIvQrZIxBSwRWyiHkb5M2W0R3ikNehv884nilkvTt9DaJSDzDUrCtqwQb3jh6+BesByBqfMQK+L2n9c//ZSmD5/iPqxmTPCuYIB9uBV2qSNSNXxCY7uUt5w7hKUS68SLwOSjaQ8GRME9WQJhY6gck0f8TT24enjXXRnQC8QitY=
-1-800-flowers.vegas.    3600    in  ns  dns1.cscdns.net.
-1-800-flowers.vegas.    3600    in  ns  dns2.cscdns.net.
-100.vegas.  3600    in  ns  ns51.domaincontrol.com.
-100.vegas.  3600    in  ns  ns52.domaincontrol.com.
-1001.vegas. 3600    in  ns  ns11.waterrockdigital.com.
-1001.vegas. 3600    in  ns  ns12.waterrockdigital.com.
-
-Will be indexed as:
-{
-    "domain": "1001.vegas",
-    "records": {
-        "ns": [
-            {"ttl": 3600, "data": "ns11.waterrockdigital.com"},
-            {"ttl": 3600, "data": "ns12.waterrockdigital.com"}
-        ]
-    },
-    "seen": "2021-09-01T00:00:00Z" # Zulu time added upon indexing
-}
-'''
-\ No newline at end of file
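
The zone ingestor batches consecutive lines for the same domain and emits one document per domain when the owner name changes. A simplified synchronous sketch of that grouping, fed with already-parsed tuples (the helper name and sample rows are illustrative only):

    import time

    # Stand-in for the parsed zone lines: (domain, record_type, ttl, data), in zone-file order
    parsed = [
        ('1-800-flowers.vegas', 'ns', 3600, 'dns1.cscdns.net'),
        ('1-800-flowers.vegas', 'ns', 3600, 'dns2.cscdns.net'),
        ('1001.vegas',          'ns', 3600, 'ns11.waterrockdigital.com'),
        ('1001.vegas',          'ns', 3600, 'ns12.waterrockdigital.com'),
    ]

    def group_by_domain(rows):
        '''Yield one document per domain once all of its consecutive records have been read.'''
        last_domain, records = None, {}
        for domain, rtype, ttl, data in rows:
            if domain != last_domain:
                if last_domain:
                    yield {'domain': last_domain, 'records': records,
                           'seen': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())}
                last_domain, records = domain, {}
            records.setdefault(rtype, []).append({'ttl': ttl, 'data': data})
        if last_domain:  # flush the final domain; note process_data() above only yields on a domain change
            yield {'domain': last_domain, 'records': records,
                   'seen': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())}

    for doc in group_by_domain(parsed):
        print(doc)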
diff --git a/async_dev/sniff_patch.py b/async_dev/sniff_patch.py
@@ -1,96 +0,0 @@
-#!/usr/bin/env python
-# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
-# sniff_patch.py [asynchronous development]
-
-# Note:
-#   This is a patch for the elasticsearch 8.x client to fix the sniff_* options.
-#   This patch is only needed if you use the sniff_* options and only works with basic auth.
-#   Call init_elasticsearch() with normal Elasticsearch params.
-#
-# Source:
-#   - https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960
-
-import base64
-
-import elasticsearch._async.client as async_client
-from elasticsearch.exceptions import SerializationError, ConnectionError
-
-
-async def init_elasticsearch_async(*args, **kwargs):
-    '''
-    Initialize the Async Elasticsearch client with the sniff patch.
-    
-    :param args: Async Elasticsearch positional arguments.
-    :param kwargs: Async Elasticsearch keyword arguments.
-    '''
-    async_client.default_sniff_callback = _override_async_sniff_callback(kwargs['basic_auth'])
-
-    return async_client.AsyncElasticsearch(*args, **kwargs)
-
-
-def _override_async_sniff_callback(basic_auth):
-    '''
-    Taken from https://github.com/elastic/elasticsearch-py/blob/8.8/elasticsearch/_sync/client/_base.py#L166
-    Completely unmodified except for adding the auth header to the elastic request.
-    Allows us to continue using the sniff_* options while this is broken in the library.
-
-    TODO: Remove this when this issue is patched:
-        - https://github.com/elastic/elasticsearch-py/issues/2005
-    '''
-    auth_str = base64.b64encode(':'.join(basic_auth).encode()).decode()
-    sniffed_node_callback = async_client._base._default_sniffed_node_callback
-
-    async def modified_async_sniff_callback(transport, sniff_options):
-        for _ in transport.node_pool.all():
-            try:
-                meta, node_infos = await transport.perform_request(
-                    'GET',
-                    '/_nodes/_all/http',
-                    headers={
-                        'accept': 'application/vnd.elasticsearch+json; compatible-with=8',
-                        'authorization': f'Basic {auth_str}'  # This auth header is missing in 8.x releases of the client, and causes 401s
-                    },
-                    request_timeout=(
-                        sniff_options.sniff_timeout
-                        if not sniff_options.is_initial_sniff
-                        else None
-                    ),
-                )
-            except (SerializationError, ConnectionError):
-                continue
-
-            if not 200 <= meta.status <= 299:
-                continue
-
-            node_configs = []
-            for node_info in node_infos.get('nodes', {}).values():
-                address = node_info.get('http', {}).get('publish_address')
-                if not address or ':' not in address:
-                    continue
-
-                if '/' in address:
-                    # Support 7.x host/ip:port behavior where http.publish_host has been set.
-                    fqdn, ipaddress = address.split('/', 1)
-                    host = fqdn
-                    _, port_str = ipaddress.rsplit(':', 1)
-                    port = int(port_str)
-                else:
-                    host, port_str = address.rsplit(':', 1)
-                    port = int(port_str)
-
-                assert sniffed_node_callback is not None
-                sniffed_node = await sniffed_node_callback(
-                    node_info, meta.node.replace(host=host, port=port)
-                )
-                if sniffed_node is None:
-                    continue
-
-                # Use the node which was able to make the request as a base.
-                node_configs.append(sniffed_node)
-
-            if node_configs:
-                return node_configs
-
-        return []
-
-    return modified_async_sniff_callback
-\ No newline at end of file
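
For reference, a minimal sketch of initializing the patched async client directly; the host and credentials are placeholders, and the patch only supports basic auth because it builds the Authorization header from that tuple. Note that init_elasticsearch_async() is declared async, so it has to be awaited:

    import asyncio

    import sniff_patch  # the module shown above

    async def main():
        es = await sniff_patch.init_elasticsearch_async(
            hosts=['https://localhost:9200'],     # placeholder host
            basic_auth=('elastic', 'changeme'),   # placeholder credentials
            sniff_on_start=True,
            sniff_on_node_failure=True,
        )
        print(await es.cluster.health())
        await es.close()

    asyncio.run(main())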
diff --git a/eris.py b/eris.py
@@ -1,18 +1,21 @@
 #!/usr/bin/env python
 # Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
+# eris.py
 
+import asyncio
 import argparse
 import logging
 import os
 import stat
-import time
 import sys
 
 sys.dont_write_bytecode = True
 
 try:
-    from elasticsearch import Elasticsearch, helpers
+    # This is commented out because there is a bug with the elasticsearch library that requires a patch (see initialize() method below)
+    #from elasticsearch import AsyncElasticsearch 
     from elasticsearch.exceptions import NotFoundError
+    from elasticsearch.helpers import async_streaming_bulk
 except ImportError:
     raise ImportError('Missing required \'elasticsearch\' library. (pip install elasticsearch)')
 
@@ -28,48 +31,51 @@ class ElasticIndexer:
         :param args: Parsed arguments from argparse
         '''
 
-        self.chunk_max = args.chunk_max * 1024 * 1024 # MB
+        self.chunk_max  = args.chunk_max * 1024 * 1024 # MB
         self.chunk_size = args.chunk_size
-        self.chunk_threads = args.chunk_threads
-        self.dry_run = args.dry_run
+        self.es = None
         self.es_index = args.index
 
-        if not args.dry_run:
-            es_config = {
-                'hosts': [f'{args.host}:{args.port}'],
-                'verify_certs': args.self_signed,
-                'ssl_show_warn': args.self_signed,
-                'request_timeout': args.timeout,
-                'max_retries': args.retries,
-                'retry_on_timeout': True,
-                'sniff_on_start': False,
-                'sniff_on_node_failure': True,
-                'min_delay_between_sniffing': 60 # Add config option for this?
-            }
-
-            if args.api_key:
-                es_config['headers'] = {'Authorization': f'ApiKey {args.api_key}'}
-            else:
-                es_config['basic_auth'] = (args.user, args.password)
-                
-            # Patching the Elasticsearch client to fix a bug with sniffing (https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960)
-            import sniff_patch
-            self.es = sniff_patch.init_elasticsearch(**es_config)
+        self.es_config = {
+            'hosts': [f'{args.host}:{args.port}'],
+            'verify_certs': args.self_signed,
+            'ssl_show_warn': args.self_signed,
+            'request_timeout': args.timeout,
+            'max_retries': args.retries,
+            'retry_on_timeout': True,
+            'sniff_on_start': True, # Is this problematic? 
+            'sniff_on_node_failure': True,
+            'min_delay_between_sniffing': 60 # Add config option for this?
+        }
+
+        if args.api_key:
+            self.es_config['api_key'] = (args.api_key, '') # Verify this is correct
+        else:
+            self.es_config['basic_auth'] = (args.user, args.password)
+            
+        
+    async def initialize(self):
+        '''Initialize the Elasticsearch client.'''
 
-            # Remove the above and uncomment the below if the bug is fixed in the Elasticsearch client:
-            #self.es = Elasticsearch(**es_config)
+        # Patching the Elasticsearch client to fix a bug with sniffing (https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960)
+        import sniff_patch
+        self.es = sniff_patch.init_elasticsearch(**self.es_config)
 
+        # Remove the above and uncomment the below if the bug is fixed in the Elasticsearch client:
+        #self.es = AsyncElasticsearch(**es_config)
 
-    def create_index(self, map_body: dict, pipeline: str = '', replicas: int = 1, shards: int = 1, ):
+
+    async def create_index(self, map_body: dict, pipeline: str = '', replicas: int = 1, shards: int = 1):
         '''
         Create the Elasticsearch index with the defined mapping.
         
-        :param pipline: Name of the ingest pipeline to use for the index
+        :param map_body: Mapping for the index
+        :param pipeline: Name of the ingest pipeline to use for the index
         :param replicas: Number of replicas for the index
         :param shards: Number of shards for the index
         '''
 
-        if self.es.indices.exists(index=self.es_index):
+        if await self.es.indices.exists(index=self.es_index):
             logging.info(f'Index \'{self.es_index}\' already exists.')
             return
 
@@ -82,13 +88,13 @@ class ElasticIndexer:
 
         if pipeline:
             try:
-                self.es.ingest.get_pipeline(id=pipeline)
+                await self.es.ingest.get_pipeline(id=pipeline)
                 logging.info(f'Using ingest pipeline \'{pipeline}\' for index \'{self.es_index}\'')
                 mapping['settings']['index.default_pipeline'] = pipeline
             except NotFoundError:
                 raise ValueError(f'Ingest pipeline \'{pipeline}\' does not exist.')
 
-        response = self.es.indices.create(index=self.es_index, body=mapping)
+        response = await self.es.indices.create(index=self.es_index, body=mapping)
 
         if response.get('acknowledged') and response.get('shards_acknowledged'):
             logging.info(f'Index \'{self.es_index}\' successfully created.')
@@ -96,106 +102,62 @@ class ElasticIndexer:
             raise Exception(f'Failed to create index. ({response})')
 
 
-    def get_cluster_health(self) -> dict:
+    async def get_cluster_health(self) -> dict:
         '''Get the health of the Elasticsearch cluster.'''
 
-        return self.es.cluster.health()
+        return await self.es.cluster.health()
     
 
-    def get_cluster_size(self) -> int:
+    async def get_cluster_size(self) -> int:
         '''Get the number of nodes in the Elasticsearch cluster.'''
 
-        cluster_stats = self.es.cluster.stats()
+        cluster_stats = await self.es.cluster.stats()
         number_of_nodes = cluster_stats['nodes']['count']['total']
 
         return number_of_nodes
 
 
-    def bulk_index(self, documents: list, file_path: str, count: int):
+    async def process_data(self, file_path: str, data_generator: callable):
         '''
-        Index a batch of documents to Elasticsearch.
-        
-        :param documents: List of documents to index
-        :param file_path: Path to the file being indexed
-        :param count: Total number of records processed
-        '''
-
-        remaining_documents = documents
-
-        parallel_bulk_config = {
-            'client': self.es,
-            'chunk_size': self.chunk_size,
-            'max_chunk_bytes': self.chunk_max,
-            'thread_count': self.chunk_threads,
-            'queue_size': 2 # Add config option for this?
-        }
-
-        while remaining_documents:
-            failed_documents = []
-
-            try:
-                for success, response in helpers.parallel_bulk(actions=remaining_documents, **parallel_bulk_config):
-                    if not success:
-                        failed_documents.append(response)
-
-                if not failed_documents:
-                    ingested = parallel_bulk_config['chunk_size'] * parallel_bulk_config['thread_count']
-                    logging.info(f'Successfully indexed {ingested:,} ({count:,} processed) records to {self.es_index} from {file_path}')
-                    break
-
-                else:
-                    logging.warning(f'Failed to index {len(failed_documents):,} failed documents! Retrying...')
-                    remaining_documents = failed_documents
-            except Exception as e:
-                logging.error(f'Failed to index documents! ({e})')
-                time.sleep(30) # Should we add a config option for this?
-
-
-    def process_file(self, file_path: str, batch_size: int, ingest_function: callable):
-        '''
-        Read and index records in batches to Elasticsearch.
+        Index records in chunks to Elasticsearch.
 
         :param file_path: Path to the file
-        :param batch_size: Number of records to index per batch
-        :param ingest_function: Function to process the file
+        :param index_name: Name of the index
+        :param data_generator: Generator for the records to index
         '''
 
         count = 0
-        records = []
-
-        for processed in ingest_function(file_path):
-
-            if not processed:
-                break
+        total = 0
+        
+        async for ok, result in async_streaming_bulk(self.es, actions=data_generator(file_path), chunk_size=self.chunk_size, max_chunk_bytes=self.chunk_max):
+            action, result = result.popitem()
 
-            if self.dry_run:
-                print(processed)
+            if not ok:
+                logging.error(f'Failed to index document ({result["_id"]}) to {self.es_index} from {file_path} ({result})')
+                input('Press Enter to continue...') # Debugging (will possibly remove this since we have retries enabled)
                 continue
 
-            struct = {'_index': self.es_index, '_source': processed}
-            records.append(struct)
             count += 1
-            
-            if len(records) >= batch_size:
-                self.bulk_index(records, file_path, count)
-                records = []
+            total += 1
 
-        if records:
-            self.bulk_index(records, file_path, count)
+            if count == self.chunk_size:
+                logging.info(f'Successfully indexed {self.chunk_size:,} ({total:,} processed) records to {self.es_index} from {file_path}')
+                count = 0
 
+        logging.info(f'Finished indexing {total:,} records to {self.es_index} from {file_path}')
 
-def main():
+
+async def main():
     '''Main function when running this script directly.'''
 
     parser = argparse.ArgumentParser(description='Index data into Elasticsearch.')
     
     # General arguments
     parser.add_argument('input_path', help='Path to the input file or directory') # Required
-    parser.add_argument('--dry-run', action='store_true', help='Dry run (do not index records to Elasticsearch)')
     parser.add_argument('--watch', action='store_true', help='Create or watch a FIFO for real-time indexing')
     
     # Elasticsearch arguments
-    parser.add_argument('--host', default='localhost', help='Elasticsearch host')
+    parser.add_argument('--host', default='http://localhost/', help='Elasticsearch host')
     parser.add_argument('--port', type=int, default=9200, help='Elasticsearch port')
     parser.add_argument('--user', default='elastic', help='Elasticsearch username')
     parser.add_argument('--password', default=os.getenv('ES_PASSWORD'), help='Elasticsearch password (if not provided, check environment variable ES_PASSWORD)')
@@ -206,16 +168,16 @@ def main():
     parser.add_argument('--index', help='Elasticsearch index name')
     parser.add_argument('--pipeline', help='Use an ingest pipeline for the index')
     parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index')
-    parser.add_argument('--shards', type=int, default=3, help='Number of shards for the index')
+    parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index')
     
     # Performance arguments
-    parser.add_argument('--chunk-max', type=int, default=10, help='Maximum size in MB of a chunk')
     parser.add_argument('--chunk-size', type=int, default=50000, help='Number of records to index in a chunk')
-    parser.add_argument('--chunk-threads', type=int, default=3, help='Number of threads to use when indexing in chunks')
-    parser.add_argument('--retries', type=int, default=60, help='Number of times to retry indexing a chunk before failing')
-    parser.add_argument('--timeout', type=int, default=30, help='Number of seconds to wait before retrying a chunk')
+    parser.add_argument('--chunk-max', type=int, default=100, help='Maximum size of a chunk in MB')
+    parser.add_argument('--retries', type=int, default=100, help='Number of times to retry indexing a chunk before failing')
+    parser.add_argument('--timeout', type=int, default=60, help='Number of seconds to wait before retrying a chunk')
 
     # Ingestion arguments
+    parser.add_argument('--cert', action='store_true', help='Index Certstream records')
     parser.add_argument('--httpx', action='store_true', help='Index Httpx records')
     parser.add_argument('--masscan', action='store_true', help='Index Masscan records')
     parser.add_argument('--massdns', action='store_true', help='Index Massdns records')
@@ -232,7 +194,10 @@ def main():
         raise FileNotFoundError(f'Input path {args.input_path} does not exist or is not a file or directory')
 
     edx = ElasticIndexer(args)
+    await edx.initialize() # Initialize the Elasticsearch client asynchronously
 
+    if args.cert:
+        from ingestors import ingest_certs   as ingestor
     if args.httpx:
         from ingestors import ingest_httpx   as ingestor
     elif args.masscan:
@@ -241,32 +206,28 @@ def main():
         from ingestors import ingest_massdns as ingestor
     elif args.zone:
         from ingestors import ingest_zone    as ingestor
-
-    batch_size = 0
     
-    if not args.dry_run:
-        print(edx.get_cluster_health())
+    health = await edx.get_cluster_health()
+    print(health)
 
-        time.sleep(3) # Delay to allow time for sniffing to complete
-        
-        nodes = edx.get_cluster_size()
-        logging.info(f'Connected to {nodes:,} Elasticsearch node(s)')
-
-        if not edx.es_index:
-            edx.es_index = ingestor.default_index
+    await asyncio.sleep(5) # Delay to allow time for sniffing to complete
+    
+    nodes = await edx.get_cluster_size()
+    logging.info(f'Connected to {nodes:,} Elasticsearch node(s)')
 
-        map_body = ingestor.construct_map()
-        edx.create_index(map_body, args.pipeline, args.replicas, args.shards)
+    if not edx.es_index:
+        edx.es_index = ingestor.default_index
 
-        batch_size = int(nodes * (args.chunk_size * args.chunk_threads))
+    map_body = ingestor.construct_map()
+    await edx.create_index(map_body, args.pipeline, args.replicas, args.shards)
     
     if os.path.isfile(args.input_path):
         logging.info(f'Processing file: {args.input_path}')
-        edx.process_file(args.input_path, batch_size, ingestor.process_file)
+        await edx.process_data(args.input_path, ingestor.process_data)
 
     elif stat.S_ISFIFO(os.stat(args.input_path).st_mode):
         logging.info(f'Watching FIFO: {args.input_path}')
-        edx.process_file(args.input_path, batch_size, ingestor.process_file)
+        await edx.process_data(args.input_path, ingestor.process_data)
 
     elif os.path.isdir(args.input_path):
         count = 1
@@ -276,7 +237,7 @@ def main():
             file_path = os.path.join(args.input_path, file)
             if os.path.isfile(file_path):
                 logging.info(f'[{count:,}/{total:,}] Processing file: {file_path}')
-                edx.process_file(file_path, batch_size, ingestor.process_file)
+                await edx.process_data(file_path, ingestor.process_data)
                 count += 1
             else:
                 logging.warning(f'[{count:,}/{total:,}] Skipping non-file: {file_path}')
@@ -284,4 +245,4 @@ def main():
 
 
 if __name__ == '__main__':
-    main()
-\ No newline at end of file
+    asyncio.run(main())
+\ No newline at end of file
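
The --watch flag turns input_path into a FIFO so records can be streamed into the indexer as they are produced. A minimal sketch of the intended pre-flight check, assuming the goal is to create the FIFO when the path is missing and to reject an existing path that is not a FIFO (the path below is hypothetical):

    import os
    import stat

    def prepare_fifo(input_path: str):
        '''Create the FIFO if it does not exist; refuse to reuse an existing non-FIFO path.'''
        if not os.path.exists(input_path):
            os.mkfifo(input_path)
        elif not stat.S_ISFIFO(os.stat(input_path).st_mode):
            raise ValueError(f'Path {input_path} exists but is not a FIFO')

    prepare_fifo('/tmp/eris.fifo')  # hypothetical path; a scanner can then write to it, e.g. masscan ... -oJ /tmp/eris.fifo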
diff --git a/async_dev/ingestors/ingest_certs.py b/ingestors/ingest_certs.py
diff --git a/ingestors/ingest_httpx.py b/ingestors/ingest_httpx.py
@@ -4,6 +4,11 @@
 
 import json
 
+try:
+    import aiofiles
+except ImportError:
+    raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
+
 default_index = 'httpx-logs'
 
 def construct_map() -> dict:
@@ -22,15 +27,15 @@ def construct_map() -> dict:
     return mapping
 
 
-def process_file(file_path: str):
+async def process_data(file_path: str):
     '''
     Read and process HTTPX records from the log file.
 
     :param file_path: Path to the HTTPX log file
     '''
 
-    with open(file_path, 'r') as file:
-        for line in file:
+    async with aiofiles.open(file_path, mode='r') as input_file:
+        async for line in input_file:
             line = line.strip()
 
             if not line:
@@ -43,7 +48,7 @@ def process_file(file_path: str):
 
             del record['failed'], record['knowledgebase'], record['time']
 
-            yield record
+            yield {'_index': default_index, '_source': record}
 
     return None # EOF
 
diff --git a/ingestors/ingest_masscan.py b/ingestors/ingest_masscan.py
@@ -19,6 +19,11 @@ import logging
 import re
 import time
 
+try:
+    import aiofiles
+except ImportError:
+    raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
+
 default_index = 'masscan-logs'
 
 def construct_map() -> dict:
@@ -54,15 +59,15 @@ def construct_map() -> dict:
     return mapping
 
 
-def process_file(file_path: str):
+async def process_data(file_path: str):
     '''
     Read and process Masscan records from the log file.
 
     :param file_path: Path to the Masscan log file
     '''
 
-    with open(file_path, 'r') as file:
-        for line in file:
+    async with aiofiles.open(file_path, mode='r') as input_file:
+        async for line in input_file:
             line = line.strip()
 
             if not line or not line.startswith('{'):
@@ -74,22 +79,29 @@ def process_file(file_path: str):
             try:
                 record = json.loads(line)
             except json.decoder.JSONDecodeError:
+                # In rare cases, the JSON record may be incomplete or malformed:
+                #   {   "ip": "51.161.12.223",   "timestamp": "1707628302", "ports": [ {"port": 22, "proto": "tcp", "service": {"name": "ssh", "banner":
+                #   {   "ip": "83.66.211.246",   "timestamp": "1706557002"
                 logging.error(f'Failed to parse JSON record! ({line})')
-                input('Press Enter to continue...') # Debugging
+                input('Press Enter to continue...') # Pause for review & debugging (Will remove pausing in production, still investigating the cause of this issue.)
                 continue
 
+            if len(record['ports']) > 1:
+                logging.warning(f'Multiple ports found for record! ({record})')
+                input('Press Enter to continue...') # Pause for review (Will remove pausing in production, still investigating whether more than one port ever appears in a record.)
+
             for port_info in record['ports']:
                 struct = {
-                    'ip': record['ip'],
-                    'port': port_info['port'],
-                    'proto': port_info['proto'],
-                    'seen': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(int(record['timestamp']))),
+                    'ip'    : record['ip'],
+                    'port'  : port_info['port'],
+                    'proto' : port_info['proto'],
+                    'seen'  : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(int(record['timestamp']))),
                 }
 
                 if 'service' in port_info:
                     if 'name' in port_info['service']:
-                        if port_info['service']['name'] != 'unknown':
-                            struct['service'] = port_info['service']['name']
+                        if (service_name := port_info['service']['name']) not in ('unknown',''):
+                            struct['service'] = service_name
 
                     if 'banner' in port_info['service']:
                         banner = ' '.join(port_info['service']['banner'].split()) # Remove extra whitespace
@@ -100,7 +112,7 @@ def process_file(file_path: str):
                             else:
                                 struct['banner'] = banner
 
-                yield struct
+                yield {'_index': default_index, '_source': struct}
  
     return None # EOF
 
@@ -131,6 +143,6 @@ Will be indexed as:
     "service": "ssh",
     "banner": "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4",
     "seen": "2021-10-08T02:04:28Z",
-    "ref_id": "?sKfOvsC4M4a2W8PaC4zF?" # TCP RST Payload (Do we need this?)
+    "ref_id": "?sKfOvsC4M4a2W8PaC4zF?" # TCP RST Payload, Might be useful..
 }
 '''
 \ No newline at end of file
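
All of the ingestors now read their input through aiofiles so the event loop is not blocked on disk I/O while async_streaming_bulk is running. A minimal sketch of that async line-by-line pattern (the log path is a placeholder):

    import asyncio

    import aiofiles  # pip install aiofiles

    async def count_lines(file_path: str) -> int:
        '''Asynchronously iterate over a file line by line, as the ingestors above do.'''
        total = 0
        async with aiofiles.open(file_path, mode='r') as input_file:
            async for line in input_file:
                if line.strip():
                    total += 1
        return total

    print(asyncio.run(count_lines('/path/to/masscan.json')))  # placeholder path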
diff --git a/ingestors/ingest_massdns.py b/ingestors/ingest_massdns.py
@@ -4,6 +4,11 @@
 
 import time
 
+try:
+    import aiofiles
+except ImportError:
+    raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
+
 default_index = 'ptr-records'
 
 def construct_map() -> dict:
@@ -25,15 +30,15 @@ def construct_map() -> dict:
     return mapping
 
 
-def process_file(file_path: str):
+async def process_data(file_path: str):
     '''
     Read and process Massdns records from the log file.
 
     :param file_path: Path to the Massdns log file
     '''
 
-    with open(file_path, 'r') as file:
-        for line in file:
+    async with aiofiles.open(file_path, mode='r') as input_file:
+        async for line in input_file:
             line = line.strip()
 
             if not line:
@@ -65,7 +70,7 @@ def process_file(file_path: str):
                 'seen': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
             }
 
-            yield struct
+            yield {'_index': default_index, '_source': struct}
     
     return None # EOF
 
diff --git a/ingestors/ingest_zone.py b/ingestors/ingest_zone.py
@@ -4,6 +4,11 @@
 
 import time
 
+try:
+    import aiofiles
+except ImportError:
+    raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
+
 default_index = 'dns-zones'
 record_types = ('a','aaaa','caa','cdnskey','cds','cname','dnskey','ds','mx','naptr','ns','nsec','nsec3','nsec3param','ptr','rrsig','rp','sshfp','soa','srv','txt','type65534')
 
@@ -42,7 +47,7 @@ def construct_map() -> dict:
     return mapping
 
 
-def process_file(file_path: str):
+async def process_data(file_path: str):
     '''
     Read and process zone file records.
 
@@ -52,8 +57,8 @@ def process_file(file_path: str):
     domain_records = {}
     last_domain = None
 
-    with open(file_path, 'r') as file:
-        for line in file:
+    async with aiofiles.open(file_path, mode='r') as input_file:
+        async for line in input_file:
             line = line.strip()
 
             if not line or line.startswith(';'):
@@ -88,11 +93,11 @@ def process_file(file_path: str):
 
             if domain != last_domain:
                 if last_domain:
-                    source = {'domain': last_domain, 'records': domain_records[last_domain], 'seen': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())}
+                    struct = {'domain': last_domain, 'records': domain_records[last_domain], 'seen': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())}
                     
                     del domain_records[last_domain]
 
-                    yield source
+                    yield {'_index': default_index, '_source': struct}
 
                 last_domain = domain
 
diff --git a/eris.py b/old/eris.py
diff --git a/async_dev/ingestors/__init__.py b/old/ingestors/__init__.py
diff --git a/ingestors/ingest_httpx.py b/old/ingestors/ingest_httpx.py
diff --git a/ingestors/ingest_masscan.py b/old/ingestors/ingest_masscan.py
diff --git a/ingestors/ingest_massdns.py b/old/ingestors/ingest_massdns.py
diff --git a/ingestors/ingest_zone.py b/old/ingestors/ingest_zone.py
diff --git a/sniff_patch.py b/old/sniff_patch.py
diff --git a/sniff_patch.py b/sniff_patch.py
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 # Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
-# sniff_patch.py
+# sniff_patch.py [asynchronous development]
 
 # Note:
 #   This is a patch for the elasticsearch 8.x client to fix the sniff_* options.
@@ -12,23 +12,23 @@
 
 import base64
 
-import elasticsearch._sync.client as client
+import elasticsearch._async.client as async_client
 from elasticsearch.exceptions import SerializationError, ConnectionError
 
 
-def init_elasticsearch(*args, **kwargs):
+async def init_elasticsearch_async(*args, **kwargs):
     '''
-    Initialize the Elasticsearch client with the sniff patch.
+    Initialize the Async Elasticsearch client with the sniff patch.
     
-    :param args: Elasticsearch positional arguments.
-    :param kwargs: Elasticsearch keyword arguments.
+    :param args: Async Elasticsearch positional arguments.
+    :param kwargs: Async Elasticsearch keyword arguments.
     '''
-    client.default_sniff_callback = _override_sniff_callback(kwargs['basic_auth'])
+    async_client.default_sniff_callback = _override_async_sniff_callback(kwargs['basic_auth'])
 
-    return client.Elasticsearch(*args, **kwargs)
+    return async_client.AsyncElasticsearch(*args, **kwargs)
 
 
-def _override_sniff_callback(basic_auth):
+def _override_async_sniff_callback(basic_auth):
     '''
     Taken from https://github.com/elastic/elasticsearch-py/blob/8.8/elasticsearch/_sync/client/_base.py#L166
     Completely unmodified except for adding the auth header to the elastic request.
@@ -38,19 +38,19 @@ def _override_sniff_callback(basic_auth):
         - https://github.com/elastic/elasticsearch-py/issues/2005
     '''
     auth_str = base64.b64encode(':'.join(basic_auth).encode()).decode()
-    sniffed_node_callback = client._base._default_sniffed_node_callback
+    sniffed_node_callback = async_client._base._default_sniffed_node_callback
 
-    def modified_sniff_callback(transport, sniff_options):
+    async def modified_async_sniff_callback(transport, sniff_options):
         for _ in transport.node_pool.all():
             try:
-                meta, node_infos = transport.perform_request(
+                meta, node_infos = await transport.perform_request(
                     'GET',
                     '/_nodes/_all/http',
-                    headers = {
+                    headers={
                         'accept': 'application/vnd.elasticsearch+json; compatible-with=8',
-                        'authorization': f'Basic {auth_str}' # This auth header is missing in 8.x releases of the client, and causes 401s
+                        'authorization': f'Basic {auth_str}'  # This auth header is missing in 8.x releases of the client, and causes 401s
                     },
-                    request_timeout = (
+                    request_timeout=(
                         sniff_options.sniff_timeout
                         if not sniff_options.is_initial_sniff
                         else None
@@ -79,7 +79,7 @@ def _override_sniff_callback(basic_auth):
                     port = int(port_str)
 
                 assert sniffed_node_callback is not None
-                sniffed_node = sniffed_node_callback(
+                sniffed_node = await sniffed_node_callback(
                     node_info, meta.node.replace(host=host, port=port)
                 )
                 if sniffed_node is None:
@@ -93,4 +93,4 @@ def _override_sniff_callback(basic_auth):
 
         return []
 
-    return modified_sniff_callback
-\ No newline at end of file
+    return modified_async_sniff_callback
+\ No newline at end of file
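
The only functional addition the patch makes to the stock sniff callback is the Authorization header on the /_nodes/_all/http request; everything else is copied from the upstream client. The header is built like this (credentials are placeholders):

    import base64

    basic_auth = ('elastic', 'changeme')  # placeholder credentials, as passed via basic_auth
    auth_str = base64.b64encode(':'.join(basic_auth).encode()).decode()
    headers = {
        'accept': 'application/vnd.elasticsearch+json; compatible-with=8',
        'authorization': f'Basic {auth_str}',  # the header missing from the 8.x client's own sniff request
    }
    print(headers['authorization'])  # -> Basic ZWxhc3RpYzpjaGFuZ2VtZQ==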