eris

- Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/eris.git
Log | Files | Refs | Archive | README | LICENSE

ingest_certstream.py (3345B)

      1 #!/usr/bin/env python
      2 # Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
      3 # ingestors/ingest_certstream.py
      4 
      5 import asyncio
      6 import json
      7 import logging
      8 import time
      9 
     10 try:
     11 	import websockets
     12 except ImportError:
     13 	raise ImportError('Missing required \'websockets\' library. (pip install websockets)')
     14 
     15 
# Elasticsearch index name that process_data() places in the '_index' field of every action it yields.
# NOTE(review): the original comment described this as a fallback "if one is not provided"; nothing in this file overrides it — confirm how callers select a different index.
default_index = 'eris-certstream'
     18 
     19 
     20 def construct_map() -> dict:
     21 	'''Construct the Elasticsearch index mapping for Certstream records.'''
     22 
     23 	# Match on exact value or full text search
     24 	keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
     25 
     26 	# Construct the index mapping
     27 	mapping = {
     28 		'mappings': {
     29 			'properties' : {
     30 				'domain'      : keyword_mapping,
     31 				'fingerprint' : keyword_mapping,
     32 				'issuer'      : keyword_mapping,
     33 				'subject'     : {
     34 					'type'      : 'object',
     35 					'properties': {
     36 						'C':  { 'type': 'keyword' },
     37 						'CN': { 'type': 'keyword' },
     38 						'L':  { 'type': 'keyword' },
     39 						'O':  { 'type': 'keyword' },
     40 						'OU': { 'type': 'keyword' }
     41 					}
     42 				},
     43 				'seen' : { 'type': 'date' }
     44 			}
     45 		}
     46 	}
     47 
     48 	return mapping
     49 
     50 
     51 async def process_data(place_holder: str = None):
     52 	'''
     53 	Read and process Certsream records live from the Websocket stream.
     54 
     55 	:param place_holder: Placeholder parameter to match the process_data function signature of other ingestors.
     56 	'''
     57 
     58 	# Loop until the user interrupts the process
     59 	while True:
     60 		try:
     61 
     62 			# Connect to the Certstream websocket
     63 			async for websocket in websockets.connect('wss://certstream.calidog.io', ping_interval=15, ping_timeout=60):
     64 
     65 				# Read the websocket stream
     66 				async for line in websocket:
     67 
     68 					# Parse the JSON record
     69 					try:
     70 						record = json.loads(line)
     71 					except json.decoder.JSONDecodeError:
     72 						logging.error(f'Invalid line from the websocket: {line}')
     73 						continue
     74 
     75 					# Grab the unique domains from the records
     76 					all_domains = set(record['data']['leaf_cert']['all_domains'])
     77 					fingerprint = record['data']['leaf_cert']['fingerprint']
     78 					issuer      = record['data']['leaf_cert']['issuer']['O']
     79 					subject     = {k: v for k, v in record['data']['leaf_cert']['subject'].items() if v is not None}
     80 
     81 					# Create a record for each domain
     82 					for domain in all_domains:
     83 						if domain.startswith('*.'):
     84 							domain = domain[2:]
     85 							if domain in all_domains:
     86 								continue
     87 
     88 						# Construct the document
     89 						struct = {
     90 							'domain'      : domain,
     91 							'fingerprint' : fingerprint,
     92 							'issuer'      : issuer,
     93 							'subject'     : subject,
     94 							'seen'        : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
     95 						}
     96 
     97 						yield {
     98 							'_op_type'      : 'update',
     99 							'_id'           : domain,
    100 							'_index'        : default_index,
    101 							'doc'           : struct,
    102 							'doc_as_upsert' : True
    103 						}
    104 
    105 		except websockets.ConnectionClosed as e	:
    106 			logging.error(f'Connection to Certstream was closed. Attempting to reconnect... ({e})')
    107 			await asyncio.sleep(3)
    108 
    109 		except Exception as e:
    110 			logging.error(f'Error processing Certstream data: {e}')
    111 			await asyncio.sleep(3)
    112 
    113 
    114 async def test():
    115 	'''Test the ingestion process.'''
    116 
    117 	async for document in process_data():
    118 		print(document)
    119 
    120 
    121 
# When run directly, print every document yielded by process_data() via test() until interrupted.
if __name__ == '__main__':
	asyncio.run(test())