eris- Elasticsearch Recon Ingestion Scripts (ERIS) 🔎 |
git clone git://git.acid.vegas/eris.git |
Log | Files | Refs | Archive | README | LICENSE |
ingest_ptrstream.py (2650B)
1 #!/usr/bin/env python 2 # Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris) 3 # ingestors/ingest_ptrstream.py 4 5 ''' 6 This ingestor is meant to be used with https://github.com/acidvegas/ptrstream 7 ''' 8 9 import json 10 import logging 11 12 try: 13 import aiofiles 14 except ImportError: 15 raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)') 16 17 18 # Set a default elasticsearch index if one is not provided 19 default_index = 'havoc-ptr' 20 21 22 def construct_map() -> dict: 23 '''Construct the Elasticsearch index mapping for records''' 24 25 # Match on exact value or full text search 26 keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } } 27 28 # Construct the index mapping 29 mapping = { 30 'mappings': { 31 'properties': { 32 'ip' : {'type': 'ip'}, 33 'nameserver' : {'type': 'ip'}, 34 'record' : keyword_mapping, 35 'record_type' : {'type': 'keyword'}, 36 'ttl' : {'type': 'integer'}, 37 'seen' : {'type': 'date'} 38 } 39 } 40 } 41 42 return mapping 43 44 45 async def process_data(input_path: str): 46 ''' 47 Read and process the input file 48 49 :param input_path: Path to the input file 50 ''' 51 52 # Open the input file 53 async with aiofiles.open(input_path) as input_file: 54 55 # Read the input file line by line 56 async for line in input_file: 57 58 # Strip whitespace 59 line = line.strip() 60 61 # Skip empty lines and lines that do not start with a JSON object 62 if not line or not line.startswith('{'): 63 continue 64 65 # Parse the JSON record 66 try: 67 record = json.loads(line) 68 except json.decoder.JSONDecodeError: 69 logging.error(f'Failed to parse JSON record! ({line})') 70 continue 71 72 # Get the IP address from the in-addr.arpa domain 73 ip = record['ip'] 74 75 # Do not index PTR records that have the same record as the in-addr.arpa domain 76 if record['record'] == '.'.join(ip.split('.')[::-1]) + '.in-addr.arpa': 77 continue 78 79 # Create the document structure 80 yield { 81 '_op_type' : 'update', 82 '_id' : ip, 83 '_index' : default_index, 84 'doc' : record, 85 'doc_as_upsert' : True # Create the document if it does not exist 86 } 87 88 89 async def test(input_path: str): 90 ''' 91 Test the ingestion process 92 93 :param input_path: Path to the input file 94 ''' 95 96 async for document in process_data(input_path): 97 print(document) 98 99 100 101 if __name__ == '__main__': 102 import argparse 103 import asyncio 104 105 parser = argparse.ArgumentParser(description='Ingestor for ERIS') 106 parser.add_argument('input_path', help='Path to the input file or directory') 107 args = parser.parse_args() 108 109 asyncio.run(test(args.input_path))