eris

- Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/-c.git
Log | Files | Refs | Archive | README | LICENSE

sniff_patch.py (3851B)

      1 #!/usr/bin/env python
      2 # Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
      3 # sniff_patch.py [asyncronous developement]
      4 
      5 # Note:
      6 #   This is a patch for the elasticsearch 8.x client to fix the sniff_* options.
      7 #   This patch is only needed if you use the sniff_* options and only works with basic auth.
      8 #   Call init_elasticsearch() with normal Elasticsearch params.
      9 #
     10 # Source:
     11 #   - https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960
     12 
     13 import base64
     14 
     15 import elasticsearch._async.client as async_client
     16 from elasticsearch.exceptions import SerializationError, ConnectionError
     17 
     18 
     19 async def init_elasticsearch_async(*args, **kwargs):
     20     '''
     21     Initialize the Async Elasticsearch client with the sniff patch.
     22     
     23     :param args: Async Elasticsearch positional arguments.
     24     :param kwargs: Async Elasticsearch keyword arguments.
     25     '''
     26     async_client.default_sniff_callback = _override_async_sniff_callback(kwargs['basic_auth'])
     27 
     28     return async_client.AsyncElasticsearch(*args, **kwargs)
     29 
     30 
     31 def _override_async_sniff_callback(basic_auth):
     32     '''
     33     Taken from https://github.com/elastic/elasticsearch-py/blob/8.8/elasticsearch/_sync/client/_base.py#L166
     34     Completely unmodified except for adding the auth header to the elastic request.
     35     Allows us to continue using the sniff_* options while this is broken in the library.
     36 
     37     TODO: Remove this when this issue is patched:
     38         - https://github.com/elastic/elasticsearch-py/issues/2005
     39     '''
     40     auth_str = base64.b64encode(':'.join(basic_auth).encode()).decode()
     41     sniffed_node_callback = async_client._base._default_sniffed_node_callback
     42 
     43     async def modified_async_sniff_callback(transport, sniff_options):
     44         for _ in transport.node_pool.all():
     45             try:
     46                 meta, node_infos = await transport.perform_request(
     47                     'GET',
     48                     '/_nodes/_all/http',
     49                     headers={
     50                         'accept': 'application/vnd.elasticsearch+json; compatible-with=8',
     51                         'authorization': f'Basic {auth_str}'  # This auth header is missing in 8.x releases of the client, and causes 401s
     52                     },
     53                     request_timeout=(
     54                         sniff_options.sniff_timeout
     55                         if not sniff_options.is_initial_sniff
     56                         else None
     57                     ),
     58                 )
     59             except (SerializationError, ConnectionError):
     60                 continue
     61 
     62             if not 200 <= meta.status <= 299:
     63                 continue
     64 
     65             node_configs = []
     66             for node_info in node_infos.get('nodes', {}).values():
     67                 address = node_info.get('http', {}).get('publish_address')
     68                 if not address or ':' not in address:
     69                     continue
     70 
     71                 if '/' in address:
     72                     # Support 7.x host/ip:port behavior where http.publish_host has been set.
     73                     fqdn, ipaddress = address.split('/', 1)
     74                     host = fqdn
     75                     _, port_str = ipaddress.rsplit(':', 1)
     76                     port = int(port_str)
     77                 else:
     78                     host, port_str = address.rsplit(':', 1)
     79                     port = int(port_str)
     80 
     81                 assert sniffed_node_callback is not None
     82                 sniffed_node = await sniffed_node_callback(
     83                     node_info, meta.node.replace(host=host, port=port)
     84                 )
     85                 if sniffed_node is None:
     86                     continue
     87 
     88                 # Use the node which was able to make the request as a base.
     89                 node_configs.append(sniffed_node)
     90 
     91             if node_configs:
     92                 return node_configs
     93 
     94         return []
     95 
     96     return modified_async_sniff_callback