eris

- Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/-c.git
Log | Files | Refs | Archive | README | LICENSE

ingest_masscan.py (5770B)

      1 #!/usr/bin/env python
      2 # Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
      3 # ingest_masscan.py
      4 
      5 '''
      6 apt-get install iptables masscan libpcap-dev screen
      7 setcap 'CAP_NET_RAW+eip CAP_NET_ADMIN+eip' /bin/masscan
      8 /sbin/iptables -A INPUT -p tcp --dport 61010 -j DROP
      9 printf "0.0.0.0/8\n10.0.0.0/8\n100.64.0.0/10\n127.0.0.0/8\n169.254.0.0/16\n172.16.0.0/12\n192.0.0.0/24\n192.0.2.0/24\n192.31.196.0/24\n192.52.193.0/24\n192.88.99.0/24\n192.168.0.0/16\n192.175.48.0/24\n198.18.0.0/15\n198.51.100.0/24\n203.0.113.0/24\n224.0.0.0/3\n255.255.255.255/32"  > exclude.conf
     10 screen -S scan
     11 masscan 0.0.0.0/0 -p21,22,23 --banners --http-user-agent "USER_AGENT" --source-port 61010 --open-only --rate 30000 --excludefile exclude.conf -oJ output.json
     12 masscan 0.0.0.0/0 -p21,22,23 --banners --http-user-agent "USER_AGENT" --source-port 61000-65503 --open-only --rate 30000 --excludefile exclude.conf -oJ output_new.json --shard $i/$TOTAL
     13 
     14 Note: The above iptables rule is not persistent and will be removed on reboot.
     15 '''
     16 
     17 import json
     18 import logging
     19 import re
     20 import time
     21 
     22 try:
     23     import aiofiles
     24 except ImportError:
     25     raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
     26 
     27 default_index = 'masscan-logs'
     28 
     29 def construct_map() -> dict:
     30     '''Construct the Elasticsearch index mapping for Masscan records.'''
     31 
     32     keyword_mapping = { 'type': 'text',  'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
     33 
     34     mapping = {
     35         'mappings': {
     36             'properties': {
     37                 'ip':      { 'type': 'ip' },
     38                 'port':    { 'type': 'integer' },
     39                 'proto':   { 'type': 'keyword' },
     40                 'service': { 'type': 'keyword' },
     41                 'banner':  keyword_mapping,
     42                 'ref_id':  { 'type': 'keyword' },
     43                 'seen':    { 'type': 'date' }
     44                 #'geoip':   {
     45                 #    'properties': {
     46                 #        'city_name':        keyword_mapping,
     47                 #        'continent_name':   keyword_mapping,
     48                 #        'country_iso_code': keyword_mapping,
     49                 #        'country_name':     keyword_mapping,
     50                 #        'location':         { 'type': 'geo_point' },
     51                 #        'region_iso_code':  keyword_mapping,
     52                 #        'region_name':      keyword_mapping,
     53                 #    }
     54                 #}
     55             }
     56         }
     57     }
     58 
     59     return mapping
     60 
     61 
     62 async def process_data(file_path: str):
     63     '''
     64     Read and process Masscan records from the log file.
     65 
     66     :param file_path: Path to the Masscan log file
     67     '''
     68 
     69     async with aiofiles.open(file_path, mode='r') as input_file:
     70         async for line in input_file:
     71             line = line.strip()
     72 
     73             if not line or not line.startswith('{'):
     74                 continue
     75 
     76             if line.endswith(','):
     77                 line = line[:-1]
     78 
     79             try:
     80                 record = json.loads(line)
     81             except json.decoder.JSONDecodeError:
     82                 # In rare cases, the JSON record may be incomplete or malformed:
     83                 #   {   "ip": "51.161.12.223",   "timestamp": "1707628302", "ports": [ {"port": 22, "proto": "tcp", "service": {"name": "ssh", "banner":
     84                 #   {   "ip": "83.66.211.246",   "timestamp": "1706557002"
     85                 logging.error(f'Failed to parse JSON record! ({line})')
     86                 input('Press Enter to continue...') # Pause for review & debugging (Will remove pausing in production, still investigating the cause of this issue.)
     87                 continue
     88 
     89             if len(record['ports']) > 1:
     90                 logging.warning(f'Multiple ports found for record! ({record})')
     91                 input('Press Enter to continue...') # Pause for review (Will remove pausing in production, still investigating if you ever seen more than one port in a record.)
     92 
     93             for port_info in record['ports']:
     94                 struct = {
     95                     'ip'    : record['ip'],
     96                     'port'  : port_info['port'],
     97                     'proto' : port_info['proto'],
     98                     'seen'  : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(int(record['timestamp']))),
     99                 }
    100 
    101                 if 'service' in port_info:
    102                     if 'name' in port_info['service']:
    103                         if (service_name := port_info['service']['name']) not in ('unknown',''):
    104                             struct['service'] = service_name
    105 
    106                     if 'banner' in port_info['service']:
    107                         banner = ' '.join(port_info['service']['banner'].split()) # Remove extra whitespace
    108                         if banner:
    109                             match = re.search(r'\(Ref\.Id: (.*?)\)', banner)
    110                             if match:
    111                                 struct['ref_id'] = match.group(1)
    112                             else:
    113                                 struct['banner'] = banner
    114 
    115                 yield {'_index': default_index, '_source': struct}
    116  
    117     return None # EOF
    118 
    119 
    120 
    121 '''
    122 Example record:
    123 {
    124     "ip": "43.134.51.142",
    125     "timestamp": "1705255468", # Convert to ZULU BABY
    126     "ports": [ # We will create a record for each port opened
    127         {
    128             "port": 22,
    129             "proto": "tcp",
    130             "service": { # This field is optional
    131                 "name": "ssh",
    132                 "banner": "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4"
    133             }
    134         }
    135     ]
    136 }
    137 
    138 Will be indexed as:
    139 {
    140     "ip": "43.134.51.142",
    141     "port": 22,
    142     "proto": "tcp",
    143     "service": "ssh",
    144     "banner": "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4",
    145     "seen": "2021-10-08T02:04:28Z",
    146     "ref_id": "?sKfOvsC4M4a2W8PaC4zF?" # TCP RST Payload, Might be useful..
    147 }
    148 '''