eris

- Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/eris.git
Log | Files | Refs | Archive | README | LICENSE

ingest_masscan.py (5747B)

      1 #!/usr/bin/env python
      2 # Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
      3 # ingest_masscan.py
      4 
      5 import json
      6 import logging
      7 import time
      8 
      9 try:
     10 	import aiofiles
     11 except ImportError:
     12 	raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
     13 
     14 
     15 # Set a default elasticsearch index if one is not provided
     16 default_index = 'masscan-logs'
     17 
     18 
     19 def construct_map() -> dict:
     20 	'''Construct the Elasticsearch index mapping for Masscan records.'''
     21 
     22 	# Match on exact value or full text search
     23 	keyword_mapping = { 'type': 'text',  'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
     24 
     25 	# Construct the geoip mapping (Used with the geoip pipeline to enrich the data)
     26 	geoip_mapping = {
     27 		'city_name'        : keyword_mapping,
     28 		'continent_name'   : keyword_mapping,
     29 		'country_iso_code' : keyword_mapping,
     30 		'country_name'     : keyword_mapping,
     31 		'location'         : { 'type': 'geo_point' },
     32 		'region_iso_code'  : keyword_mapping,
     33 		'region_name'      : keyword_mapping,
     34 	}
     35 
     36 	# Construct the index mapping
     37 	mapping = {
     38 		'mappings': {
     39 			'properties': {
     40 				'ip'      : { 'type': 'ip' },
     41 				'port'    : { 'type': 'integer' },
     42 				'proto'   : { 'type': 'keyword' },
     43 				'service' : { 'type': 'keyword' },
     44 				'banner'  : keyword_mapping,
     45 				'seen'    : { 'type': 'date' }
     46 				#'geoip'	: { 'properties': geoip_mapping }
     47 			}
     48 		}
     49 	}
     50 
     51 	return mapping
     52 
     53 
     54 async def process_data(input_path: str):
     55 	'''
     56 	Read and process the input file
     57 
     58 	:param input_path: Path to the input file
     59 	'''
     60 
     61 	async with aiofiles.open(input_path) as input_file:
     62 		# Read the input file line by line
     63 		async for line in input_file:
     64 			line = line.strip()
     65 
     66 			# Sentinel value to indicate the end of a process (for closing out a FIFO stream)
     67 			if line == '~eof':
     68 				break
     69 
     70 			# Skip empty lines and lines that do not start with a JSON object
     71 			if not line or not line.startswith('{'):
     72 				continue
     73 
     74 			# Do we need this? Masscan JSON output seems with seperate records with a comma between lines for some reason...
     75 			if line.endswith(','):
     76 				line = line[:-1]
     77 
     78 			# Parse the JSON record
     79 			try:
     80 				record = json.loads(line)
     81 			except json.decoder.JSONDecodeError:
     82 				# In rare cases, the JSON record may be incomplete or malformed:
     83 				#   { "ip": "51.161.12.223", "timestamp": "1707628302", "ports": [ {"port": 22, "proto": "tcp", "service": {"name": "ssh", "banner":
     84 				#   { "ip": "83.66.211.246", "timestamp": "1706557002"
     85 				logging.error(f'Failed to parse JSON record! ({line})')
     86 				input('Press Enter to continue...') # Pause for review & debugging (remove this in production)
     87 				continue
     88 
     89 			# In rare cases, a single record may contain multiple ports, though I have yet to witness this...
     90 			if len(record['ports']) > 1:
     91 				logging.warning(f'Multiple ports found for record! ({record})')
     92 				input('Press Enter to continue...') # Pause for review (remove this in production)
     93 
     94 			# Process each port in the record
     95 			for port_info in record['ports']:
     96 				struct = {
     97 					'ip'    : record['ip'],
     98 					'port'  : port_info['port'],
     99 					'proto' : port_info['proto'],
    100 					'seen'  : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(int(record['timestamp'])))
    101 				}
    102 
    103 				# Add the service information if available (this field is optional)
    104 				if 'service' in port_info:
    105 
    106         			# Add the service name if available
    107 					if 'name' in port_info['service']:
    108 						if (service_name := port_info['service']['name']) not in ('unknown',''):
    109 							struct['service'] = service_name
    110 
    111 					# Add the service banner if available
    112 					if 'banner' in port_info['service']:
    113 						banner = ' '.join(port_info['service']['banner'].split()) # Remove extra whitespace
    114 						if banner:
    115 							struct['banner'] = banner
    116 
    117 				# Yield the record
    118 				yield {'_index': default_index, '_source': struct}
    119 
    120 
    121 async def test(input_path: str):
    122 	'''
    123 	Test the ingestion process
    124 
    125 	:param input_path: Path to the input file
    126 	'''
    127 
    128 	async for document in process_data(input_path):
    129 		print(document)
    130 
    131 
    132 
    133 if __name__ == '__main__':
    134 	import argparse
    135 	import asyncio
    136 
    137 	parser = argparse.ArgumentParser(description='Ingestor for ERIS')
    138 	parser.add_argument('input_path', help='Path to the input file or directory')
    139 	args = parser.parse_args()
    140 
    141 	asyncio.run(test(args.input_path))
    142 
    143 
    144 
    145 '''
    146 Deploy:
    147 	apt-get install iptables masscan libpcap-dev screen
    148 	setcap 'CAP_NET_RAW+eip CAP_NET_ADMIN+eip' /bin/masscan
    149 	/sbin/iptables -A INPUT -p tcp --dport 61010 -j DROP # Not persistent
    150 	printf "0.0.0.0/8\n10.0.0.0/8\n100.64.0.0/10\n127.0.0.0/8\n169.254.0.0/16\n172.16.0.0/12\n192.0.0.0/24\n192.0.2.0/24\n192.31.196.0/24\n192.52.193.0/24\n192.88.99.0/24\n192.168.0.0/16\n192.175.48.0/24\n198.18.0.0/15\n198.51.100.0/24\n203.0.113.0/24\n224.0.0.0/3\n255.255.255.255/32"  > exclude.conf
    151 	screen -S scan
    152 	masscan 0.0.0.0/0 -p18000 --banners --http-user-agent "USER_AGENT" --source-port 61010 --open-only --rate 30000 --excludefile exclude.conf -oJ 18000.json
    153 	masscan 0.0.0.0/0 -p21,22,23 --banners --http-user-agent "USER_AGENT" --source-port 61000-65503 --open-only --rate 30000 --excludefile exclude.conf -oJ output_new.json --shard $i/$TOTAL
    154 
Masscan output (this script's input):
    156 	{
    157 		"ip"        : "43.134.51.142",
    158 		"timestamp" : "1705255468",
    159 		"ports"     : [
    160 			{
    161 				"port"    : 22, # We will create a record for each port opened
    162 				"proto"   : "tcp",
    163 				"service" : {
    164 					"name"   : "ssh",
    165 					"banner" : "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4"
    166 				}
    167 			}
    168 		]
    169 	}
    170 
Elasticsearch document (this script's output):
    172 	{
		"_id"     : "43.134.51.142:22",
    174 		"_index"  : "masscan-logs",
    175 		"_source" : {
    176 			"ip"      : "43.134.51.142",
    177 			"port"    : 22,
    178 			"proto"   : "tcp",
    179 			"service" : "ssh",
    180 			"banner"  : "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4",
    181 			"seen"    : "2021-10-08T02:04:28Z"
		}
	}
    183 '''