eris

Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/eris.git

ingest_ptrstream.py (2650B)

#!/usr/bin/env python
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
# ingestors/ingest_ptrstream.py

'''
This ingestor is meant to be used with https://github.com/acidvegas/ptrstream
'''
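
# Each line of ptrstream output is expected to be a single JSON object. The record
# below is a hypothetical example; its field names are inferred from the index
# mapping in construct_map() below, not taken from the ptrstream documentation:
#
#   {"ip": "203.0.113.7", "nameserver": "203.0.113.53", "record": "host.example.com", "record_type": "PTR", "ttl": 300, "seen": "2024-01-01T00:00:00Z"}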

import json
import logging

try:
	import aiofiles
except ImportError:
	raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')


# Set a default elasticsearch index if one is not provided
default_index = 'havoc-ptr'


def construct_map() -> dict:
	'''Construct the Elasticsearch index mapping for records'''

	# Match on exact value or full text search
	keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }

	# Construct the index mapping
	mapping = {
		'mappings': {
			'properties': {
				'ip'          : {'type': 'ip'},
				'nameserver'  : {'type': 'ip'},
				'record'      : keyword_mapping,
				'record_type' : {'type': 'keyword'},
				'ttl'         : {'type': 'integer'},
				'seen'        : {'type': 'date'}
			}
		}
	}

	return mapping


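# The mapping above is not applied by this module itself. A minimal sketch of how it
# could be used to create the index with the official elasticsearch-py client (an
# assumed dependency here, shown only for illustration):
#
#   from elasticsearch import Elasticsearch
#
#   es = Elasticsearch('https://localhost:9200')
#   es.indices.create(index=default_index, mappings=construct_map()['mappings'])
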
async def process_data(input_path: str):
	'''
	Read and process the input file

	:param input_path: Path to the input file
	'''

	# Open the input file
	async with aiofiles.open(input_path) as input_file:

		# Read the input file line by line
		async for line in input_file:

			# Strip whitespace
			line = line.strip()

			# Skip empty lines and lines that do not start with a JSON object
			if not line or not line.startswith('{'):
				continue

			# Parse the JSON record
			try:
				record = json.loads(line)
			except json.decoder.JSONDecodeError:
				logging.error(f'Failed to parse JSON record! ({line})')
				continue

			# Get the IP address from the record
			ip = record['ip']

			# Skip PTR records whose target is just the reverse DNS name of the IP itself
			if record['record'] == '.'.join(ip.split('.')[::-1]) + '.in-addr.arpa':
				continue

			# Create the bulk update action, keyed on the IP address
			yield {
				'_op_type'      : 'update',
				'_id'           : ip,
				'_index'        : default_index,
				'doc'           : record,
				'doc_as_upsert' : True # Create the document if it does not exist
			}


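# The actions yielded above are Elasticsearch bulk update actions with upsert. A
# minimal sketch of how they might be consumed with the async bulk helper from
# elasticsearch-py (the real indexing driver is not part of this file, so the client
# setup below is an assumption for illustration only):
#
#   from elasticsearch import AsyncElasticsearch
#   from elasticsearch.helpers import async_bulk
#
#   async def ingest(input_path: str):
#       es = AsyncElasticsearch('https://localhost:9200')
#       await async_bulk(es, process_data(input_path))
#       await es.close()
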
async def test(input_path: str):
	'''
	Test the ingestion process

	:param input_path: Path to the input file
	'''

	async for document in process_data(input_path):
		print(document)


if __name__ == '__main__':
	import argparse
	import asyncio

	parser = argparse.ArgumentParser(description='Ingestor for ERIS')
	parser.add_argument('input_path', help='Path to the input file')
	args = parser.parse_args()

	asyncio.run(test(args.input_path))
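
# Example standalone test run (hypothetical path); documents are printed rather than indexed:
#   python ingest_ptrstream.py ptrstream_output.json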