eris - Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/eris.git
ingest_zone.py (6009B)
#!/usr/bin/env python
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
# ingest_zone.py

import logging
import time

try:
    import aiofiles
except ImportError:
    raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')


# Set a default elasticsearch index if one is not provided
default_index = 'eris-zones'

# Known DNS record types found in zone files
record_types = ('a','aaaa','caa','cdnskey','cds','cname','dnskey','ds','mx','naptr','ns','nsec','nsec3','nsec3param','ptr','rrsig','rp','sshfp','soa','srv','txt','type65534')


def construct_map() -> dict:
    '''Construct the Elasticsearch index mapping for zone file records.'''

    # Match on exact value or full text search
    keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }

    # Construct the index mapping
    mapping = {
        'mappings': {
            'properties': {
                'domain'  : keyword_mapping,
                'zone'    : { 'type': 'keyword' },
                'records' : { 'type': 'nested', 'properties': {} },
                'source'  : { 'type': 'keyword' },
                'seen'    : { 'type': 'date' }
            }
        }
    }

    # Add each record type to the mapping dynamically to keep the code uncluttered
    # (A/AAAA records hold IP addresses; everything else is indexed as keyword/text)
    for record_type in record_types:
        mapping['mappings']['properties']['records']['properties'][record_type] = {
            'type'       : 'nested',
            'properties' : {
                'data' : { 'type': 'ip' } if record_type in ('a','aaaa') else keyword_mapping,
                'ttl'  : { 'type': 'integer' }
            }
        }

    return mapping
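
# Example (illustrative, not part of this file): the mapping above could be applied
# when creating the index, assuming the official `elasticsearch` async client is
# installed and the endpoint URL is adjusted to your cluster:
#
#   from elasticsearch import AsyncElasticsearch
#
#   client = AsyncElasticsearch('https://localhost:9200')
#   await client.indices.create(index=default_index, mappings=construct_map()['mappings'])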
async def process_data(file_path: str):
    '''
    Read and process the input file

    :param file_path: Path to the input file
    '''

    async with aiofiles.open(file_path) as input_file:

        # Initialize the cache
        last = None

        # Default source for the records
        source = 'czds'

        # Determine the zone name from the file path (e.g., /path/to/zones/com.eu.txt -> com.eu zone)
        zone = '.'.join(file_path.split('/')[-1].split('.')[:-1])
        # Note: For now, this is the best way because we are not just ingesting TLD zone files, but entire zones for domains as well...

        # Read the input file line by line
        async for line in input_file:
            line = line.strip()

            # Sentinel value to indicate the end of a process (for closing out a FIFO stream)
            if line == '~eof':
                break

            # Skip empty lines
            if not line:
                continue

            # Skip comments, but detect AXFR transfers to change the source
            if line.startswith(';'):
                if 'DiG' in line and 'AXFR' in line: # Do we need to worry about case sensitivity? How can we store the nameserver as well?
                    source = 'axfr'
                continue

            # Split the line into its parts
            parts = line.split()

            # Ensure the line has at least 5 parts (domain, TTL, class, type, data)
            if len(parts) < 5:
                logging.warning(f'Invalid line: {line}')
                continue

            # Split the record into its parts
            domain, ttl, record_class, record_type, data = parts[0].rstrip('.').lower(), parts[1], parts[2].lower(), parts[3].lower(), ' '.join(parts[4:])

            # Ensure the TTL is a number
            if not ttl.isdigit():
                logging.warning(f'Invalid TTL: {ttl} with line: {line}')
                continue
            else:
                ttl = int(ttl)

            # Do not index other record classes (doubtful any CHAOS/HESIOD records will be found in zone files)
            if record_class != 'in':
                logging.warning(f'Unsupported record class: {record_class} with line: {line}')
                continue

            # Do not index other record types
            if record_type not in record_types:
                logging.warning(f'Unsupported record type: {record_type} with line: {line}')
                continue

            # Little tidying up for specific record types (removing trailing dots, etc)
            if record_type == 'nsec':
                data = ' '.join([data.split()[0].rstrip('.'), *data.split()[1:]])
            elif record_type == 'soa':
                data = ' '.join([part.rstrip('.') if '.' in part else part for part in data.split()])
            elif data.endswith('.'):
                data = data.rstrip('.')

            # Check if we are still processing the same domain
            if last:
                if domain == last['doc']['domain']:
                    if record_type in last['doc']['records']:
                        last['doc']['records'][record_type].append({'ttl': ttl, 'data': data}) # Do we need to check for duplicate records?
                    else:
                        last['doc']['records'][record_type] = [{'ttl': ttl, 'data': data}]
                    continue
                else:
                    yield last

            # Cache the document
            last = {
                '_op_type'      : 'update',
                '_id'           : domain,
                '_index'        : default_index,
                'doc'           : {
                    'domain'  : domain,
                    'zone'    : zone,
                    'records' : {record_type: [{'data': data, 'ttl': ttl}]},
                    'source'  : source,
                    'seen'    : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()) # Zone files do not contain a timestamp, so we use the current time
                },
                'doc_as_upsert' : True # This will create the document if it does not exist
            }

        # Yield the final cached document once the file (or FIFO stream) is exhausted
        if last:
            yield last


async def test(input_path: str):
    '''
    Test the ingestion process

    :param input_path: Path to the input file
    '''

    async for document in process_data(input_path):
        print(document)



if __name__ == '__main__':
    import argparse
    import asyncio

    parser = argparse.ArgumentParser(description='Ingestor for ERIS')
    parser.add_argument('input_path', help='Path to the input file or directory')
    args = parser.parse_args()

    asyncio.run(test(args.input_path))



'''
Input:
    1001.vegas. 3600 in ns ns11.waterrockdigital.com.
    1001.vegas. 3600 in ns ns12.waterrockdigital.com.

Output:
    {
        '_id'     : '1001.vegas',
        '_index'  : 'dns-zones',
        '_source' : {
            'domain'  : '1001.vegas',
            'zone'    : 'vegas',
            'records' : {
                'ns': [
                    {'ttl': 3600, 'data': 'ns11.waterrockdigital.com'},
                    {'ttl': 3600, 'data': 'ns12.waterrockdigital.com'}
                ]
            },
            'source' : 'czds',
            'seen'   : '2021-09-01T00:00:00Z'
        }
    }

Notes:
    How do we want to handle hashed NSEC3 records? Do we ingest them as they are, or crack the NSEC3 hashes first and ingest?
    Can an AXFR transfer return data out of order? If so, how do we handle that?
'''
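
A minimal sketch of how the documents yielded by process_data() could be bulk-indexed, assuming the official elasticsearch async client is installed (pip install elasticsearch); the endpoint URL, credentials, and the ingest() wrapper below are illustrative, not part of ERIS:

    import asyncio

    from elasticsearch import AsyncElasticsearch
    from elasticsearch.helpers import async_bulk

    async def ingest(file_path: str):
        '''Feed the documents from process_data() straight into a bulk request.'''

        # The yielded documents already follow the bulk action format
        # (_op_type, _id, _index, doc, doc_as_upsert), so no translation is needed
        client = AsyncElasticsearch('https://localhost:9200') # Assumed endpoint
        success, errors = await async_bulk(client, process_data(file_path), raise_on_error=False)
        print(f'Indexed {success} documents ({len(errors)} errors)')
        await client.close()

    asyncio.run(ingest('zones/vegas.txt'))

Because the documents use doc_as_upsert, re-ingesting the same zone file updates existing documents in place rather than duplicating them.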