diff --git a/ingestors/ingest_massdns.py b/ingestors/ingest_massdns.py
@@ -11,14 +11,19 @@ except ImportError:
raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
+# Set a default Elasticsearch index if one is not provided
default_index = 'eris-massdns'
def construct_map() -> dict:
- '''Construct the Elasticsearch index mapping for MassDNS records'''
+ '''Construct the Elasticsearch index mapping for records'''
+    # Allow matching on the exact value (keyword) or via full-text search
keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
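+    # e.g. a 'term' query on 'record.keyword' matches the exact value, while a
+    # 'match' query on 'record' searches the analyzed full-text field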
+ # Construct the index mapping
mapping = {
'mappings': {
'properties': {
@@ -32,91 +37,100 @@ def construct_map() -> dict:
return mapping
-async def process_data(file_path: str):
+async def process_data(input_path: str):
'''
- Read and process Massdns records from the log file.
+ Read and process the input file
- :param file_path: Path to the Massdns log file
+ :param input_path: Path to the input file
'''
- async with aiofiles.open(file_path) as input_file:
+ async with aiofiles.open(input_path) as input_file:
+ # Cache the last document to avoid creating a new one for the same IP address
last = None
- async for line in input_file:
- line = line.strip()
+ try:
+ # Read the input file line by line
+ async for line in input_file:
+ line = line.strip()
- # Sentinel value to indicate the end of a process (for closing out a FIFO stream)
- if line == '~eof':
- yield last
- break
+ # Sentinel value to indicate the end of a process (for closing out a FIFO stream)
+ if line == '~eof':
+ yield last
+ break
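+                # Note: if the stream ends without the sentinel, the last cached
+                # document is never yielded; the producer should finish with
+                # something like: echo '~eof' > $HOME/massdns/fifo.json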
- # Skip empty lines
- if not line:
- continue
+ # Skip empty lines (doubtful we will have any, but just in case)
+ if not line:
+ continue
- # Split the line into its parts
- parts = line.split()
+ # Split the line into its parts
+ parts = line.split()
- # Ensure the line has at least 3 parts
- if len(parts) < 3:
- logging.warning(f'Invalid PTR record: {line}')
- continue
+ # Ensure the line has at least 3 parts
+ if len(parts) < 3:
+ logging.warning(f'Invalid PTR record: {line}')
+ continue
- # Split the PTR record into its parts
- name, record_type, record = parts[0].rstrip('.'), parts[1], ' '.join(parts[2:]).rstrip('.')
+ # Split the PTR record into its parts
+ name, record_type, record = parts[0].rstrip('.'), parts[1], ' '.join(parts[2:]).rstrip('.')
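+                # e.g. ('0.6.229.47.in-addr.arpa', 'PTR', '047-229-006-000.res.spectrum.com')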
- # Do not index other records
- if record_type != 'PTR':
- continue
+ # Do not index other records
+ if record_type != 'PTR':
+ continue
- # Do not index PTR records that do not have a record
- if not record:
- continue
+ # Do not index PTR records that do not have a record
+ if not record:
+ continue
- # Let's not index the PTR record if it's the same as the in-addr.arpa domain
- if record == name:
- continue
+                # Do not index PTR records that simply point back to the in-addr.arpa name
+ if record == name:
+ continue
- # Get the IP address from the in-addr.arpa domain
- ip = '.'.join(name.replace('.in-addr.arpa', '').split('.')[::-1])
+ # Get the IP address from the in-addr.arpa domain
+ ip = '.'.join(name.replace('.in-addr.arpa', '').split('.')[::-1])
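+                # e.g. '0.6.229.47.in-addr.arpa' -> '47.229.6.0'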
- # Check if we are still processing the same IP address
- if last:
- if ip == last['_id']:
- last_record = last['doc']['record']
- if isinstance(last_record, list):
- if record not in last_record:
+ # Check if we are still processing the same IP address
+ if last:
+ if ip == last['_id']: # This record is for the same IP address as the cached document
+ last_records = last['doc']['record']
+ if record not in last_records: # Do not index duplicate records
last['doc']['record'].append(record)
- else:
- logging.warning(f'Duplicate PTR record: {line}')
+ continue
else:
- if record != last_record:
- last['doc']['record'] = [last_record, record] # IP addresses with more than one PTR record will turn into a list
- continue
- else:
- yield last # Return the last document and start a new one
-
- # Cache the this document in-case we have more for the same IP address
- last = {
- '_op_type' : 'update',
- '_id' : ip,
- '_index' : default_index,
- 'doc' : {
- 'ip' : ip,
- 'record' : record,
- 'seen' : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
- },
- 'doc_as_upsert' : True # This will create the document if it does not exist
- }
+ yield last # Return the last document and start a new one
+
+                # Cache this document in case the next record is for the same IP address
+ last = {
+ '_op_type' : 'update',
+ '_id' : ip,
+ '_index' : default_index,
+ 'doc' : {
+ 'ip' : ip,
+                    'record' : [record], # Consider a Painless script to append to the list if the document exists ('seen' per-record, 'last_seen' per-IP)
+ 'seen' : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
+ },
+ 'doc_as_upsert' : True # Create the document if it does not exist
+ }
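+                # A scripted upsert (the Painless idea noted above) might look like:
+                #   'script': { 'source': 'if (!ctx._source.record.contains(params.record)) { ctx._source.record.add(params.record) }', 'params': { 'record': record } }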
+
+ except Exception as e:
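+            # Log the error and end the stream rather than propagating the exception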
+ logging.error(f'Error processing data: {e}')
async def test(input_path: str):
'''
- Test the MassDNS ingestion process
+ Test the ingestion process
- :param input_path: Path to the MassDNS log file
+ :param input_path: Path to the input file
'''
+
async for document in process_data(input_path):
print(document)
@@ -126,7 +140,7 @@ if __name__ == '__main__':
import argparse
import asyncio
- parser = argparse.ArgumentParser(description='MassDNS Ingestor for ERIS')
+ parser = argparse.ArgumentParser(description='Ingestor for ERIS')
parser.add_argument('input_path', help='Path to the input file or directory')
args = parser.parse_args()
@@ -139,9 +153,10 @@ Deployment:
sudo apt-get install build-essential gcc make
git clone --depth 1 https://github.com/blechschmidt/massdns.git $HOME/massdns && cd $HOME/massdns && make
curl -s https://public-dns.info/nameservers.txt | grep -v ':' > $HOME/massdns/nameservers.txt
- python3 ./scripts/ptr.py | ./bin/massdns -r $HOME/massdns/nameservers.txt -t PTR --filter NOERROR-s 500 -o S -w $HOME/massdns/fifo.json
+ python3 ./scripts/ptr.py | ./bin/massdns -r $HOME/massdns/nameservers.txt -t PTR --filter NOERROR -o S -w $HOME/massdns/fifo.json
or...
- while true; do python ./scripts/ptr.py | ./bin/massdns -r $HOME/massdns/nameservers.txt -t PTR --filter NOERROR -s 1000 -o S -w $HOME/massdns/fifo.json; done
+ while true; do python ./scripts/ptr.py | ./bin/massdns -r $HOME/massdns/nameservers.txt -t PTR --filter NOERROR -o S -w $HOME/massdns/fifo.json; done
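+    (looping restarts massdns after each full pass so the FIFO keeps receiving records)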
Output:
0.6.229.47.in-addr.arpa. PTR 047-229-006-000.res.spectrum.com.
@@ -150,16 +165,16 @@ Output:
Input:
{
- "_id" : "47.229.6.0"
- "_index" : "eris-massdns",
- "_source" : {
- "ip" : "47.229.6.0",
- "record" : "047-229-006-000.res.spectrum.com", # This will be a list if there are more than one PTR record
- "seen" : "2021-06-30T18:31:00Z"
+        '_id' : '47.229.6.0',
+ '_index' : 'eris-massdns',
+ '_source' : {
+ 'ip' : '47.229.6.0',
+        'record' : ['047-229-006-000.res.spectrum.com'], # Stored as a list because an IP address can have multiple PTR records
+ 'seen' : '2021-06-30T18:31:00Z'
}
}
Notes:
- Why do some IP addresses return a CNAME from a PTR request
+    Why do some IP addresses return an A/CNAME from a PTR request
What is dns-servfail.net (Frequent CNAME response from PTR requests)
'''