eris

- Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/eris.git
Log | Files | Refs | Archive | README | LICENSE

ingest_zone.py (6009B)

      1 #!/usr/bin/env python
      2 # Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
      3 # ingest_zone.py
      4 
      5 import logging
      6 import time
      7 
      8 try:
      9 	import aiofiles
     10 except ImportError:
     11 	raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
     12 
     13 
     14 # Set a default elasticsearch index if one is not provided
     15 default_index = 'eris-zones'
     16 
     17 # Known DNS record types found in zone files
     18 record_types  = ('a','aaaa','caa','cdnskey','cds','cname','dnskey','ds','mx','naptr','ns','nsec','nsec3','nsec3param','ptr','rrsig','rp','sshfp','soa','srv','txt','type65534')
     19 
     20 
     21 def construct_map() -> dict:
     22 	'''Construct the Elasticsearch index mapping for zone file records.'''
     23 
     24 	# Match on exact value or full text search
     25 	keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
     26 
     27 	# Construct the index mapping
     28 	mapping = {
     29 		'mappings': {
     30 			'properties': {
     31 				'domain'  : keyword_mapping,
     32 				'zone'    : { 'type': 'keyword' },
     33 				'records' : { 'type': 'nested', 'properties': {} },
     34 				'source'  : { 'type': 'keyword' },
     35 				'seen'    : { 'type': 'date' }
     36 			}
     37 		}
     38 	}
     39 
     40 	# Add record types to mapping dynamically to not clutter the code
     41 	for record_type in record_types:
     42 		if record_type in ('a','aaaa'):
     43 			mapping['mappings']['properties']['records']['properties'][record_type] = {
     44 				'type'       : 'nested',
     45 				'properties' : {
     46 					'data' : { 'type': 'ip' if record_type in ('a','aaaa') else keyword_mapping },
     47 					'ttl'  : { 'type': 'integer' }
     48 				}
     49 			}
     50 
     51 	return mapping
     52 
     53 
     54 async def process_data(file_path: str):
     55 	'''
     56 	Read and process the input file
     57 
     58 	:param input_path: Path to the input file
     59 	'''
     60 
     61 	async with aiofiles.open(file_path) as input_file:
     62 
     63 		# Initialize the cache
     64 		last = None
     65 
     66 		# Default source for the records
     67 		source = 'czds'
     68 
     69 		# Determine the zone name from the file path (e.g., /path/to/zones/com.eu.txt -> com.eu zone)
     70 		zone = '.'.join(file_path.split('/')[-1].split('.')[:-1])
     71 		# Note: For now, this is the best way because we are not just ingesting TLD zone files, but entire zones for domains aswell...
     72 
     73 		# Read the input file line by line
     74 		async for line in input_file:
     75 			line = line.strip()
     76 
     77 			# Sentinel value to indicate the end of a process (for closing out a FIFO stream)
     78 			if line == '~eof':
     79 				yield last
     80 				break
     81 
     82 			# Skip empty lines and comments
     83 			if not line:
     84 				continue
     85 
     86 			# Skip comments but detect AXFR transfers to change the source)
     87 			if line.startswith(';'):
     88 				if 'DiG' in line and 'AXFR' in line: # Do we need to worry about case sensitivity? How can we store the nameserver aswell?
     89 					source = 'axfr'
     90 				continue
     91 
     92 			# Split the line into its parts
     93 			parts = line.split()
     94 
     95 			# Ensure the line has at least 3 parts
     96 			if len(parts) < 5:
     97 				logging.warning(f'Invalid line: {line}')
     98 				continue
     99 
    100 			# Split the record into its parts
    101 			domain, ttl, record_class, record_type, data = parts[0].rstrip('.').lower(), parts[1], parts[2].lower(), parts[3].lower(), ' '.join(parts[4:])
    102 
    103 			# Ensure the TTL is a number
    104 			if not ttl.isdigit():
    105 				logging.warning(f'Invalid TTL: {ttl} with line: {line}')
    106 				continue
    107 			else:
    108 				ttl = int(ttl)
    109 
    110 			# Do not index other record classes (doubtful any CHAOS/HESIOD records will be found in zone files)
    111 			if record_class != 'in':
    112 				logging.warning(f'Unsupported record class: {record_class} with line: {line}')
    113 				continue
    114 
    115 			# Do not index other record types
    116 			if record_type not in record_types:
    117 				logging.warning(f'Unsupported record type: {record_type} with line: {line}')
    118 				continue
    119 
    120 			# Little tidying up for specific record types (removing trailing dots, etc)
    121 			if record_type == 'nsec':
    122 				data = ' '.join([data.split()[0].rstrip('.'), *data.split()[1:]])
    123 			elif record_type == 'soa':
    124 				data = ' '.join([part.rstrip('.') if '.' in part else part for part in data.split()])
    125 			elif data.endswith('.'):
    126 				data = data.rstrip('.')
    127 
    128 			# Check if we are still processing the same domain
    129 			if last:
    130 				if domain == last['doc']['domain']:
    131 					if record_type in last['doc']['records']:
    132 						last['doc']['records'][record_type].append({'ttl': ttl, 'data': data}) # Do we need to check for duplicate records?
    133 					else:
    134 						last['doc']['records'][record_type] = [{'ttl': ttl, 'data': data}]
    135 					continue
    136 				else:
    137 					yield last
    138 
    139 			# Cache the document
    140 			last = {
    141 				'_op_type' : 'update',
    142 				'_id'      : domain,
    143 				'_index'   : default_index,
    144 				'doc'     : {
    145 					'domain'  : domain,
    146 					'zone'    : zone,
    147 					'records' : {record_type: [{'data': data, 'ttl': ttl}]},
    148 					'source'  : source,
    149 					'seen'    : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()) # Zone files do not contain a timestamp, so we use the current time
    150 				},
    151 				'doc_as_upsert' : True # This will create the document if it does not exist
    152 			}
    153 
    154 
    155 async def test(input_path: str):
    156 	'''
    157 	Test the ingestion process
    158 
    159 	:param input_path: Path to the input file
    160 	'''
    161 
    162 	async for document in process_data(input_path):
    163 		print(document)
    164 
    165 
    166 
    167 if __name__ == '__main__':
    168 	import argparse
    169 	import asyncio
    170 
    171 	parser = argparse.ArgumentParser(description='Ingestor for ERIS')
    172 	parser.add_argument('input_path', help='Path to the input file or directory')
    173 	args = parser.parse_args()
    174 
    175 	asyncio.run(test(args.input_path))
    176 
    177 
    178 
    179 '''
    180 Input:
    181 	1001.vegas. 3600 in ns ns11.waterrockdigital.com.
    182 	1001.vegas. 3600 in ns ns12.waterrockdigital.com.
    183 
    184 Output:
    185 	{
    186 		'_id'     : '1001.vegas',
    187 		'_index'  : 'dns-zones',
    188 		'_source' : {
    189 			'domain'  : '1001.vegas',
    190 			'zone'    : 'vegas',
    191 			'records' : {
    192 				'ns': [
    193 					{'ttl': 3600, 'data': 'ns11.waterrockdigital.com'},
    194 					{'ttl': 3600, 'data': 'ns12.waterrockdigital.com'}
    195 				]
    196 			},
    197 			'source'  : 'czds',
    198 			'seen'    : '2021-09-01T00:00:00Z'
    199 		}
    200 	}
    201 
    202 Notes:
    203 	How do we want to handle hashed NSEC3 records? Do we ingest them as they are, or crack the NSEC3 hashes first and ingest?
    204 	Can an AXFR transfer return data out of order? If so, how do we handle that?
    205 '''