eris- Elasticsearch Recon Ingestion Scripts (ERIS) 🔎 |
git clone git://git.acid.vegas/-c.git |
Log | Files | Refs | Archive | README | LICENSE |
sniff_patch_async.py (3654B)
#!/usr/bin/env python
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
# sniff_patch_async.py

# Note:
#   This is a patch for the elasticsearch 8.x async client to fix the sniff_* options.
#   This patch is only needed if you use the sniff_* options and only works with basic auth.
#   Call init_elasticsearch_async() with normal AsyncElasticsearch params.
#
# Source:
#   - https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960

import base64
import inspect

import elasticsearch._async.client as async_client
from elasticsearch.exceptions import SerializationError, ConnectionError


async def init_elasticsearch_async(*args, **kwargs):
    '''
    Initialize the Async Elasticsearch client with the sniff patch.

    When a ``basic_auth`` keyword argument is supplied, the module-level
    ``default_sniff_callback`` of ``elasticsearch._async.client`` is replaced
    with a version that sends the credentials on the sniff request. This is a
    process-wide side effect: it stays in place for every client created
    afterwards. Without ``basic_auth`` nothing is patched and the client is
    created exactly as the library would create it.

    :param args: Async Elasticsearch positional arguments.
    :param kwargs: Async Elasticsearch keyword arguments.
    :return: The constructed ``AsyncElasticsearch`` instance.
    '''
    basic_auth = kwargs.get('basic_auth')

    # The patch can only inject a Basic authorization header, so there is
    # nothing useful to install when no basic auth credentials were given.
    if basic_auth is not None:
        async_client.default_sniff_callback = _override_async_sniff_callback(basic_auth)

    return async_client.AsyncElasticsearch(*args, **kwargs)


def _basic_auth_token(basic_auth):
    '''
    Build the value that follows ``Basic `` in the authorization header.

    :param basic_auth: Either a (username, password) list/tuple, or a string /
                       bytes value that is used as-is.
    :return: The token as a str.
    '''
    if isinstance(basic_auth, (list, tuple)):
        return base64.b64encode(':'.join(basic_auth).encode()).decode()

    # NOTE(review): a plain string is passed through untouched on the
    # assumption that it is already base64 encoded, which is how the client
    # itself appears to treat a str basic_auth - confirm against the library.
    if isinstance(basic_auth, bytes):
        return basic_auth.decode()

    return basic_auth


def _split_publish_address(address):
    '''
    Split an ``http.publish_address`` value into host and port.

    Handles both ``host:port`` and ``fqdn/ip:port`` (the hostname before the
    slash is preferred over the IP after it).

    :param address: The publish address string; must contain a ``:``.
    :return: A (host, port) tuple with port as an int.
    :raises ValueError: If the port part is not an integer.
    '''
    if '/' in address:
        host, ip_and_port = address.split('/', 1)
        _, port_str = ip_and_port.rsplit(':', 1)
    else:
        host, port_str = address.rsplit(':', 1)

    return host, int(port_str)


def _override_async_sniff_callback(basic_auth):
    '''
    Create a replacement for the async client's default sniff callback.

    Based on the library's own default sniff callback (see
    https://github.com/elastic/elasticsearch-py/blob/8.8/elasticsearch/_async/client/_base.py),
    with the authorization header added to the sniff request so the sniff_*
    options keep working while this is broken in the library.

    TODO: Remove this when this issue is patched:
        - https://github.com/elastic/elasticsearch-py/issues/2005

    :param basic_auth: The credentials passed to the client as ``basic_auth``.
    :return: An async callable ``(transport, sniff_options) -> list`` of node configs.
    '''
    auth_str = _basic_auth_token(basic_auth)
    sniffed_node_callback = async_client._base._default_sniffed_node_callback

    async def modified_async_sniff_callback(transport, sniff_options):
        # One attempt per node currently in the pool; the transport chooses
        # which node actually serves each request.
        for _ in transport.node_pool.all():
            try:
                meta, node_infos = await transport.perform_request(
                    'GET',
                    '/_nodes/_all/http',
                    headers={
                        'accept': 'application/vnd.elasticsearch+json; compatible-with=8',
                        'authorization': f'Basic {auth_str}'  # Authorization header
                    },
                    request_timeout=(
                        sniff_options.sniff_timeout
                        if not sniff_options.is_initial_sniff
                        else None
                    ),
                )
            except (SerializationError, ConnectionError):
                continue

            if not 200 <= meta.status <= 299:
                continue

            node_configs = []
            for node_info in node_infos.get('nodes', {}).values():
                address = node_info.get('http', {}).get('publish_address')
                if not address or ':' not in address:
                    continue

                host, port = _split_publish_address(address)

                # Use the node that answered the request as the template.
                sniffed_node = sniffed_node_callback(
                    node_info, meta.node.replace(host=host, port=port)
                )

                # The callback may be a plain function or a coroutine
                # function; only await when there is something to await.
                if inspect.isawaitable(sniffed_node):
                    sniffed_node = await sniffed_node

                if sniffed_node is None:
                    continue

                node_configs.append(sniffed_node)

            if node_configs:
                return node_configs

        return []

    return modified_async_sniff_callback