eris

- Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/-c.git
Log | Files | Refs | Archive | README | LICENSE

commit 87f2cf27ea03d88060e2b7ee58b4efcc2f0d074b
parent b018da4e4dbf9614d787e20ef59061bc703e8b44
Author: acidvegas <acid.vegas@acid.vegas>
Date: Mon, 11 Mar 2024 22:33:18 -0400

Code cleanup

Diffstat:
Mingestors/ingest_certs.py | 329++++++++++++++++++++++++++++++-------------------------------------------------
Mingestors/ingest_massdns.py | 250++++++++++++++++++++++++++++++++++++++++----------------------------------------
Mingestors/ingest_zone.py | 286+++++++++++++++++++++++++++++++++++++++++--------------------------------------

3 files changed, 397 insertions(+), 468 deletions(-)

diff --git a/ingestors/ingest_certs.py b/ingestors/ingest_certs.py
@@ -5,231 +5,151 @@
 import asyncio
 import json
 import logging
+import time
 
 try:
-    import websockets
+	import websockets
 except ImportError:
-    raise ImportError('Missing required \'websockets\' library. (pip install websockets)')
+	raise ImportError('Missing required \'websockets\' library. (pip install websockets)')
 
+
+# Set a default elasticsearch index if one is not provided
 default_index = 'cert-stream'
 
+
 def construct_map() -> dict:
-    '''Construct the Elasticsearch index mapping for Certstream records.'''
-
-    keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
-
-    mapping = {
-        'mappings': {
-            'properties': {
-                'data': {
-                    'properties': {
-                        'cert_index': { 'type': 'integer' },
-                        'cert_link' : { 'type': 'keyword' },
-                        'leaf_cert' : {
-                            'properties': {
-                                'all_domains': { 'type': 'keyword' },
-                                'extensions': {
-                                    'properties': {
-                                        'authorityInfoAccess'    : { 'type': 'text'    },
-                                        'authorityKeyIdentifier' : { 'type': 'text'    },
-                                        'basicConstraints'       : { 'type': 'text'    },
-                                        'certificatePolicies'    : { 'type': 'text'    },
-                                        'crlDistributionPoints'  : { 'type': 'text'    },
-                                        'ctlPoisonByte'          : { 'type': 'boolean' },
-                                        'extendedKeyUsage'       : { 'type': 'text'    },
-                                        'keyUsage'               : { 'type': 'text'    },
-                                        'subjectAltName'         : { 'type': 'text'    },
-                                        'subjectKeyIdentifier'   : { 'type': 'text'    }
-                                    }
-                                },
-                                'fingerprint': { 'type': 'keyword' },
-                                'issuer': {
-                                    'properties': {
-                                        'C'            : { 'type': 'keyword' },
-                                        'CN'           : { 'type': 'text'    },
-                                        'L'            : { 'type': 'text'    },
-                                        'O'            : { 'type': 'text'    },
-                                        'OU'           : { 'type': 'text'    },
-                                        'ST'           : { 'type': 'text'    },
-                                        'aggregated'   : { 'type': 'text'    },
-                                        'emailAddress' : { 'type': 'text'    }
-                                    }
-                                },
-                                'not_after'           : { 'type': 'integer' },
-                                'not_before'          : { 'type': 'integer' },
-                                'serial_number'       : { 'type': 'keyword' },
-                                'signature_algorithm' : { 'type': 'text' },
-                                'subject': {
-                                    'properties': {
-                                        'C'            : { 'type': 'keyword' },
-                                        'CN'           : { 'type': 'text'    },
-                                        'L'            : { 'type': 'text'    },
-                                        'O'            : { 'type': 'text'    },
-                                        'OU'           : { 'type': 'text'    },
-                                        'ST'           : { 'type': 'text'    },
-                                        'aggregated'   : { 'type': 'text'    },
-                                        'emailAddress' : { 'type': 'text'    }
-                                    }
-                                }
-                            }
-                        },
-                        'seen': { 'type': 'date', 'format': 'epoch_second' },
-                        'source': {
-                            'properties': {
-                                'name' : { 'type': 'keyword' },
-                                'url'  : { 'type': 'keyword' }
-                            }
-                        },
-                        'update_type': { 'type': 'keyword' }
-                    }
-                },
-                'message_type': { 'type': 'keyword' }
-            }
-        }
-    }
-
-    return mapping
+	'''Construct the Elasticsearch index mapping for Certstream records.'''
+
+	# Match on exact value or full text search
+	keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
+
+	# Construct the index mapping
+	mapping = {
+		'mappings': {
+			'properties' : {
+				'domain' : keyword_mapping,
+				'seen'   : { 'type': 'date' }
+			}
+		}
+	}
+
+	return mapping
 
 
 async def process_data(place_holder: str = None):
-    '''
-    Read and process Certsream records live from the Websocket stream.
-    
-    :param place_holder: Placeholder parameter to match the process_data function signature of other ingestors.
-    '''
-
-    while True:
-        try:
-            async with websockets.connect('wss://certstream.calidog.io/') as websocket:
-                while True:
-                    line = await websocket.recv()
-
-                    if line == '~eof': # Sentinel value to indicate the end of a process (Used with --watch with FIFO)
-                        break
-
-                    try:
-                        record = json.loads(line)
-                    except json.decoder.JSONDecodeError:
-                        logging.error(f'Failed to parse JSON record from Certstream! ({line})')
-                        input('Press Enter to continue...')
-                        continue
-
-                    yield record
-
-        except websockets.ConnectionClosed:
-            logging.error('Connection to Certstream was closed. Attempting to reconnect...')
-            await asyncio.sleep(15)
-
-        except Exception as e:
-            logging.error(f'An error occurred while processing Certstream records! ({e})')
-            await asyncio.sleep(15)
-
-
-async def strip_struct_empty(data: dict) -> dict:
-    '''
-    Recursively remove empty values from a nested dictionary or list.
-    
-    :param data: The dictionary or list to clean.
-    '''
-
-    empties = [None, '', [], {}]
-
-    if isinstance(data, dict):
-        for key, value in list(data.items()):
-            if value in empties:
-                del data[key]
-            else:
-                cleaned_value = strip_struct_empty(value)
-                if cleaned_value in empties:
-                    del data[key]
-                else:
-                    data[key] = cleaned_value
-
-        return data
-    
-    elif isinstance(data, list):
-        return [strip_struct_empty(item) for item in data if item not in empties and strip_struct_empty(item) not in empties]
-
-    else:
-        return data
+	'''
+	Read and process Certsream records live from the Websocket stream.
+
+	:param place_holder: Placeholder parameter to match the process_data function signature of other ingestors.
+	'''
+
+	while True:
+		try:
+			async with websockets.connect('wss://certstream.calidog.io') as websocket:
+				while True:
+					# Read a line from the websocket
+					line = await websocket.recv()
+
+					# Parse the JSON record
+					try:
+						record = json.loads(line)
+					except json.decoder.JSONDecodeError:
+						logging.error(f'Invalid line from the websocket: {line}')
+						continue
+
+					# Grab the unique domains from the record (excluding wildcards)
+					domains = record['data']['leaf_cert']['all_domains']
+					domains = set([domain[2:] if domain.startswith('*.') else domain for domain in domains])
+
+					# Construct the document
+					for domain in domains:
+						struct = {
+							'domain' : domain,
+							'seen'   : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
+						}
+
+						yield {'_id': id, '_index': default_index, '_source': struct}
+
+		except websockets.ConnectionClosed:
+			logging.error('Connection to Certstream was closed. Attempting to reconnect...')
+			await asyncio.sleep(15)
+
+		except Exception as e:
+			logging.error(f'An error occurred while processing Certstream records! ({e})')
+			break
 
 
 async def test():
-    '''Test the Cert stream ingestion process'''
+	'''Test the ingestion process.'''
 
-    async for document in process_data():
-        print(document)
+	async for document in process_data():
+		print(document)
 
 
 
 if __name__ == '__main__':
-    import argparse
-    import asyncio
+	import asyncio
 
-    parser = argparse.ArgumentParser(description='Certstream Ingestor for ERIS')
-    parser.add_argument('input_path', help='Path to the input file or directory')
-    args = parser.parse_args()
-    
-    asyncio.run(test(args.input_path))
+	asyncio.run(test())
 
 
 
 '''
 Output:
-    {
-        "data": {
-            "cert_index": 43061646,
-            "cert_link": "https://yeti2025.ct.digicert.com/log/ct/v1/get-entries?start=43061646&end=43061646",
-            "leaf_cert": {
-                "all_domains": [
-                    "*.d7zdnegbre53n.amplifyapp.com",
-                    "d7zdnegbre53n.amplifyapp.com"
-                ],
-                "extensions": {
-                    "authorityInfoAccess"    : "CA Issuers - URI:http://crt.r2m02.amazontrust.com/r2m02.cer\nOCSP - URI:http://ocsp.r2m02.amazontrust.com\n",
-                    "authorityKeyIdentifier" : "keyid:C0:31:52:CD:5A:50:C3:82:7C:74:71:CE:CB:E9:9C:F9:7A:EB:82:E2\n",
-                    "basicConstraints"       : "CA:FALSE",
-                    "certificatePolicies"    : "Policy: 2.23.140.1.2.1",
-                    "crlDistributionPoints"  : "Full Name:\n URI:http://crl.r2m02.amazontrust.com/r2m02.crl",
-                    "ctlPoisonByte"          : true,
-                    "extendedKeyUsage"       : "TLS Web server authentication, TLS Web client authentication",
-                    "keyUsage"               : "Digital Signature, Key Encipherment",
-                    "subjectAltName"         : "DNS:d7zdnegbre53n.amplifyapp.com, DNS:*.d7zdnegbre53n.amplifyapp.com",
-                    "subjectKeyIdentifier"   : "59:32:78:2A:11:03:62:55:BB:3B:B9:80:24:76:28:90:2E:D1:A4:56"
-                },
-                "fingerprint": "D9:05:A3:D5:AA:F9:68:BC:0C:0A:15:69:C9:5E:11:92:32:67:4F:FA",
-                "issuer": {
-                    "C"            : "US",
-                    "CN"           : "Amazon RSA 2048 M02",
-                    "L"            : null,
-                    "O"            : "Amazon",
-                    "OU"           : null,
-                    "ST"           : null,
-                    "aggregated"   : "/C=US/CN=Amazon RSA 2048 M02/O=Amazon",
-                    "emailAddress" : null
-                },
-                "not_after"           : 1743811199,
-                "not_before"          : 1709596800,
-                "serial_number"       : "FDB450C1942E3D30A18737063449E62",
-                "signature_algorithm" : "sha256, rsa",
-                "subject": {
-                    "C"            : null,
-                    "CN"           : "*.d7zdnegbre53n.amplifyapp.com",
-                    "L"            : null,
-                    "O"            : null,
-                    "OU"           : null,
-                    "ST"           : null,
-                    "aggregated"   : "/CN=*.d7zdnegbre53n.amplifyapp.com",
-                    "emailAddress" : null
-                }
-            },
-            "seen": 1709651773.594684,
-            "source": {
-                "name" : "DigiCert Yeti2025 Log",
-                "url"  : "https://yeti2025.ct.digicert.com/log/"
-            },
-            "update_type": "PrecertLogEntry"
-        },
-        "message_type": "certificate_update"
-    }
-'''
-\ No newline at end of file
+	{
+		"data": {
+			"cert_index": 43061646,
+			"cert_link": "https://yeti2025.ct.digicert.com/log/ct/v1/get-entries?start=43061646&end=43061646",
+			"leaf_cert": {
+				"all_domains": [
+					"*.d7zdnegbre53n.amplifyapp.com",
+					"d7zdnegbre53n.amplifyapp.com"
+				],
+				"extensions": {
+					"authorityInfoAccess"    : "CA Issuers - URI:http://crt.r2m02.amazontrust.com/r2m02.cer\nOCSP - URI:http://ocsp.r2m02.amazontrust.com\n",
+					"authorityKeyIdentifier" : "keyid:C0:31:52:CD:5A:50:C3:82:7C:74:71:CE:CB:E9:9C:F9:7A:EB:82:E2\n",
+					"basicConstraints"       : "CA:FALSE",
+					"certificatePolicies"    : "Policy: 2.23.140.1.2.1",
+					"crlDistributionPoints"  : "Full Name:\n URI:http://crl.r2m02.amazontrust.com/r2m02.crl",
+					"ctlPoisonByte"          : true,
+					"extendedKeyUsage"       : "TLS Web server authentication, TLS Web client authentication",
+					"keyUsage"               : "Digital Signature, Key Encipherment",
+					"subjectAltName"         : "DNS:d7zdnegbre53n.amplifyapp.com, DNS:*.d7zdnegbre53n.amplifyapp.com",
+					"subjectKeyIdentifier"   : "59:32:78:2A:11:03:62:55:BB:3B:B9:80:24:76:28:90:2E:D1:A4:56"
+				},
+				"fingerprint": "D9:05:A3:D5:AA:F9:68:BC:0C:0A:15:69:C9:5E:11:92:32:67:4F:FA",
+				"issuer": {
+					"C"            : "US",
+					"CN"           : "Amazon RSA 2048 M02",
+					"L"            : null,
+					"O"            : "Amazon",
+					"OU"           : null,
+					"ST"           : null,
+					"aggregated"   : "/C=US/CN=Amazon RSA 2048 M02/O=Amazon",
+					"emailAddress" : null
+				},
+				"not_after"           : 1743811199,
+				"not_before"          : 1709596800,
+				"serial_number"       : "FDB450C1942E3D30A18737063449E62",
+				"signature_algorithm" : "sha256, rsa",
+				"subject": {
+					"C"            : null,
+					"CN"           : "*.d7zdnegbre53n.amplifyapp.com",
+					"L"            : null,
+					"O"            : null,
+					"OU"           : null,
+					"ST"           : null,
+					"aggregated"   : "/CN=*.d7zdnegbre53n.amplifyapp.com",
+					"emailAddress" : null
+				}
+			},
+			"seen": 1709651773.594684,
+			"source": {
+				"name" : "DigiCert Yeti2025 Log",
+				"url"  : "https://yeti2025.ct.digicert.com/log/"
+			},
+			"update_type": "PrecertLogEntry"
+		},
+		"message_type": "certificate_update"
+	}
+'''
diff --git a/ingestors/ingest_massdns.py b/ingestors/ingest_massdns.py
@@ -6,9 +6,9 @@ import logging
 import time
 
 try:
-    import aiofiles
+	import aiofiles
 except ImportError:
-    raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
+	raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
 
 
 # Set a default elasticsearch index if one is not provided
@@ -16,154 +16,154 @@ default_index = 'eris-massdns'
 
 
 def construct_map() -> dict:
-    '''Construct the Elasticsearch index mapping for records'''
+	'''Construct the Elasticsearch index mapping for records'''
 
-    # Match on exact value or full text search
-    keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
+	# Match on exact value or full text search
+	keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
 
-    # Construct the index mapping
-    mapping = {
-        'mappings': {
-            'properties': {
-                'ip'     : { 'type': 'ip' },
-                'record' : keyword_mapping,
-                'seen'   : { 'type': 'date' }
-            }
-        }
-    }
+	# Construct the index mapping
+	mapping = {
+		'mappings': {
+			'properties': {
+				'ip'     : { 'type': 'ip' },
+				'record' : keyword_mapping,
+				'seen'   : { 'type': 'date' }
+			}
+		}
+	}
 
-    return mapping
+	return mapping
 
 
 async def process_data(input_path: str):
-    '''
-    Read and process the input file
-
-    :param input_path: Path to the input file
-    '''
-
-    async with aiofiles.open(input_path) as input_file:
-
-        # Cache the last document to avoid creating a new one for the same IP address
-        last = None
-
-        try:
-            # Read the input file line by line
-            async for line in input_file:
-                line = line.strip()
-
-                # Sentinel value to indicate the end of a process (for closing out a FIFO stream)
-                if line == '~eof':
-                    yield last
-                    break
-
-                # Skip empty lines (doubtful we will have any, but just in case)
-                if not line:
-                    continue
-
-                # Split the line into its parts
-                parts = line.split()
-
-                # Ensure the line has at least 3 parts
-                if len(parts) < 3:
-                    logging.warning(f'Invalid PTR record: {line}')
-                    continue
-
-                # Split the PTR record into its parts
-                name, record_type, record = parts[0].rstrip('.'), parts[1], ' '.join(parts[2:]).rstrip('.')
-
-                # Do not index other records
-                if record_type != 'PTR':
-                    continue
-
-                # Do not index PTR records that do not have a record
-                if not record:
-                    continue
-
-                # Do not index PTR records that have the same record as the in-addr.arpa domain
-                if record == name:
-                    continue
-
-                # Get the IP address from the in-addr.arpa domain
-                ip = '.'.join(name.replace('.in-addr.arpa', '').split('.')[::-1])
-
-                # Check if we are still processing the same IP address
-                if last:
-                    if ip == last['_id']: # This record is for the same IP address as the cached document
-                        last_records = last['doc']['record']
-                        if record not in last_records: # Do not index duplicate records
-                            last['doc']['record'].append(record)
-                        continue
-                    else:
-                        yield last # Return the last document and start a new one
-
-                # Cache the document
-                last = {
-                    '_op_type' : 'update',
-                    '_id'      : ip,
-                    '_index'   : default_index,
-                    'doc'      : {
-                        'ip'     : ip,
-                        'record' : [record], # Consider using painless script to add to list if it exists (Use 'seen' per-record and 'last_seen' for the IP address)
-                        'seen'   : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
-                    },
-                    'doc_as_upsert' : True # Create the document if it does not exist
-                }
-                
-        except Exception as e:
-            logging.error(f'Error processing data: {e}')
+	'''
+	Read and process the input file
+
+	:param input_path: Path to the input file
+	'''
+
+	async with aiofiles.open(input_path) as input_file:
+
+		# Cache the last document to avoid creating a new one for the same IP address
+		last = None
+
+		try:
+			# Read the input file line by line
+			async for line in input_file:
+				line = line.strip()
+
+				# Sentinel value to indicate the end of a process (for closing out a FIFO stream)
+				if line == '~eof':
+					yield last
+					break
+
+				# Skip empty lines (doubtful we will have any, but just in case)
+				if not line:
+					continue
+
+				# Split the line into its parts
+				parts = line.split()
+
+				# Ensure the line has at least 3 parts
+				if len(parts) < 3:
+					logging.warning(f'Invalid PTR record: {line}')
+					continue
+
+				# Split the PTR record into its parts
+				name, record_type, record = parts[0].rstrip('.'), parts[1], ' '.join(parts[2:]).rstrip('.')
+
+				# Do not index other records
+				if record_type != 'PTR':
+					continue
+
+				# Do not index PTR records that do not have a record
+				if not record:
+					continue
+
+				# Do not index PTR records that have the same record as the in-addr.arpa domain
+				if record == name:
+					continue
+
+				# Get the IP address from the in-addr.arpa domain
+				ip = '.'.join(name.replace('.in-addr.arpa', '').split('.')[::-1])
+
+				# Check if we are still processing the same IP address
+				if last:
+					if ip == last['_id']: # This record is for the same IP address as the cached document
+						last_records = last['doc']['record']
+						if record not in last_records: # Do not index duplicate records
+							last['doc']['record'].append(record)
+						continue
+					else:
+						yield last # Return the last document and start a new one
+
+				# Cache the document
+				last = {
+					'_op_type' : 'update',
+					'_id'      : ip,
+					'_index'   : default_index,
+					'doc'      : {
+						'ip'     : ip,
+						'record' : [record], # Consider using painless script to add to list if it exists (Use 'seen' per-record and 'last_seen' for the IP address)
+						'seen'   : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
+					},
+					'doc_as_upsert' : True # Create the document if it does not exist
+				}
+
+		except Exception as e:
+			logging.error(f'Error processing data: {e}')
 
 
 async def test(input_path: str):
-    '''
-    Test the ingestion process
+	'''
+	Test the ingestion process
 
-    :param input_path: Path to the input file
-    '''
-    
-    async for document in process_data(input_path):
-        print(document)
+	:param input_path: Path to the input file
+	'''
+
+	async for document in process_data(input_path):
+		print(document)
 
 
 
 if __name__ == '__main__':
-    import argparse
-    import asyncio
+	import argparse
+	import asyncio
 
-    parser = argparse.ArgumentParser(description='Ingestor for ERIS')
-    parser.add_argument('input_path', help='Path to the input file or directory')
-    args = parser.parse_args()
+	parser = argparse.ArgumentParser(description='Ingestor for ERIS')
+	parser.add_argument('input_path', help='Path to the input file or directory')
+	args = parser.parse_args()
 
-    asyncio.run(test(args.input_path))
+	asyncio.run(test(args.input_path))
 
 
 
 '''
 Deployment:
-    sudo apt-get install build-essential gcc make
-    git clone --depth 1 https://github.com/blechschmidt/massdns.git $HOME/massdns && cd $HOME/massdns && make
-    curl -s https://public-dns.info/nameservers.txt | grep -v ':' > $HOME/massdns/nameservers.txt
-    python3 ./scripts/ptr.py | ./bin/massdns -r $HOME/massdns/nameservers.txt -t PTR --filter NOERROR -o S -w $HOME/massdns/fifo.json
-    or...
-    while true; do python ./scripts/ptr.py | ./bin/massdns -r $HOME/massdns/nameservers.txt -t PTR --filter NOERROR -o S -w $HOME/massdns/fifo.json; done
+	sudo apt-get install build-essential gcc make
+	git clone --depth 1 https://github.com/blechschmidt/massdns.git $HOME/massdns && cd $HOME/massdns && make
+	curl -s https://public-dns.info/nameservers.txt | grep -v ':' > $HOME/massdns/nameservers.txt
+	python3 ./scripts/ptr.py | ./bin/massdns -r $HOME/massdns/nameservers.txt -t PTR --filter NOERROR -o S -w $HOME/massdns/fifo.json
+	or...
+	while true; do python ./scripts/ptr.py | ./bin/massdns -r $HOME/massdns/nameservers.txt -t PTR --filter NOERROR -o S -w $HOME/massdns/fifo.json; done
 
 Output:
-    0.6.229.47.in-addr.arpa. PTR 047-229-006-000.res.spectrum.com.
-    0.6.228.75.in-addr.arpa. PTR 0.sub-75-228-6.myvzw.com.
-    0.6.207.73.in-addr.arpa. PTR c-73-207-6-0.hsd1.ga.comcast.net.
+	0.6.229.47.in-addr.arpa. PTR 047-229-006-000.res.spectrum.com.
+	0.6.228.75.in-addr.arpa. PTR 0.sub-75-228-6.myvzw.com.
+	0.6.207.73.in-addr.arpa. PTR c-73-207-6-0.hsd1.ga.comcast.net.
 
 Input:
-    {
-        '_id'     : '47.229.6.0'
-        '_index'  : 'eris-massdns',
-        '_source' : {
-            'ip'     : '47.229.6.0',
-            'record' : ['047-229-006-000.res.spectrum.com'], # We will store as a list for IP addresses with multiple PTR records
-            'seen'   : '2021-06-30T18:31:00Z'
-        }
-    }
+	{
+		'_id'     : '47.229.6.0'
+		'_index'  : 'eris-massdns',
+		'_source' : {
+			'ip'     : '47.229.6.0',
+			'record' : ['047-229-006-000.res.spectrum.com'], # We will store as a list for IP addresses with multiple PTR records
+			'seen'   : '2021-06-30T18:31:00Z'
+		}
+	}
 
 Notes:
-    Why do some IP addresses return a A/CNAME from a PTR request
-    What is dns-servfail.net (Frequent CNAME response from PTR requests)
+	Why do some IP addresses return a A/CNAME from a PTR request
+	What is dns-servfail.net (Frequent CNAME response from PTR requests)
 '''
diff --git a/ingestors/ingest_zone.py b/ingestors/ingest_zone.py
@@ -6,168 +6,179 @@ import logging
 import time
 
 try:
-    import aiofiles
+	import aiofiles
 except ImportError:
-    raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
+	raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
 
 
+# Set a default elasticsearch index if one is not provided
 default_index = 'dns-zones'
+
+# Known DNS record types found in zone files
 record_types  = ('a','aaaa','caa','cdnskey','cds','cname','dnskey','ds','mx','naptr','ns','nsec','nsec3','nsec3param','ptr','rrsig','rp','sshfp','soa','srv','txt','type65534')
 
 
 def construct_map() -> dict:
-    '''Construct the Elasticsearch index mapping for zone file records.'''
-
-    keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
-
-    mapping = {
-        'mappings': {
-            'properties': {
-                'domain'  : keyword_mapping,
-                'records' : { 'properties': {} },
-                'seen'    : { 'type': 'date' }
-            }
-        }
-    }
-
-    # Add record types to mapping dynamically to not clutter the code
-    for item in record_types:
-        if item in ('a','aaaa'):
-            mapping['mappings']['properties']['records']['properties'][item] = {
-                'properties': {
-                    'data': { 'type': 'ip' },
-                    'ttl':  { 'type': 'integer' }
-                }
-            }
-        else:
-            mapping['mappings']['properties']['records']['properties'][item] = {
-                'properties': {
-                'data': keyword_mapping,
-                'ttl':  { 'type': 'integer' }
-                }
-            }
-
-    return mapping
+	'''Construct the Elasticsearch index mapping for zone file records.'''
+
+	# Match on exact value or full text search
+	keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
+
+	# Construct the index mapping
+	mapping = {
+		'mappings': {
+			'properties': {
+				'domain'  : keyword_mapping,
+				'records' : { 'properties': {} },
+				'seen'    : { 'type': 'date' }
+			}
+		}
+	}
+
+	# Add record types to mapping dynamically to not clutter the code
+	for record_type in record_types:
+		if record_type in ('a','aaaa'):
+			mapping['mappings']['properties']['records']['properties'][record_type] = {
+				'properties': {
+					'data': { 'type': 'ip' if record_type in ('a','aaaa') else keyword_mapping},
+					'ttl':  { 'type': 'integer' }
+				}
+			}
+
+	return mapping
 
 
 async def process_data(file_path: str):
-    '''
-    Read and process zone file records.
-
-    :param file_path: Path to the zone file
-    '''
-
-    async with aiofiles.open(file_path) as input_file:
-
-        last = None
-
-        async for line in input_file:
-            line = line.strip()
-
-            if line == '~eof': # Sentinel value to indicate the end of a process (Used with --watch with FIFO)
-                return last
-
-            if not line or line.startswith(';'):
-                continue
-
-            parts = line.split()
-
-            if len(parts) < 5:
-                logging.warning(f'Invalid line: {line}')
-
-            domain, ttl, record_class, record_type, data = parts[0].rstrip('.').lower(), parts[1], parts[2].lower(), parts[3].lower(), ' '.join(parts[4:])
-
-            if not ttl.isdigit():
-                logging.warning(f'Invalid TTL: {ttl} with line: {line}')
-                continue
-            
-            ttl = int(ttl)
-
-            # Anomaly...Doubtful any CHAOS/HESIOD records will be found in zone files
-            if record_class != 'in':
-                logging.warning(f'Unsupported record class: {record_class} with line: {line}')
-                continue
-
-            # We do not want to collide with our current mapping (Again, this is an anomaly)
-            if record_type not in record_types:
-                logging.warning(f'Unsupported record type: {record_type} with line: {line}')
-                continue
-
-            # Little tidying up for specific record types (removing trailing dots, etc)
-            if record_type == 'nsec':
-                data = ' '.join([data.split()[0].rstrip('.'), *data.split()[1:]])
-            elif record_type == 'soa':
-                data = ' '.join([part.rstrip('.') if '.' in part else part for part in data.split()])
-            elif data.endswith('.'):
-                data = data.rstrip('.')
-
-            if last:
-                if domain == last['domain']:
-                    if record_type in last['_doc']['records']:
-                        last['_doc']['records'][record_type].append({'ttl': ttl, 'data': data}) # Do we need to check for duplicate records?
-                    else:
-                        last['_doc']['records'][record_type] = [{'ttl': ttl, 'data': data}]
-                    continue
-                else:
-                    yield last
-
-            last = {
-                '_op_type' : 'update',
-                '_id'      : domain,
-                '_index'   : default_index,
-                '_doc'     : {
-                    'domain'  : domain,
-                    'records' : {record_type: [{'ttl': ttl, 'data': data}]},
-                    'seen'    : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()) # Zone files do not contain a timestamp, so we use the current time
-                },
-                'doc_as_upsert' : True # This will create the document if it does not exist
-            }
+	'''
+	Read and process the input file
+
+	:param input_path: Path to the input file
+	'''
+
+	async with aiofiles.open(file_path) as input_file:
+
+		# Initialize the cache
+		last = None
+
+		# Read the input file line by line
+		async for line in input_file:
+			line = line.strip()
+
+			# Sentinel value to indicate the end of a process (for closing out a FIFO stream)
+			if line == '~eof':
+				yield last
+				break
+
+			# Skip empty lines and comments
+			if not line or line.startswith(';'):
+				continue
+
+			# Split the line into its parts
+			parts = line.split()
+
+			# Ensure the line has at least 3 parts
+			if len(parts) < 5:
+				logging.warning(f'Invalid line: {line}')
+				continue
+
+			# Split the record into its parts
+			domain, ttl, record_class, record_type, data = parts[0].rstrip('.').lower(), parts[1], parts[2].lower(), parts[3].lower(), ' '.join(parts[4:])
+
+			# Ensure the TTL is a number
+			if not ttl.isdigit():
+				logging.warning(f'Invalid TTL: {ttl} with line: {line}')
+				continue
+			else:
+				ttl = int(ttl)
+
+			# Do not index other record classes (doubtful any CHAOS/HESIOD records will be found in zone files)
+			if record_class != 'in':
+				logging.warning(f'Unsupported record class: {record_class} with line: {line}')
+				continue
+
+			# Do not index other record types
+			if record_type not in record_types:
+				logging.warning(f'Unsupported record type: {record_type} with line: {line}')
+				continue
+
+			# Little tidying up for specific record types (removing trailing dots, etc)
+			if record_type == 'nsec':
+				data = ' '.join([data.split()[0].rstrip('.'), *data.split()[1:]])
+			elif record_type == 'soa':
+				data = ' '.join([part.rstrip('.') if '.' in part else part for part in data.split()])
+			elif data.endswith('.'):
+				data = data.rstrip('.')
+
+			# Check if we are still processing the same domain
+			if last:
+				if domain == last['domain']: # This record is for the same domain as the cached document
+					if record_type in last['_doc']['records']:
+						last['_doc']['records'][record_type].append({'ttl': ttl, 'data': data}) # Do we need to check for duplicate records?
+					else:
+						last['_doc']['records'][record_type] = [{'ttl': ttl, 'data': data}]
+					continue
+				else:
+					yield last # Return the last document and start a new one
+
+			# Cache the document
+			last = {
+				'_op_type' : 'update',
+				'_id'      : domain,
+				'_index'   : default_index,
+				'_doc'     : {
+					'domain'  : domain,
+					'records' : {record_type: [{'ttl': ttl, 'data': data}]},
+					'seen'    : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()) # Zone files do not contain a timestamp, so we use the current time
+				},
+				'doc_as_upsert' : True # This will create the document if it does not exist
+			}
 
 
 async def test(input_path: str):
-    '''
-    Test the Zone file ingestion process
-    
-    :param input_path: Path to the MassDNS log file
-    '''
-    async for document in process_data(input_path):
-        print(document)
+	'''
+	Test the ingestion process
+
+	:param input_path: Path to the input file
+	'''
+
+	async for document in process_data(input_path):
+		print(document)
 
 
 
 if __name__ == '__main__':
-    import argparse
-    import asyncio
+	import argparse
+	import asyncio
 
-    parser = argparse.ArgumentParser(description='Zone file Ingestor for ERIS')
-    parser.add_argument('input_path', help='Path to the input file or directory')
-    args = parser.parse_args()
-    
-    asyncio.run(test(args.input_path))
+	parser = argparse.ArgumentParser(description='Ingestor for ERIS')
+	parser.add_argument('input_path', help='Path to the input file or directory')
+	args = parser.parse_args()
+
+	asyncio.run(test(args.input_path))
 
 
 
 '''
 Output:
-    1001.vegas. 3600 in ns ns11.waterrockdigital.com.
-    1001.vegas. 3600 in ns ns12.waterrockdigital.com.
+	1001.vegas. 3600 in ns ns11.waterrockdigital.com.
+	1001.vegas. 3600 in ns ns12.waterrockdigital.com.
 
 Input:
-    {
-        '_id'     : '1001.vegas'
-        '_index'  : 'dns-zones',
-        '_source' : {
-            'domain'  : '1001.vegas',        
-            'records' : {
-                'ns': [
-                    {'ttl': 3600, 'data': 'ns11.waterrockdigital.com'},
-                    {'ttl': 3600, 'data': 'ns12.waterrockdigital.com'}
-                ]
-            },
-            'seen'    : '2021-09-01T00:00:00Z'
-        }
-    }
+	{
+		'_id'     : '1001.vegas'
+		'_index'  : 'dns-zones',
+		'_source' : {
+			'domain'  : '1001.vegas',
+			'records' : {
+				'ns': [
+					{'ttl': 3600, 'data': 'ns11.waterrockdigital.com'},
+					{'ttl': 3600, 'data': 'ns12.waterrockdigital.com'}
+				]
+			},
+			'seen'    : '2021-09-01T00:00:00Z'
+		}
+	}
 
 Notes:
-    How do we want to handle hashed NSEC3 records? Do we ignest them as they are, or crack the NSEC3 hashes first and ingest?
-'''
-\ No newline at end of file
+	How do we want to handle hashed NSEC3 records? Do we ignest them as they are, or crack the NSEC3 hashes first and ingest?
+'''