tools - collection of tools for supernets sysadmins |
git clone git://git.acid.vegas/tools.git |
Log | Files | Refs | Archive |
commands.py (5078B)
import csv
import io
import json
import sys
import time
import urllib.parse
import urllib.request


def download_file(url: str, dest_filename: str, chunk_size: int = 1024 * 1024):
    '''
    Download a file from a given URL in chunks and save to a destination filename,
    printing a progress line to stdout as it goes.

    :param url: The URL of the file to download
    :param dest_filename: The destination filename to save the downloaded file
    :param chunk_size: Size of chunks to download. Default is set to 1MB.
    '''
    with urllib.request.urlopen(url) as response:
        # Content-Length may be absent (e.g. chunked transfer encoding) — treat as unknown (0)
        content_length = response.getheader('Content-Length')
        total_size = int(content_length.strip()) if content_length else 0
        downloaded_size = 0
        with open(dest_filename, 'wb') as out_file:
            while True:
                start_time = time.time()
                chunk = response.read(chunk_size)
                if not chunk:
                    break
                downloaded_size += len(chunk)
                out_file.write(chunk)
                elapsed = time.time() - start_time
                # Guard against a divide-by-zero when a chunk arrives faster than the clock resolution
                speed = len(chunk) / elapsed if elapsed > 0 else 0.0
                progress = (downloaded_size / total_size) * 100 if total_size else 0.0
                sys.stdout.write(f'\rDownloaded {downloaded_size} of {total_size} bytes ({progress:.2f}%) at {speed/1024:.2f} KB/s')
                sys.stdout.flush()
    print()


def get_url(url: str, sent_headers: dict = None, reader: bool = True):
    '''
    Retrieve a URL with custom headers.

    :param url: The URL to retrieve
    :param sent_headers: The headers to send (optional)
    :param reader: If True, read and decode the response body; otherwise return the open response object
    '''
    # None default instead of a mutable {} default (shared across calls)
    req = urllib.request.Request(url, headers=sent_headers or {})
    if reader:
        return urllib.request.urlopen(req, timeout=10).read().decode()
    return urllib.request.urlopen(req, timeout=10)


def setup_user_agent(user_agent: str = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'):
    '''
    Set up urllib.request user agent.

    :param user_agent: The user agent to use
    '''
    handler = urllib.request.HTTPHandler()
    opener = urllib.request.build_opener(handler)
    opener.addheaders = [('User-agent', user_agent)]
    urllib.request.install_opener(opener)


# -------------------------------------------------------------------------------- #

def asn_seach(query: str):
    '''
    Search for an ASN by string.

    NOTE: the name keeps the original 'seach' typo for backward compatibility;
    use the asn_search alias below in new code.

    :param query: The string to search
    '''
    # Percent-encode the query so spaces/special characters form a valid URL
    return json.loads(get_url('https://api.bgpview.io/search?query_term=' + urllib.parse.quote(query)))


asn_search = asn_seach  # correctly-spelled alias


def cve_search(query: str, limit: str = '25'):
    '''
    Search for a CVE by string.

    :param query: The string to search
    :param limit: The number of results to return
    '''
    return json.loads(get_url(f'https://services.nvd.nist.gov/rest/json/cves/2.0?keywordSearch={urllib.parse.quote(query)}&resultsPerPage={limit}'))


def geoip(ip: str):
    '''
    Get the geolocation of an IP address.

    :param ip: The IP address to geolocate
    '''
    return json.loads(get_url('https://api.ipapi.is/?q=' + urllib.parse.quote(ip)))


def github(option: str, query: str):
    '''
    Search for a GitHub repository or user.

    :param option: The option to search for (search, repo, user)
    :param query: The query to search
    :return: Parsed JSON for 'repo'/'user'; list of items (or False if empty) for
             'search'; None for an unrecognized option
    '''
    header = {'Accept': 'application/vnd.github.v3+json'}
    if option == 'search':
        url = 'https://api.github.com/search/repositories?q=' + urllib.parse.quote(query)
        data = json.loads(get_url(url, header))
        return data['items'] if data['items'] else False
    elif option == 'repo':
        url = 'https://api.github.com/repos/' + query
        return json.loads(get_url(url, header))
    elif option == 'user':
        url = 'https://api.github.com/users/' + query
        return json.loads(get_url(url, header))


def librex(query: str):
    '''
    Search on the SuperNETs running LibreX.

    :param query: The query to search
    '''
    return json.loads(get_url(f'https://librex.supernets.org/api.php?q={urllib.parse.quote(query)}&t=0'))


def reddit(option, subreddit, id=None):
    '''
    Search for a Reddit post or subreddit.

    :param option: The option to search for (post, subreddit)
    :param subreddit: The subreddit to search
    :param id: The post ID to search
    '''
    header = {'Accept': 'application/json'}
    if option == 'post':
        # BUGFIX: url and path were passed as separate positional args, so the path
        # landed in get_url's sent_headers parameter — build one full URL instead
        data = json.loads(get_url(f'https://reddit.com/r/{subreddit}/comments/{id}.json', header))
        return data[0]['data']['children'][0]['data'] if 'error' not in data else False
    elif option == 'subreddit':
        data = json.loads(get_url(f'https://reddit.com/r/{subreddit}.json?limit=20', header))
        # Skip stickied (pinned) posts so the feed reflects recent activity
        posts = [item['data'] for item in data['data']['children'] if not item['data']['stickied']]
        return posts if posts else None


def exploitdb(query: str):
    '''
    Search for an exploit or shellcode on ExploitDB.

    :param query: The query to search
    :return: List of CSV rows whose description contains the query (case-insensitive)
    '''
    exploits = get_url('https://git.supernets.org/mirrors/exploitdb/raw/branch/main/files_exploits.csv')
    shellcodes = get_url('https://git.supernets.org/mirrors/exploitdb/raw/branch/main/files_shellcodes.csv')
    results = []
    needle = query.lower()  # hoist the lowercase conversion out of the row loop
    for database in (exploits, shellcodes):
        reader = csv.DictReader(io.StringIO(database))
        results += [row for row in reader if needle in row['description'].lower()]
    return results