diff --git a/ingestors/ingest_zone.py b/ingestors/ingest_zone.py
@@ -9,8 +9,10 @@ try:
except ImportError:
    raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
+
default_index = 'dns-zones'
-record_types = ('a','aaaa','caa','cdnskey','cds','cname','dnskey','ds','mx','naptr','ns','nsec','nsec3','nsec3param','ptr','rrsig','rp','sshfp','soa','srv','txt','type65534')
+record_types = ('a','aaaa','caa','cdnskey','cds','cname','dnskey','ds','mx','naptr','ns','nsec','nsec3','nsec3param','ptr','rrsig','rp','sshfp','soa','srv','txt','type65534')
+
def construct_map() -> dict:
    '''Construct the Elasticsearch index mapping for zone file records.'''
@@ -61,6 +63,9 @@ async def process_data(file_path: str):
        async for line in input_file:
            line = line.strip()
+           if line == '~eof': # Sentinel value indicating the end of input (used when --watch reads from a FIFO)
+               break
+
            if not line or line.startswith(';'):
                continue
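
# A minimal sketch (not part of this diff) of the producer side of the '~eof'
# sentinel handled above: a watcher feeds zone data into a FIFO and writes the
# sentinel when it is done, so process_data() stops cleanly instead of waiting
# for more input. The FIFO path and the record line are hypothetical examples.
import os

fifo_path = '/tmp/zone.fifo' # Hypothetical path for a FIFO read with --watch
if not os.path.exists(fifo_path):
    os.mkfifo(fifo_path)
with open(fifo_path, 'w') as fifo: # Opening for writing blocks until a reader attaches
    fifo.write('1001.vegas. 3600 in ns ns11.waterrockdigital.com.\n')
    fifo.write('~eof\n') # Sentinel consumed by process_data()
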
@@ -76,14 +81,15 @@ async def process_data(file_path: str):
            ttl = int(ttl)
+           # Anomaly: it is doubtful any CHAOS/HESIOD class records will be found in zone files
            if record_class != 'in':
-               raise ValueError(f'Unsupported record class: {record_class} with line: {line}') # Anomaly (Doubtful any CHAOS/HESIOD records will be found)
+               raise ValueError(f'Unsupported record class: {record_class} with line: {line}')
            # We do not want to collide with our current mapping (Again, this is an anomaly)
            if record_type not in record_types:
                raise ValueError(f'Unsupported record type: {record_type} with line: {line}')
-           # Little tidying up for specific record types
+           # A little tidying up for specific record types (removing trailing dots, etc.)
            if record_type == 'nsec':
                data = ' '.join([data.split()[0].rstrip('.'), *data.split()[1:]])
            elif record_type == 'soa':
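
# For reference, a worked example of the nsec clean-up above: only the first
# field of the record data loses its trailing dot, the rest is left untouched.
data = '1001.vegas. NS SOA RRSIG NSEC DNSKEY' # Hypothetical nsec record data
data = ' '.join([data.split()[0].rstrip('.'), *data.split()[1:]])
assert data == '1001.vegas NS SOA RRSIG NSEC DNSKEY'
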
@@ -93,11 +99,15 @@ async def process_data(file_path: str):
            if domain != last_domain:
                if last_domain:
-                   struct = {'domain': last_domain, 'records': domain_records[last_domain], 'seen': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())}
+                   struct = {
+                       'domain' : last_domain,
+                       'records' : domain_records[last_domain],
+                       'seen' : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()) # Zone files do not contain a timestamp, so we use the current time
+                   }
                    del domain_records[last_domain]
-                   yield {'_index': default_index, '_source': struct}
+                   yield {'_id': last_domain, '_index': default_index, '_source': struct} # Use the flushed domain (last_domain) as the document ID so an existing document is updated in place rather than duplicated
                    last_domain = domain
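
# A minimal sketch (an assumption, not shown in this diff) of consuming the
# actions yielded above: because '_id' is set, the bulk helper overwrites an
# existing document for the domain instead of creating a duplicate. The
# endpoint URL is hypothetical and index/mapping setup is omitted.
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk

async def ingest(file_path: str):
    es = AsyncElasticsearch('http://localhost:9200') # Hypothetical endpoint
    try:
        await async_bulk(es, process_data(file_path)) # Streams the async generator in batches
    finally:
        await es.close()
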
@@ -108,8 +118,6 @@ async def process_data(file_path: str):
            domain_records[domain][record_type].append({'ttl': ttl, 'data': data})
-    return None # EOF
-
'''
@@ -125,13 +133,17 @@ Example record:
Will be indexed as:
{
- "domain": "1001.vegas",
- "records": {
- "ns": [
- {"ttl": 3600, "data": "ns11.waterrockdigital.com"},
- {"ttl": 3600, "data": "ns12.waterrockdigital.com"}
- ]
- },
- "seen": "2021-09-01T00:00:00Z" # Zulu time added upon indexing
+ "_id" : "1001.vegas"
+ "_index" : "dns-zones",
+ "_source" : {
+ "domain" : "1001.vegas",
+ "records" : { # All records are stored in a single dictionary
+ "ns": [
+ {"ttl": 3600, "data": "ns11.waterrockdigital.com"},
+ {"ttl": 3600, "data": "ns12.waterrockdigital.com"}
+ ]
+ },
+ "seen" : "2021-09-01T00:00:00Z" # Zulu time added upon indexing
+ }
}
'''
\ No newline at end of file
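
# A minimal sketch (an assumption, for illustration only) of fetching the
# document described in the docstring above once it has been indexed; the
# endpoint URL is hypothetical.
from elasticsearch import Elasticsearch

es = Elasticsearch('http://localhost:9200')
doc = es.get(index='dns-zones', id='1001.vegas')
print(doc['_source']['records']['ns']) # [{'ttl': 3600, 'data': 'ns11.waterrockdigital.com'}, ...]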