eris

- Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/eris.git
Log | Files | Refs | Archive | README | LICENSE

ingest_fcc.py (9697B)

      1 #!/usr/bin/env python
      2 # Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
      3 # ingest_fcc.py
      4 
      5 '''
      6 This plugin downloads and processes FCC license data from:
      7 https://data.fcc.gov/download/license-view/fcc-license-view-data-csv-format.zip
      8 '''
      9 
     10 import asyncio
     11 import csv
     12 import io
     13 import logging
     14 import zipfile
     15 from datetime import datetime
     16 
     17 try:
     18     import aiohttp
     19 except ImportError:
     20     raise ImportError('Missing required \'aiohttp\' library. (pip install aiohttp)')
     21 
     22 # Default Elasticsearch index name, written into the '_index' key of every document yielded by process_data()
     23 default_index = 'eris-fcc'
     24 
     25 # URL of the FCC license ZIP archive fetched by download_and_extract_csv()
     26 FCC_URL = 'https://data.fcc.gov/download/license-view/fcc-license-view-data-csv-format.zip'
     27 
     28 
     29 def construct_map() -> dict:
     30     '''Construct the Elasticsearch index mapping for FCC license records.'''
     31     
     32     # Match on exact value or full text search
     33     keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
     34 
     35     # Numeric fields with potential decimal values
     36     float_mapping = { 'type': 'float' }
     37     
     38     return {
     39         'mappings': {
     40             'properties': {
     41                 'license_id'              : {'type': 'long'},
     42                 'source_system'           : keyword_mapping,
     43                 'callsign'                : keyword_mapping,
     44                 'facility_id'             : {'type': 'long'},
     45                 'frn'                     : keyword_mapping,
     46                 'lic_name'                : keyword_mapping,
     47                 'common_name'             : keyword_mapping,
     48                 'radio_service_code'      : keyword_mapping,
     49                 'radio_service_desc'      : keyword_mapping,
     50                 'rollup_category_code'    : keyword_mapping,
     51                 'rollup_category_desc'    : keyword_mapping,
     52                 'grant_date'              : {'type': 'date'},
     53                 'expired_date'            : {'type': 'date'},
     54                 'cancellation_date'       : {'type': 'date'},
     55                 'last_action_date'        : {'type': 'date'},
     56                 'lic_status_code'         : keyword_mapping,
     57                 'lic_status_desc'         : keyword_mapping,
     58                 'rollup_status_code'      : keyword_mapping,
     59                 'rollup_status_desc'      : keyword_mapping,
     60                 'entity_type_code'        : keyword_mapping,
     61                 'entity_type_desc'        : keyword_mapping,
     62                 'rollup_entity_code'      : keyword_mapping,
     63                 'rollup_entity_desc'      : keyword_mapping,
     64                 'lic_address'             : keyword_mapping,
     65                 'lic_city'                : keyword_mapping,
     66                 'lic_state'               : keyword_mapping,
     67                 'lic_zip_code'            : keyword_mapping,
     68                 'lic_attention_line'      : keyword_mapping,
     69                 'contact_company'         : keyword_mapping,
     70                 'contact_name'            : keyword_mapping,
     71                 'contact_title'           : keyword_mapping,
     72                 'contact_address1'        : keyword_mapping,
     73                 'contact_address2'        : keyword_mapping,
     74                 'contact_city'            : keyword_mapping,
     75                 'contact_state'           : keyword_mapping,
     76                 'contact_zip'             : keyword_mapping,
     77                 'contact_country'         : keyword_mapping,
     78                 'contact_phone'           : keyword_mapping,
     79                 'contact_fax'             : keyword_mapping,
     80                 'contact_email'           : keyword_mapping,
     81                 'market_code'             : keyword_mapping,
     82                 'market_desc'             : keyword_mapping,
     83                 'channel_block'           : keyword_mapping,
     84                 'loc_type_code'           : keyword_mapping,
     85                 'loc_type_desc'           : keyword_mapping,
     86                 'loc_city'                : keyword_mapping,
     87                 'loc_county_code'         : keyword_mapping,
     88                 'loc_county_name'         : keyword_mapping,
     89                 'loc_state'               : keyword_mapping,
     90                 'loc_radius_op'           : float_mapping,
     91                 'loc_seq_id'              : {'type': 'long'},
     92                 'loc_lat_deg'             : {'type': 'integer'},
     93                 'loc_lat_min'             : {'type': 'integer'},
     94                 'loc_lat_sec'             : float_mapping,
     95                 'loc_lat_dir'             : keyword_mapping,
     96                 'loc_long_deg'            : {'type': 'integer'},
     97                 'loc_long_min'            : {'type': 'integer'},
     98                 'loc_long_sec'            : float_mapping,
     99                 'loc_long_dir'            : keyword_mapping,
    100                 'hgt_structure'           : float_mapping,
    101                 'asr_num'                 : keyword_mapping,
    102                 'antenna_id'              : {'type': 'long'},
    103                 'ant_seq_id'              : {'type': 'long'},
    104                 'ant_make'                : keyword_mapping,
    105                 'ant_model'               : keyword_mapping,
    106                 'ant_type_code'           : keyword_mapping,
    107                 'ant_type_desc'           : keyword_mapping,
    108                 'azimuth'                 : float_mapping,
    109                 'beamwidth'               : float_mapping,
    110                 'polarization_code'       : keyword_mapping,
    111                 'frequency_id'            : {'type': 'long'},
    112                 'freq_seq_id'             : {'type': 'long'},
    113                 'freq_class_station_code' : keyword_mapping,
    114                 'freq_class_station_desc' : keyword_mapping,
    115                 'power_erp'               : float_mapping,
    116                 'power_output'            : float_mapping,
    117                 'frequency_assigned'      : float_mapping,
    118                 'frequency_upper_band'    : float_mapping,
    119                 'unit_of_measure'         : keyword_mapping,
    120                 'tolerance'               : float_mapping,
    121                 'emission_id'             : {'type': 'long'},
    122                 'emission_seq_id'         : {'type': 'long'},
    123                 'emission_code'           : keyword_mapping,
    124                 'ground_elevation'        : float_mapping
    125             }
    126         }
    127     }
    128 
    129 async def download_and_extract_csv():
    130     '''Download and extract the FCC license data ZIP file.'''
    131 
    132     async with aiohttp.ClientSession() as session:
    133         async with session.get(FCC_URL) as response:
    134             if response.status != 200:
    135                 raise Exception(f'Failed to download FCC data: HTTP {response.status}')
    136             
    137             zip_data = await response.read()
    138             
    139     with zipfile.ZipFile(io.BytesIO(zip_data)) as zip_file:
    140         # Get the first CSV file in the ZIP
    141         csv_filename = next(name for name in zip_file.namelist() if name.endswith('.csv'))
    142         return zip_file.read(csv_filename).decode('utf-8')
    143 
    144 
    145 def parse_date(date_str):
    146     '''Parse date string to ISO format or return None if invalid.'''
    147     if not date_str or date_str == '0000-00-00':
    148         return None
    149     try:
    150         # Try the new format first
    151         return datetime.strptime(date_str.strip(), '%m/%d/%Y %H:%M:%S').strftime('%Y-%m-%dT%H:%M:%SZ')
    152     except ValueError:
    153         try:
    154             # Try the old format
    155             return datetime.strptime(date_str, '%Y-%m-%d').strftime('%Y-%m-%dT%H:%M:%SZ')
    156         except ValueError:
    157             return None
    158 
    159 
    160 async def process_data(input_path: str = None):
    161     '''
    162     Process the FCC license data.
    163     
    164     :param input_path: Optional path to local CSV file (if not downloading)
    165     '''
    166 
    167     if input_path:
    168         with open(input_path, 'r') as f:
    169             csv_data = f.read()
    170     else:
    171         csv_data = await download_and_extract_csv()
    172     
    173     # Process CSV data
    174     csv_reader = csv.DictReader(io.StringIO(csv_data))
    175     
    176     for row in csv_reader:
    177         # Convert date fields
    178         date_fields = ['grant_date', 'expired_date', 'cancellation_date', 'last_action_date']
    179         
    180         for field in date_fields:
    181             if field in row:
    182                 row[field] = parse_date(row[field])
    183         
    184         # Convert numeric fields
    185         numeric_fields = {
    186             'int'   : ['facility_id', 'loc_lat_deg', 'loc_long_deg', 'loc_lat_min', 'loc_long_min', 'loc_seq_id', 'ant_seq_id', 'freq_seq_id', 'emission_seq_id'],
    187             'float' : ['loc_lat_sec', 'loc_long_sec', 'loc_radius_op', 'hgt_structure', 'azimuth', 'beamwidth', 'power_erp', 'power_output', 'frequency_assigned', 'frequency_upper_band', 'tolerance', 'ground_elevation'],
    188             'long'  : ['license_id', 'antenna_id', 'frequency_id', 'emission_id']
    189         }
    190         
    191         for field_type, fields in numeric_fields.items():
    192             for field in fields:
    193                 if field in row and row[field]:
    194                     try:
    195                         if field_type == 'int':
    196                             row[field] = int(float(row[field]))
    197                         elif field_type == 'float':
    198                             row[field] = float(row[field])
    199                         elif field_type == 'long':
    200                             row[field] = int(float(row[field]))
    201                     except (ValueError, TypeError):
    202                         row[field] = None
    203         
    204         # Remove empty fields
    205         record = {k.lower(): v for k, v in row.items() if v not in (None, '', 'NULL')}
    206         
    207         yield {'_index': default_index, '_source': record}
    208 
    209 async def test():
    210     '''Test the ingestion process.'''
    211     async for document in process_data():
    212         print(document)
    213 
    214 if __name__ == '__main__':  # Only run the self-test when the file is executed directly
    215     asyncio.run(test())