eris

Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/eris.git

commit bd0c8baae3b64195907804a2a98f6abded49dd31
parent 124e4b0cf34c4c37992aed0ad3516868b5203c66
Author: acidvegas <acid.vegas@acid.vegas>
Date: Fri, 22 Nov 2024 01:18:28 -0500

Refactoring RIR ingestions

Diffstat:
M eris.py                             | 50 +++++++++++++++++++++++++++++++++-----------------
M ingestors/ingest_massdns.py         | 16 ++++++++++++++--
M ingestors/ingest_rir_delegations.py | 20 +++++++++++++-------
M ingestors/ingest_rir_transfers.py   | 20 +++++++++++---------

4 files changed, 71 insertions(+), 35 deletions(-)

diff --git a/eris.py b/eris.py
@@ -9,8 +9,9 @@ import logging.handlers
 import os
 import stat
 import sys
+import json
 
-sys.dont_write_bytecode = True
+sys.dont_write_bytecode = True # FUCKOFF __pycache__
 
 try:
 	from elasticsearch            import AsyncElasticsearch
@@ -34,8 +35,8 @@ class ElasticIndexer:
 
 		# Sniffing disabled due to an issue with the elasticsearch 8.x client (https://github.com/elastic/elasticsearch-py/issues/2005)
 		es_config = {
-			#'hosts'               : [f'{args.host}:{args.port}'],
-			'hosts'                : [f'{args.host}:{port}' for port in ('9002', '9003', '9004')], # Temporary alternative to sniffing
+			'hosts'               : [f'{args.host}:{args.port}'],
+			#'hosts'                : [f'{args.host}:{port}' for port in ('9200',)], # Temporary alternative to sniffing
 			'verify_certs'         : args.self_signed,
 			'ssl_show_warn'        : args.self_signed,
 			'request_timeout'      : args.timeout,
@@ -104,19 +105,27 @@ class ElasticIndexer:
 		Index records in chunks to Elasticsearch.
 
 		:param file_path: Path to the file
-		:param index_name: Name of the index
 		:param data_generator: Generator for the records to index
 		'''
 
-		count = 0
-		total = 0
+		count  = 0
+		total  = 0
+		errors = []
 
 		try:
-			async for ok, result in async_streaming_bulk(self.es, actions=data_generator(file_path), chunk_size=self.chunk_size, max_chunk_bytes=self.chunk_max):
+			async for ok, result in async_streaming_bulk(self.es, actions=data_generator(file_path), chunk_size=self.chunk_size, max_chunk_bytes=self.chunk_max, raise_on_error=False):
 				action, result = result.popitem()
 
 				if not ok:
-					logging.error(f'Failed to index document ({result["_id"]}) to {self.es_index} from {file_path} ({result})')
+					error_type   = result.get('error', {}).get('type',   'unknown')
+					error_reason = result.get('error', {}).get('reason', 'unknown')
+					logging.error('FAILED DOCUMENT:')
+					logging.error(f'Error Type   : {error_type}')
+					logging.error(f'Error Reason : {error_reason}')
+					logging.error('Document     : ')
+					logging.error(json.dumps(result, indent=2))
+					input('Press Enter to continue...')
+					errors.append(result)
 					continue
 
 				count += 1
@@ -126,7 +135,8 @@ class ElasticIndexer:
 					logging.info(f'Successfully indexed {self.chunk_size:,} ({total:,} processed) records to {self.es_index} from {file_path}')
 					count = 0
 
-			logging.info(f'Finished indexing {total:,} records to {self.es_index} from {file_path}')
+			if errors:
+				raise Exception(f'{len(errors):,} document(s) failed to index. Check the logs above for details.')
 
 		except Exception as e:
 			raise Exception(f'Failed to index records to {self.es_index} from {file_path} ({e})')
@@ -148,7 +158,7 @@ def setup_logger(console_level: int = logging.INFO, file_level: int = None, log_
 	logger = logging.getLogger()
 	logger.setLevel(logging.DEBUG) # Minimum level to capture all logs
 
-	# Clear existing handlersaise Exception(f'Failed to fetch zone links: {e}')
+	# Clear existing handlers
 	logger.handlers = []
 
 	# Setup console handler
@@ -203,8 +213,8 @@ async def main():
 	parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index')
 
 	# Performance arguments
-	parser.add_argument('--chunk-size', type=int, default=50000, help='Number of records to index in a chunk')
-	parser.add_argument('--chunk-max', type=int, default=100, help='Maximum size of a chunk in bytes')
+	parser.add_argument('--chunk-size', type=int, default=5000, help='Number of records to index in a chunk')
+	parser.add_argument('--chunk-max', type=int, default=10485760, help='Maximum size of a chunk in bytes (default 10mb)')
 	parser.add_argument('--retries', type=int, default=30, help='Number of times to retry indexing a chunk before failing')
 	parser.add_argument('--timeout', type=int, default=60, help='Number of seconds to wait before retrying a chunk')
 
@@ -214,6 +224,8 @@ async def main():
 	parser.add_argument('--masscan', action='store_true', help='Index Masscan records')
 	parser.add_argument('--massdns', action='store_true', help='Index Massdns records')
 	parser.add_argument('--zone', action='store_true', help='Index Zone records')
+	parser.add_argument('--rir-delegations', action='store_true', help='Index RIR Delegations records')
+	parser.add_argument('--rir-transfers', action='store_true', help='Index RIR Transfers records')
 
 	args = parser.parse_args()
 
@@ -239,15 +251,19 @@ async def main():
 	edx = ElasticIndexer(args)
 
 	if args.certstream:
-		from ingestors import ingest_certstream as ingestor
+		from ingestors import ingest_certstream      as ingestor
 	elif args.httpx:
-		from ingestors import ingest_httpx      as ingestor
+		from ingestors import ingest_httpx           as ingestor
 	elif args.masscan:
-		from ingestors import ingest_masscan    as ingestor
+		from ingestors import ingest_masscan         as ingestor
 	elif args.massdns:
-		from ingestors import ingest_massdns    as ingestor
+		from ingestors import ingest_massdns         as ingestor
+	elif args.rir_delegations:
+		from ingestors import ingest_rir_delegations as ingestor
+	elif args.rir_transfers:
+		from ingestors import ingest_rir_transfers   as ingestor
 	elif args.zone:
-		from ingestors import ingest_zone       as ingestor
+		from ingestors import ingest_zone            as ingestor
 	else:
 		raise ValueError('No ingestor specified')
 
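With raise_on_error=False, async_streaming_bulk reports each failed document as an (ok, result) tuple instead of raising mid-stream, which is what lets the loop above collect failures and raise once at the end. A minimal standalone sketch of that pattern (gen_docs, the index name, and the connection details are placeholders, not from this repo):

	import asyncio
	from elasticsearch import AsyncElasticsearch
	from elasticsearch.helpers import async_streaming_bulk

	async def demo():
		es = AsyncElasticsearch('https://localhost:9200')

		async def gen_docs():
			for i in range(10):
				yield {'_index': 'demo-index', '_source': {'n': i}}

		errors = []
		async for ok, result in async_streaming_bulk(es, actions=gen_docs(), raise_on_error=False):
			action, item = result.popitem() # e.g. ('index', {'_id': ..., 'status': ...})
			if not ok:
				errors.append(item) # collect the failure, keep streaming

		await es.close()
		if errors:
			raise Exception(f'{len(errors):,} document(s) failed to index')

	asyncio.run(demo())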
diff --git a/ingestors/ingest_massdns.py b/ingestors/ingest_massdns.py
@@ -140,13 +140,25 @@ if __name__ == '__main__':
 
 '''
 Deployment:
+	printf "\nsession    required   pam_limits.so" >> /etc/pam.d/su
+	printf "acidvegas    hard    nofile    65535\nacidvegas    soft    nofile    65535" >> /etc/security/limits.conf
+	echo "net.netfilter.nf_conntrack_max = 131072" >> /etc/sysctl.conf
+	echo "net.netfilter.nf_conntrack_udp_timeout = 30" >> /etc/sysctl.conf
+	echo "net.netfilter.nf_conntrack_udp_timeout_stream = 120" >> /etc/sysctl.conf
+	echo "net.netfilter.nf_conntrack_tcp_timeout_established = 300" >> /etc/sysctl.conf
+	sysctl -p
+
 	sudo apt-get install build-essential gcc make python3 python3-pip
 	pip install aiofiles aiohttp elasticsearch
 	git clone --depth 1 https://github.com/acidvegas/eris.git $HOME/eris
 
 	git clone --depth 1 https://github.com/blechschmidt/massdns.git $HOME/massdns && cd $HOME/massdns && make
-	curl -s https://public-dns.info/nameservers.txt | grep -v ':' > $HOME/massdns/nameservers.txt
-	while true; do python3 ./scripts/ptr.py | ./bin/massdns -r $HOME/massdns/nameservers.txt -t PTR --filter NOERROR -o S -w $HOME/eris/FIFO; done
+	wget -O $HOME/massdns/resolvers.txt https://raw.githubusercontent.com/trickest/resolvers/refs/heads/main/resolvers.txt
+	while true; do python3 ./scripts/ptr.py | ./bin/massdns -r $HOME/massdns/resolvers.txt -t PTR --filter NOERROR -s 5000 -o S -w $HOME/eris/FIFO; done
+
+	screen -S eris
+	python3 $HOME/eris/eris.py --massdns
+
 
 
 Output:
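The deployment steps above raise file-descriptor and conntrack limits because massdns keeps thousands of UDP queries in flight, then pipe its answers into $HOME/eris/FIFO for eris to consume. A minimal sketch of reading such a FIFO line by line with aiofiles (an assumed pattern; the actual ingestor may differ):

	import aiofiles

	async def read_fifo(path: str = 'FIFO'):
		'''Yield massdns simple-output lines, e.g. "1.1.1.1.in-addr.arpa. PTR one.one.one.one."'''
		async with aiofiles.open(path) as fifo:
			async for line in fifo: # blocks until a writer sends data
				line = line.strip()
				if line:
					yield line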
diff --git a/ingestors/ingest_rir_delegations.py b/ingestors/ingest_rir_delegations.py
@@ -14,7 +14,7 @@ except ImportError:
 
 
 # Set a default elasticsearch index if one is not provided
-default_index = 'rir-delegation-' + time.strftime('%Y-%m-%d')
+default_index = 'eris-rir-delegations'
 
 # Delegation data sources
 delegation_db = {
@@ -52,7 +52,8 @@ def construct_map() -> dict:
 				},
 				'date'       : { 'type': 'date'    },
 				'status'     : { 'type': 'keyword' },
-				'extensions' : keyword_mapping
+				'extensions' : keyword_mapping,
+				'seen'       : { 'type': 'date' }
 			}
 		}
 	}
@@ -60,8 +61,12 @@ def construct_map() -> dict:
 	return mapping
 
 
-async def process_data():
-	'''Read and process the delegation data.'''
+async def process_data(place_holder: str = None):
+	'''
+	Read and process the delegation data.
+
+	:param place_holder: Placeholder parameter to match the process_data function signature of other ingestors.
+	'''
 
 	for registry, url in delegation_db.items():
 		try:
@@ -150,12 +155,13 @@ async def process_data():
 							if not record['date'] or record['date'] == '00000000':
 								del record['date']
 							else:
-								record['date'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.strptime(record['date'], '%Y%m%d')),
+								record['date'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.strptime(record['date'], '%Y%m%d'))
 
 							if record['status'] not in ('allocated', 'assigned', 'available', 'reserved', 'unallocated', 'unknown'):
 								raise ValueError(f'Invalid status: {cache}')
-
-							#json_output['records'].append(record)
+							
+							# Set the seen timestamp
+							record['seen'] = time.strftime('%Y-%m-%dT%H:%M:%SZ')
 
 							# Let's just index the records themself (for now)
 							yield {'_index': default_index, '_source': record}
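The delegation files fetched above use the RIRs' extended statistics format: pipe-delimited records of registry|cc|type|start|value|date|status, with optional extension fields after the status. A simplified parsing sketch under that assumption (the example line is illustrative; the real ingestor also handles version/summary lines and per-type validation):

	import time

	def parse_delegation(line: str):
		'''Parse one extended-stats record, e.g. "arin|US|ipv4|23.0.0.0|1048576|20101111|allocated".'''
		line = line.strip()
		if line.startswith('#'):
			return None # comment line
		fields = line.split('|')
		if len(fields) < 7:
			return None # version or summary line
		record = dict(zip(('registry', 'cc', 'type', 'start', 'value', 'date', 'status'), fields))
		record['extensions'] = fields[7:] or None
		if record['date'] and record['date'] != '00000000':
			record['date'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.strptime(record['date'], '%Y%m%d'))
		else:
			record['date'] = None
		return record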
diff --git a/ingestors/ingest_rir_transfers.py b/ingestors/ingest_rir_transfers.py
@@ -13,7 +13,7 @@ except ImportError:
 
 
 # Set a default elasticsearch index if one is not provided
-default_index = 'rir-transfer-' + time.strftime('%Y-%m-%d')
+default_index = 'eris-rir-transfers'
 
 # Transfers data sources
 transfers_db = {
@@ -38,27 +38,28 @@ def construct_map() -> dict:
 				'date'    : { 'type': 'date' },
 				'ip4nets' : {
 					'properties': {
-						'original_set' : { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address' : { 'type': 'ip' } } },
-						'transfer_set' : { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address' : { 'type': 'ip' } } }
+						'original_set': { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address': { 'type': 'ip' } } },
+						'transfer_set': { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address': { 'type': 'ip' } } }
 					}
 				},
 				'ip6nets' : {
 					'properties': {
-						'original_set' : { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address' : { 'type': 'ip' } } },
-						'transfer_set' : { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address' : { 'type': 'ip' } } }
+						'original_set': { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address': { 'type': 'ip' } } },
+						'transfer_set': { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address': { 'type': 'ip' } } }
 					}
 				},
 				'asns' : {
 					'properties': {
-						'original_set' : { 'properties': { 'start': { 'type': 'integer' }, 'end' : { 'type': 'integer' } } },
-						'transfer_set' : { 'properties': { 'start': { 'type': 'integer' }, 'end' : { 'type': 'integer' } } }
+						'original_set': { 'properties': { 'start': { 'type': 'integer' }, 'end': { 'type': 'integer' } } },
+						'transfer_set': { 'properties': { 'start': { 'type': 'integer' }, 'end': { 'type': 'integer' } } }
 					}
 				},
 				'type'                   : { 'type': 'keyword' },
-				'source_organization'    : { 'properties': { 'name':  keyword_mapping, 'country_code' : { 'type': 'keyword' } } },
-				'recipient_organization' : { 'properties': { 'name':  keyword_mapping, 'country_code' : { 'type': 'keyword' } } },
+				'source_organization'    : { 'properties': { 'name':  keyword_mapping, 'country_code': { 'type': 'keyword' } } },
+				'recipient_organization' : { 'properties': { 'name':  keyword_mapping, 'country_code': { 'type': 'keyword' } } },
 				'source_rir'             : { 'type': 'keyword' },
 				'recipient_rir'          : { 'type': 'keyword' },
+				'seen'                   : { 'type': 'date' }
 			}
 		}
 	}
@@ -89,6 +90,7 @@ async def process_data():
 						raise Exception(f'Invalid {registry} delegation data: {json_data}')
 
 					for record in json_data['transfers']:
+						record['seen'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
 
 						if 'asns' in record:
 							for set_type in ('original_set', 'transfer_set'):
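With the new arguments wired in, the RIR ingestors are launched the same way as the existing ones (host and port values below are placeholders for your cluster):

	python3 eris.py --host https://localhost --port 9200 --rir-delegations
	python3 eris.py --host https://localhost --port 9200 --rir-transfers

Both write to the static eris-rir-delegations / eris-rir-transfers indices introduced above, replacing the old date-suffixed index naming.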