eris - Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/eris.git
ingest_masscan.py (5770B)
#!/usr/bin/env python
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
# ingest_masscan.py

'''
apt-get install iptables masscan libpcap-dev screen
setcap 'CAP_NET_RAW+eip CAP_NET_ADMIN+eip' /bin/masscan
/sbin/iptables -A INPUT -p tcp --dport 61010 -j DROP
printf "0.0.0.0/8\n10.0.0.0/8\n100.64.0.0/10\n127.0.0.0/8\n169.254.0.0/16\n172.16.0.0/12\n192.0.0.0/24\n192.0.2.0/24\n192.31.196.0/24\n192.52.193.0/24\n192.88.99.0/24\n192.168.0.0/16\n192.175.48.0/24\n198.18.0.0/15\n198.51.100.0/24\n203.0.113.0/24\n224.0.0.0/3\n255.255.255.255/32" > exclude.conf
screen -S scan
masscan 0.0.0.0/0 -p21,22,23 --banners --http-user-agent "USER_AGENT" --source-port 61010 --open-only --rate 30000 --excludefile exclude.conf -oJ output.json
masscan 0.0.0.0/0 -p21,22,23 --banners --http-user-agent "USER_AGENT" --source-port 61000-65503 --open-only --rate 30000 --excludefile exclude.conf -oJ output_new.json --shard $i/$TOTAL

Note: The above iptables rule is not persistent and will be removed on reboot.
'''

import json
import logging
import re
import time

try:
    import aiofiles
except ImportError:
    raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')

default_index = 'masscan-logs'

def construct_map() -> dict:
    '''Construct the Elasticsearch index mapping for Masscan records.'''

    keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }

    mapping = {
        'mappings': {
            'properties': {
                'ip': { 'type': 'ip' },
                'port': { 'type': 'integer' },
                'proto': { 'type': 'keyword' },
                'service': { 'type': 'keyword' },
                'banner': keyword_mapping,
                'ref_id': { 'type': 'keyword' },
                'seen': { 'type': 'date' }
                #'geoip': {
                #    'properties': {
                #        'city_name': keyword_mapping,
                #        'continent_name': keyword_mapping,
                #        'country_iso_code': keyword_mapping,
                #        'country_name': keyword_mapping,
                #        'location': { 'type': 'geo_point' },
                #        'region_iso_code': keyword_mapping,
                #        'region_name': keyword_mapping,
                #    }
                #}
            }
        }
    }

    return mapping


async def process_data(file_path: str):
    '''
    Read and process Masscan records from the log file.

    :param file_path: Path to the Masscan log file
    '''

    async with aiofiles.open(file_path, mode='r') as input_file:
        async for line in input_file:
            line = line.strip()

            # Skip blank lines and anything that is not a JSON record (e.g. the array brackets)
            if not line or not line.startswith('{'):
                continue

            # Strip the trailing comma appended to each record in the output array
            if line.endswith(','):
                line = line[:-1]

            try:
                record = json.loads(line)
            except json.decoder.JSONDecodeError:
                # In rare cases, the JSON record may be incomplete or malformed:
                #   { "ip": "51.161.12.223", "timestamp": "1707628302", "ports": [ {"port": 22, "proto": "tcp", "service": {"name": "ssh", "banner":
                #   { "ip": "83.66.211.246", "timestamp": "1706557002"
                logging.error(f'Failed to parse JSON record! ({line})')
                input('Press Enter to continue...') # Pause for review & debugging (pausing will be removed in production; the cause of this issue is still being investigated)
                continue

            if len(record['ports']) > 1:
                logging.warning(f'Multiple ports found for record! ({record})')
                input('Press Enter to continue...') # Pause for review (pausing will be removed in production; still investigating whether a record ever contains more than one port)

            for port_info in record['ports']:
                struct = {
                    'ip'    : record['ip'],
                    'port'  : port_info['port'],
                    'proto' : port_info['proto'],
                    'seen'  : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(int(record['timestamp']))),
                }

                if 'service' in port_info:
                    if 'name' in port_info['service']:
                        if (service_name := port_info['service']['name']) not in ('unknown', ''):
                            struct['service'] = service_name

                    if 'banner' in port_info['service']:
                        banner = ' '.join(port_info['service']['banner'].split()) # Remove extra whitespace
                        if banner:
                            match = re.search(r'\(Ref\.Id: (.*?)\)', banner)
                            if match:
                                struct['ref_id'] = match.group(1)
                            else:
                                struct['banner'] = banner

                yield {'_index': default_index, '_source': struct}

    return # EOF (bare return; async generators cannot return a value)


'''
Example record:
{
    "ip": "43.134.51.142",
    "timestamp": "1705255468", # Convert to ZULU BABY
    "ports": [ # We will create a record for each port opened
        {
            "port": 22,
            "proto": "tcp",
            "service": { # This field is optional
                "name": "ssh",
                "banner": "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4"
            }
        }
    ]
}

Will be indexed as:
{
    "ip": "43.134.51.142",
    "port": 22,
    "proto": "tcp",
    "service": "ssh",
    "banner": "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4",
    "seen": "2024-01-14T18:04:28Z",
    "ref_id": "?sKfOvsC4M4a2W8PaC4zF?" # TCP RST payload, might be useful..
}
'''
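
For reference, the sketch below shows one way process_data() and construct_map() could be wired into Elasticsearch. It is an illustration only: ERIS has its own ingestion entry point, and the client names (AsyncElasticsearch and async_bulk from the official elasticsearch package), the endpoint, and the output.json path are assumptions, not the project's actual configuration.

# Hypothetical driver sketch -- NOT part of ingest_masscan.py -- showing one way
# the async generator above could be consumed. Assumes the official Python client
# with async support (pip install 'elasticsearch[async]') and a local,
# unauthenticated cluster; endpoint, file path, and index bootstrap are illustrative.

import asyncio

from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk

from ingest_masscan import construct_map, process_data, default_index


async def main():
    es = AsyncElasticsearch('http://localhost:9200')  # assumed endpoint, no auth

    # Create the index with the Masscan mapping if it does not already exist
    if not await es.indices.exists(index=default_index):
        await es.indices.create(index=default_index, body=construct_map())  # newer clients may prefer mappings=

    # process_data() yields ready-made bulk actions, so it can be passed straight
    # to the async bulk helper
    success, errors = await async_bulk(es, process_data('output.json'), raise_on_error=False)
    print(f'Indexed {success} records ({len(errors)} errors)')

    await es.close()


if __name__ == '__main__':
    asyncio.run(main())

Because process_data() already emits {'_index': ..., '_source': ...} dictionaries, no extra transformation step is needed before bulk indexing.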