eris

- Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/eris.git
Log | Files | Refs | Archive | README | LICENSE

ingest_masscan.py (4381B)

      1 #!/usr/bin/env python
      2 # Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
      3 # ingest_masscan.py
      4 
      5 import json
      6 import logging
      7 import time
      8 
      9 try:
     10 	import aiofiles
     11 except ImportError:
     12 	raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
     13 
     14 
# Default Elasticsearch index used when the caller does not supply one
default_index = 'eris-masscan'
     17 
     18 
     19 def construct_map() -> dict:
     20 	'''Construct the Elasticsearch index mapping for Masscan records.'''
     21 
     22 	# Match on exact value or full text search
     23 	keyword_mapping = { 'type': 'text',  'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
     24 
     25 	# Construct the index mapping
     26 	mapping = {
     27 		'mappings': {
     28 			'properties': {
     29 				'ip'      : { 'type': 'ip' },
     30 				'port'    : { 'type': 'integer' },
     31 				'proto'   : { 'type': 'keyword' },
     32 				'service' : { 'type': 'keyword' },
     33 				'banner'  : keyword_mapping,
     34 				'ttl'     : { 'type': 'integer' },
     35 				'seen'    : { 'type': 'date' }
     36 			}
     37 		}
     38 	}
     39 
     40 	return mapping
     41 
     42 
     43 async def process_data(input_path: str):
     44 	'''
     45 	Read and process the input file
     46 
     47 	:param input_path: Path to the input file
     48 	'''
     49 
     50 	async with aiofiles.open(input_path) as input_file:
     51 		# Read the input file line by line
     52 		async for line in input_file:
     53 			line = line.strip()
     54 
     55 			# Sentinel value to indicate the end of a process (for closing out a FIFO stream)
     56 			if line == '~eof':
     57 				break
     58 
     59 			# Skip empty lines and lines that do not start with a JSON object
     60 			if not line or not line.startswith('{'):
     61 				continue
     62 
     63 			# Parse the JSON record
     64 			try:
     65 				record = json.loads(line)
     66 			except json.decoder.JSONDecodeError:
     67 				logging.error(f'Failed to parse JSON record! ({line})')
     68 				continue
     69 
     70 			# Process the record
     71 			struct = {
     72 				'ip'    : record['ip'],
     73 				'port'  : record['port'],
     74 				'proto' : record['proto'],
     75 				'ttl'   : record['ttl'],
     76 				'seen'  : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(int(record['timestamp'])))
     77 			}
     78 
     79 			# Add the service information if available (this field is optional)
     80 			if record['rec_type'] == 'banner':
     81 				data = record['data']
     82 				if 'service_name' in data:
     83 					if (service_name := data['service_name']) not in ('unknown', ''):
     84 						struct['service'] = service_name
     85 				if 'banner' in data:
     86 					banner = ' '.join(data['banner'].split()) # Remove extra whitespace
     87 					if banner:
     88 						struct['banner'] = banner
     89 
     90 			# Yield the record
     91 			yield {'_index': default_index, '_source': struct}
     92 
     93 
     94 async def test(input_path: str):
     95 	'''
     96 	Test the ingestion process
     97 
     98 	:param input_path: Path to the input file
     99 	'''
    100 
    101 	async for document in process_data(input_path):
    102 		print(document)
    103 
    104 
    105 
    106 if __name__ == '__main__':
    107 	import argparse
    108 	import asyncio
    109 
    110 	parser = argparse.ArgumentParser(description='Ingestor for ERIS')
    111 	parser.add_argument('input_path', help='Path to the input file or directory')
    112 	args = parser.parse_args()
    113 
    114 	asyncio.run(test(args.input_path))
    115 
    116 
    117 
    118 '''
    119 Deploy:
    120 	apt-get install iptables masscan libpcap-dev screen
    121 	setcap 'CAP_NET_RAW+eip CAP_NET_ADMIN+eip' /bin/masscan
    122 	/sbin/iptables -A INPUT -p tcp --dport 61010 -j DROP # Not persistent
    123 	printf "0.0.0.0/8\n10.0.0.0/8\n100.64.0.0/10\n127.0.0.0/8\n169.254.0.0/16\n172.16.0.0/12\n192.0.0.0/24\n192.0.2.0/24\n192.31.196.0/24\n192.52.193.0/24\n192.88.99.0/24\n192.168.0.0/16\n192.175.48.0/24\n198.18.0.0/15\n198.51.100.0/24\n203.0.113.0/24\n224.0.0.0/3\n255.255.255.255/32"  > exclude.conf
    124 	screen -S scan
    125 	masscan 0.0.0.0/0 -p18000 --banners --http-user-agent "USER_AGENT" --source-port 61010 --open-only --rate 30000 --excludefile exclude.conf -oD 18000.json
    126 	masscan 0.0.0.0/0 -p21,22,23 --banners --http-user-agent "USER_AGENT" --source-port 61000-65503 --open-only --rate 30000 --excludefile exclude.conf -oD output_new.json --shard $i/$TOTAL
    127 
Output (from masscan, consumed as this script's input):
    129 	{
    130 		"ip"        : "43.134.51.142",
    131 		"timestamp" : "1705255468",
    132 		"ports"     : [
    133 			{
    134 				"port"    : 22, # We will create a record for each port opened
    135 				"proto"   : "tcp",
    136 				"service" : {
    137 					"name"   : "ssh",
    138 					"banner" : "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4"
    139 				}
    140 			}
    141 		]
    142 	}
    143 
Input (document ingested into Elasticsearch):
    145 	{
    146 		"_id"     : "43.134.51.142:22"
    147 		"_index"  : "masscan-logs",
    148 		"_source" : {
    149 			"ip"      : "43.134.51.142",
    150 			"port"    : 22,
    151 			"proto"   : "tcp",
    152 			"service" : "ssh",
    153 			"banner"  : "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4",
    154 			"seen"    : "2021-10-08T02:04:28Z"
		}
	}
    156 '''