tools

- collection of tools for supernets sysadmins
git clone git://git.acid.vegas/tools.git
Log | Files | Refs | Archive

commands.py (5078B)

import csv
import io
import json
import sys
import time
import urllib.parse
import urllib.request
      7 
      8 def download_file(url: str, dest_filename: str, chunk_size: int = 1024*1024):
      9     '''
     10     Download a file from a given URL in chunks and save to a destination filename.
     11 
     12     :param url: The URL of the file to download
     13     :param dest_filename: The destination filename to save the downloaded file
     14     :param chunk_size: Size of chunks to download. Default is set to 1MB.
     15     '''
     16     with urllib.request.urlopen(url) as response:
     17         total_size = int(response.getheader('Content-Length').strip())
     18         downloaded_size = 0
     19         with open(dest_filename, 'wb') as out_file:
     20             while True:
     21                 start_time = time.time()
     22                 chunk = response.read(chunk_size)
     23                 if not chunk:
     24                     break
     25                 downloaded_size += len(chunk)
     26                 out_file.write(chunk)
     27                 end_time = time.time()
     28                 speed = len(chunk) / (end_time - start_time)
     29                 progress = (downloaded_size / total_size) * 100
     30                 sys.stdout.write(f'\rDownloaded {downloaded_size} of {total_size} bytes ({progress:.2f}%) at {speed/1024:.2f} KB/s\r')
     31                 sys.stdout.flush()
     32             print()
     33 
     34 def get_url(url: str, sent_headers: dict = {}, reader: bool = True):
     35 	'''
     36 	Retrieve a URL with custom headers.
     37 	
     38 	:param url: The URL to retrieve
     39 	:param data: The headers to send
     40 	:param reader: Return the reader object instead of the decoded data
     41 	'''
     42 	req = urllib.request.Request(url, headers=sent_headers)
     43 	if reader:
     44 		return urllib.request.urlopen(req, timeout=10).read().decode()
     45 	else:
     46 		return urllib.request.urlopen(req, timeout=10)
     47       
     48 def setup_user_agent(user_agent: str = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'):
     49 	'''
     50 	Set up urllib.request user agent.
     51 	
     52 	:param user_agent: The user agent to use
     53 	'''
     54 	handler = urllib.request.HTTPHandler()
     55 	opener = urllib.request.build_opener(handler)
     56 	opener.addheaders = [('User-agent', user_agent)]
     57 	urllib.request.install_opener(opener)
     58 
     59 
     60 # -------------------------------------------------------------------------------- #
     61 
     62 def asn_seach(query: str):
     63     '''
     64     Search for an ASN by string.
     65       
     66     :param query: The string to search
     67     '''
     68     return json.loads(get_url('https://api.bgpview.io/search?query_term='+query))
     69 
     70 def cve_search(query: str, limit: str = '25'):
     71 	'''
     72     Search for a CVE by string.
     73     
     74     :param query: The string to search
     75     :param limit: The number of results to return
     76     '''
     77 	return json.loads(get_url(f'https://services.nvd.nist.gov/rest/json/cves/2.0?keywordSearch={query}&resultsPerPage={limit}'))
     78 
     79 def geoip(ip: str):
     80 	'''
     81 	Get the geolocation of an IP address.
     82 	
     83 	:param ip: The IP address to geolocate
     84 	'''
     85 	return json.loads(get_url('https://api.ipapi.is/?q='+ip))
     86 
     87 def github(option: str, query: str):
     88     '''
     89     Search for a GitHub repository or user.
     90     
     91     :param option: The option to search for (search, repo, user)
     92     :param query: The query to search
     93     '''
     94     header = {'Accept': 'application/vnd.github.v3+json'}
     95     if option == 'search':
     96         url = 'https://api.github.com/search/repositories?q=' + query
     97         data = json.loads(get_url(url, header))  # Changed this line
     98         return data['items'] if data['items'] else False
     99     elif option == 'repo':
    100         url = 'https://api.github.com/repos/' + query
    101         return json.loads(get_url(url, header))  # And this one
    102     elif option == 'user':
    103         url = 'https://api.github.com/users/' + query
    104         return json.loads(get_url(url, header))  # And this one
    105 
    106 def librex(query: str):
    107 	'''
    108 	Search on the SuperNETs running LibreX.
    109 	
    110 	:param query: The query to search
    111 	'''
    112 	return json.loads(get_url(f'https://librex.supernets.org/api.php?q={query}&t=0'))
    113 
    114 def reddit(option, subreddit, id=None):
    115 	'''
    116 	Search for a Reddit post or subreddit.
    117 	
    118 	:param option: The option to search for (post, subreddit)
    119 	:param subreddit: The subreddit to search
    120 	:param id: The post ID to search
    121     '''
    122 	header = {'Accept':'application/json'}
    123 	if option == 'post':
    124 		data = json.loads(get_url('https://reddit.com', f'/r/{subreddit}/comments/{id}.json', header))
    125 		return data[0]['data']['children'][0]['data'] if 'error' not in data else False
    126 	elif option == 'subreddit':
    127 		data = json.loads(get_url('https://reddit.com', f'/r/{subreddit}.json?limit=20', header))
    128 		posts = [item['data'] for item in data['data']['children'] if not item['data']['stickied']]
    129 		return posts if posts else None
    130 	
    131 def exploitdb(query: str):
    132 	'''
    133 	Search for an exploit or shellcode on ExploitDB.
    134 	
    135 	:param query: The query to search
    136 	'''
    137 	exploits = get_url('https://git.supernets.org/mirrors/exploitdb/raw/branch/main/files_exploits.csv')
    138 	shellcodes = get_url('https://git.supernets.org/mirrors/exploitdb/raw/branch/main/files_shellcodes.csv')
    139 	results = []
    140 	for database in (exploits, shellcodes):
    141 		reader = csv.DictReader(io.StringIO(database))
    142 		results += [row for row in reader if query.lower() in row['description'].lower()]
    143 	return results