eris - Elasticsearch Recon Ingestion Scripts (ERIS) 🔎 |
git clone git://git.acid.vegas/eris.git |
Log | Files | Refs | Archive | README | LICENSE |
ingest_certstream.py (3345B)
#!/usr/bin/env python
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
# ingestors/ingest_certstream.py

import asyncio
import json
import logging
import time

try:
    import websockets
except ImportError:
    raise ImportError('Missing required \'websockets\' library. (pip install websockets)')


# Set a default elasticsearch index if one is not provided
default_index = 'eris-certstream'


def construct_map() -> dict:
    '''Construct the Elasticsearch index mapping for Certstream records.'''

    # Match on exact value or full text search
    keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }

    # Construct the index mapping
    mapping = {
        'mappings': {
            'properties' : {
                'domain'      : keyword_mapping,
                'fingerprint' : keyword_mapping,
                'issuer'      : keyword_mapping,
                'subject'     : {
                    'type'       : 'object',
                    'properties' : {
                        'C'  : { 'type': 'keyword' },
                        'CN' : { 'type': 'keyword' },
                        'L'  : { 'type': 'keyword' },
                        'O'  : { 'type': 'keyword' },
                        'OU' : { 'type': 'keyword' }
                    }
                },
                'seen' : { 'type': 'date' }
            }
        }
    }

    return mapping


def _build_actions(record: dict, seen: str) -> list:
    '''
    Convert one decoded Certstream certificate message into Elasticsearch bulk upsert actions.

    The whole list is built before anything is returned, so a malformed record raises
    before any of its documents are handed to the caller.

    :param record: Decoded Certstream message; expected to contain data.leaf_cert
    :param seen: UTC timestamp (YYYY-MM-DDTHH:MM:SSZ) stored in every document's 'seen' field

    :raises KeyError, TypeError, AttributeError: if the record lacks the expected structure
    '''

    leaf_cert   = record['data']['leaf_cert']
    all_domains = set(leaf_cert['all_domains'])
    fingerprint = leaf_cert['fingerprint']
    issuer      = leaf_cert['issuer']['O']
    subject     = {k: v for k, v in leaf_cert['subject'].items() if v is not None}

    actions = []

    # Create a record for each domain
    for domain in all_domains:

        # Drop the wildcard label; skip it entirely if the bare domain is already on the certificate
        if domain.startswith('*.'):
            domain = domain[2:]
            if domain in all_domains:
                continue

        # A bare '*.' entry would otherwise produce an empty document id
        if not domain:
            continue

        # Construct the document
        struct = {
            'domain'      : domain,
            'fingerprint' : fingerprint,
            'issuer'      : issuer,
            'subject'     : subject,
            'seen'        : seen
        }

        actions.append({
            '_op_type'      : 'update',
            '_id'           : domain,
            '_index'        : default_index,
            'doc'           : struct,
            'doc_as_upsert' : True
        })

    return actions


async def process_data(place_holder: str = None):
    '''
    Read and process Certstream records live from the Websocket stream.

    Heartbeat messages are skipped. Malformed records are logged and skipped without
    tearing down the websocket connection.

    :param place_holder: Placeholder parameter to match the process_data function signature of other ingestors.
    '''

    # Loop until the user interrupts the process
    while True:
        try:

            # Connect to the Certstream websocket
            async for websocket in websockets.connect('wss://certstream.calidog.io', ping_interval=15, ping_timeout=60):

                # Read the websocket stream
                async for line in websocket:

                    # Parse the JSON record
                    try:
                        record = json.loads(line)
                    except json.decoder.JSONDecodeError:
                        logging.error(f'Invalid line from the websocket: {line}')
                        continue

                    # Certstream periodically sends heartbeats that carry no certificate data.
                    # Without this check they raise KeyError and force a reconnect.
                    if isinstance(record, dict) and record.get('message_type') == 'heartbeat':
                        continue

                    # One timestamp per certificate so all of its domains share the same 'seen' value
                    seen = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())

                    # Skip structurally unexpected records instead of dropping the connection
                    try:
                        actions = _build_actions(record, seen)
                    except (KeyError, TypeError, AttributeError) as e:
                        logging.error(f'Malformed Certstream record, skipping: {e!r}')
                        continue

                    for action in actions:
                        yield action

        except websockets.ConnectionClosed as e:
            logging.error(f'Connection to Certstream was closed. Attempting to reconnect... ({e})')
            await asyncio.sleep(3)

        except Exception as e:
            logging.error(f'Error processing Certstream data: {e}')
            await asyncio.sleep(3)


async def test():
    '''Test the ingestion process.'''

    async for document in process_data():
        print(document)



if __name__ == '__main__':
    asyncio.run(test())