eris - Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/eris.git
ingest_fcc.py (9697B)
#!/usr/bin/env python
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
# ingest_fcc.py

'''
This plugin downloads and processes FCC license data from:
https://data.fcc.gov/download/license-view/fcc-license-view-data-csv-format.zip
'''

import asyncio
import csv
import io
import logging
import zipfile
from datetime import datetime

try:
    import aiohttp
except ImportError:
    raise ImportError('Missing required \'aiohttp\' library. (pip install aiohttp)')

# Set a default elasticsearch index if one is not provided
default_index = 'eris-fcc'

# FCC data URL
FCC_URL = 'https://data.fcc.gov/download/license-view/fcc-license-view-data-csv-format.zip'


def construct_map() -> dict:
    '''Construct the Elasticsearch index mapping for FCC license records.'''

    # Match on exact value or full text search
    keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }

    # Numeric fields with potential decimal values
    float_mapping = { 'type': 'float' }

    return {
        'mappings': {
            'properties': {
                'license_id'              : {'type': 'long'},
                'source_system'           : keyword_mapping,
                'callsign'                : keyword_mapping,
                'facility_id'             : {'type': 'long'},
                'frn'                     : keyword_mapping,
                'lic_name'                : keyword_mapping,
                'common_name'             : keyword_mapping,
                'radio_service_code'      : keyword_mapping,
                'radio_service_desc'      : keyword_mapping,
                'rollup_category_code'    : keyword_mapping,
                'rollup_category_desc'    : keyword_mapping,
                'grant_date'              : {'type': 'date'},
                'expired_date'            : {'type': 'date'},
                'cancellation_date'       : {'type': 'date'},
                'last_action_date'        : {'type': 'date'},
                'lic_status_code'         : keyword_mapping,
                'lic_status_desc'         : keyword_mapping,
                'rollup_status_code'      : keyword_mapping,
                'rollup_status_desc'      : keyword_mapping,
                'entity_type_code'        : keyword_mapping,
                'entity_type_desc'        : keyword_mapping,
                'rollup_entity_code'      : keyword_mapping,
                'rollup_entity_desc'      : keyword_mapping,
                'lic_address'             : keyword_mapping,
                'lic_city'                : keyword_mapping,
                'lic_state'               : keyword_mapping,
                'lic_zip_code'            : keyword_mapping,
                'lic_attention_line'      : keyword_mapping,
                'contact_company'         : keyword_mapping,
                'contact_name'            : keyword_mapping,
                'contact_title'           : keyword_mapping,
                'contact_address1'        : keyword_mapping,
                'contact_address2'        : keyword_mapping,
                'contact_city'            : keyword_mapping,
                'contact_state'           : keyword_mapping,
                'contact_zip'             : keyword_mapping,
                'contact_country'         : keyword_mapping,
                'contact_phone'           : keyword_mapping,
                'contact_fax'             : keyword_mapping,
                'contact_email'           : keyword_mapping,
                'market_code'             : keyword_mapping,
                'market_desc'             : keyword_mapping,
                'channel_block'           : keyword_mapping,
                'loc_type_code'           : keyword_mapping,
                'loc_type_desc'           : keyword_mapping,
                'loc_city'                : keyword_mapping,
                'loc_county_code'         : keyword_mapping,
                'loc_county_name'         : keyword_mapping,
                'loc_state'               : keyword_mapping,
                'loc_radius_op'           : float_mapping,
                'loc_seq_id'              : {'type': 'long'},
                'loc_lat_deg'             : {'type': 'integer'},
                'loc_lat_min'             : {'type': 'integer'},
                'loc_lat_sec'             : float_mapping,
                'loc_lat_dir'             : keyword_mapping,
                'loc_long_deg'            : {'type': 'integer'},
                'loc_long_min'            : {'type': 'integer'},
                'loc_long_sec'            : float_mapping,
                'loc_long_dir'            : keyword_mapping,
                'hgt_structure'           : float_mapping,
                'asr_num'                 : keyword_mapping,
                'antenna_id'              : {'type': 'long'},
                'ant_seq_id'              : {'type': 'long'},
                'ant_make'                : keyword_mapping,
                'ant_model'               : keyword_mapping,
                'ant_type_code'           : keyword_mapping,
                'ant_type_desc'           : keyword_mapping,
                'azimuth'                 : float_mapping,
                'beamwidth'               : float_mapping,
                'polarization_code'       : keyword_mapping,
                'frequency_id'            : {'type': 'long'},
                'freq_seq_id'             : {'type': 'long'},
                'freq_class_station_code' : keyword_mapping,
                'freq_class_station_desc' : keyword_mapping,
                'power_erp'               : float_mapping,
                'power_output'            : float_mapping,
                'frequency_assigned'      : float_mapping,
                'frequency_upper_band'    : float_mapping,
                'unit_of_measure'         : keyword_mapping,
                'tolerance'               : float_mapping,
                'emission_id'             : {'type': 'long'},
                'emission_seq_id'         : {'type': 'long'},
                'emission_code'           : keyword_mapping,
                'ground_elevation'        : float_mapping
            }
        }
    }


async def download_and_extract_csv():
    '''Download and extract the FCC license data ZIP file.'''

    async with aiohttp.ClientSession() as session:
        async with session.get(FCC_URL) as response:
            if response.status != 200:
                raise Exception(f'Failed to download FCC data: HTTP {response.status}')

            zip_data = await response.read()

            with zipfile.ZipFile(io.BytesIO(zip_data)) as zip_file:
                # Get the first CSV file in the ZIP
                csv_filename = next(name for name in zip_file.namelist() if name.endswith('.csv'))
                return zip_file.read(csv_filename).decode('utf-8')


def parse_date(date_str):
    '''Parse date string to ISO format or return None if invalid.'''
    if not date_str or date_str == '0000-00-00':
        return None
    try:
        # Try the new format first
        return datetime.strptime(date_str.strip(), '%m/%d/%Y %H:%M:%S').strftime('%Y-%m-%dT%H:%M:%SZ')
    except ValueError:
        try:
            # Try the old format
            return datetime.strptime(date_str, '%Y-%m-%d').strftime('%Y-%m-%dT%H:%M:%SZ')
        except ValueError:
            return None


async def process_data(input_path: str = None):
    '''
    Process the FCC license data.

    :param input_path: Optional path to local CSV file (if not downloading)
    '''

    if input_path:
        with open(input_path, 'r') as f:
            csv_data = f.read()
    else:
        csv_data = await download_and_extract_csv()

    # Process CSV data
    csv_reader = csv.DictReader(io.StringIO(csv_data))

    for row in csv_reader:
        # Convert date fields
        date_fields = ['grant_date', 'expired_date', 'cancellation_date', 'last_action_date']

        for field in date_fields:
            if field in row:
                row[field] = parse_date(row[field])

        # Convert numeric fields
        numeric_fields = {
            'int'   : ['facility_id', 'loc_lat_deg', 'loc_long_deg', 'loc_lat_min', 'loc_long_min', 'loc_seq_id', 'ant_seq_id', 'freq_seq_id', 'emission_seq_id'],
            'float' : ['loc_lat_sec', 'loc_long_sec', 'loc_radius_op', 'hgt_structure', 'azimuth', 'beamwidth', 'power_erp', 'power_output', 'frequency_assigned', 'frequency_upper_band', 'tolerance', 'ground_elevation'],
            'long'  : ['license_id', 'antenna_id', 'frequency_id', 'emission_id']
        }

        for field_type, fields in numeric_fields.items():
            for field in fields:
                if field in row and row[field]:
                    try:
                        if field_type == 'int':
                            row[field] = int(float(row[field]))
                        elif field_type == 'float':
                            row[field] = float(row[field])
                        elif field_type == 'long':
                            row[field] = int(float(row[field]))
                    except (ValueError, TypeError):
                        row[field] = None

        # Remove empty fields
        record = {k.lower(): v for k, v in row.items() if v not in (None, '', 'NULL')}

        yield {'_index': default_index, '_source': record}


async def test():
    '''Test the ingestion process.'''
    async for document in process_data():
        print(document)


if __name__ == '__main__':
    asyncio.run(test())
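
Usage note: ERIS normally loads this plugin through its own ingestion engine, which takes care of index creation and bulk indexing. Purely as an illustration, the sketch below shows one way the process_data() async generator could be fed directly into Elasticsearch using the official elasticsearch Python client and its async_bulk helper. The cluster URL, the standalone import of this file as ingest_fcc, and the error handling are assumptions made for the example, not part of the plugin.

#!/usr/bin/env python
# Hypothetical standalone usage sketch (not part of ERIS).
# Assumes: pip install elasticsearch aiohttp, and an Elasticsearch node reachable
# at the placeholder URL below.

import asyncio

from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk

import ingest_fcc  # this plugin, imported as a module


async def main():
    # Placeholder cluster URL and no auth; adjust for your environment
    client = AsyncElasticsearch('http://localhost:9200')

    # Create the index with the plugin's mapping if it does not exist yet
    if not await client.indices.exists(index=ingest_fcc.default_index):
        await client.indices.create(index=ingest_fcc.default_index, mappings=ingest_fcc.construct_map()['mappings'])

    # process_data() yields {'_index': ..., '_source': ...} actions, which is the
    # shape the async_bulk helper consumes
    success, errors = await async_bulk(client, ingest_fcc.process_data(), raise_on_error=False)
    print(f'Indexed {success} documents ({len(errors)} errors)')

    await client.close()


if __name__ == '__main__':
    asyncio.run(main())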