proxytools

- collection of scripts for harvesting & testing proxies
git clone git://git.acid.vegas/proxytools.git
Log | Files | Refs | Archive | README | LICENSE

commit cc02b8df44328c6251c8b5333039a9c8b7bff8f8
parent 4d6294a20b310c608602f443c97f0f33fc9358c7
Author: acidvegas <acid.vegas@acid.vegas>
Date: Thu, 12 Oct 2023 15:31:22 -0400

Code cleanup for PIP compliance, also added residential proxy usage examples

Diffstat:
MLICENSE | 2+-
Mcleansocks.py | 2+-
Mfloodbl.py | 7++++++-
Aresidentialproxy.py | 59+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mtorglass.py | 34++++++++++++++++++++++------------
Mtortest.py | 36+++++++++++++++++++++++-------------

6 files changed, 112 insertions(+), 28 deletions(-)

diff --git a/LICENSE b/LICENSE
@@ -1,6 +1,6 @@
 ISC License
 
-Copyright (c) 2021, acidvegas <acid.vegas@acid.vegas>
+Copyright (c) 2023, acidvegas <acid.vegas@acid.vegas>
 
 Permission to use, copy, modify, and/or distribute this software for any
 purpose with or without fee is hereby granted, provided that the above
diff --git a/cleansocks.py b/cleansocks.py
@@ -29,7 +29,7 @@ async def check(semaphore, proxy):
 			'family'     : 2
 		}
 		try:
-		    await asyncio.wait_for(aiosocks.open_connection(**options), 15)
+			await asyncio.wait_for(aiosocks.open_connection(**options), 15)
 		except:
 			if print_bad:
 				print('\033[1;31mBAD\033[0m  \033[30m|\033[0m ' + proxy)
diff --git a/floodbl.py b/floodbl.py
@@ -48,7 +48,12 @@ blackholes = {
 	}
 }
 
-def check(proxy):
+def check(proxy: str):
+	'''
+	Check if a proxy is blackholed.
+
+	:param proxy: the proxy to check in the format of ip:port
+	'''
 	proxy_ip     = proxy.split(':')[0]
 	formatted_ip = ipaddress.ip_address(proxy_ip).reverse_pointer
 	for blackhole in blackholes:
diff --git a/residentialproxy.py b/residentialproxy.py
@@ -0,0 +1,58 @@
+#!/usr/bin/env python
+# Residential Proxy Usage Example - Developed by acidvegas in Python (https://git.acid.vegas/proxytools)
+
+'''
+Residential proxies are typically in a user:pass@host:port format, rotating on every request.
+
+These example below show how to use these proxies with the aiosocks library and the requests library.
+'''
+
+import asyncio
+import ssl
+
+try:
+    import aiosocks
+except ImportError:
+    raise SystemExit('missing required library \'aiosocks\' (https://pypi.org/project/aiosocks/)')
+
+try:
+    import requests
+except ImportError:
+    raise SystemExit('missing required library \'requestss\' (https://pypi.org/project/requests/)')
+
+async def tcp_example(proxy: str, host: str, port: int, use_ssl: bool = False):
+    '''
+    Make a connection to a TCP server through a proxy.
+
+    :param proxy: the proxy to use in the format of ip:port
+    :param host: the host to connect to
+    :param port: the port to connect to
+    :param use_ssl: whether or not to use SSL
+    '''
+    auth = proxy.split('@')[0].split(':') if '@' in proxy else None
+    proxy_ip, proxy_port = proxy.split('@')[1].split(':') if '@' in proxy else proxy.split(':')
+    options = {
+        'proxy'      : aiosocks.Socks5Addr(proxy_ip, proxy_port),
+        'proxy_auth' : aiosocks.Socks5Auth(*auth) if auth else None,
+        'dst'        : (host, port),
+        'limit'      : 1024,
+        'ssl'        : ssl._create_unverified_context() if use_ssl else None,
+        'family'     : 2
+    }
+    reader, writer = await asyncio.wait_for(aiosocks.open_connection(**options), 15) # 15 second timeout
+    while True:
+        if reader.at_eof(): # Check if the connection has been closed
+            break
+        data  = await asyncio.wait_for(reader.readuntil(b'\r\n'), 300) # 5 minute timeout on no data received
+        line  = data.decode('utf-8').strip()
+        print(line) # Print the data received from the server
+
+async def http_example(proxy: str, url: str):
+     '''
+     Make a HTTP request through a proxy.
+
+     :param proxy: the proxy to use in the format of ip:port
+     :param url: the url to request
+     '''
+     response = requests.get(url, proxies={'http': proxy, 'https':proxy}, timeout=15) # 15 second timeout
+     return response.text
+\ No newline at end of file
diff --git a/torglass.py b/torglass.py
@@ -1,7 +1,11 @@
 #!/usr/bin/env python
 # Tor Glass - Developed by acidvegas in Python (https://git.acid.vegas/proxytools)
 
-import json
+'''
+A simple script to pull a list of all the Tor relays / exit nodes & generate a json database.
+
+The example below will generate a map of all the Tor relays / exit nodes using the ipinfo.io API.
+'''
 
 try:
 	import stem.descriptor.remote
@@ -52,25 +56,31 @@ def get_descriptors() -> dict:
 			tor_map['relay'].append(data)
 	return tor_map
 
+
 if __name__ == '__main__':
+	import json
+
 	print('loading Tor descriptors... (this could take a while)')
 	tor_data = get_descriptors()
+	
 	with open('tor.json', 'w') as fd:
 		json.dump(tor_data['relay'], fd)
 	with open('tor.exit.json', 'w') as fd:
 		json.dump(tor_data['exit'], fd)
+
 	print('Relays: {0:,}'.foramt(len(tor_data['relay'])))
 	print('Exits : {0:,}'.format(len(tor_data['exit'])))
+
 	try:
 		import ipinfo
 	except ImportError:
-		print('missing optional library \'ipinfo\' (https://pypi.org/project/ipinfo/) for map visualization')
-	else:
-		try:
-			handler = ipinfo.getHandler('changeme') # put your ipinfo.io API key here
-			print('Relay Map: ' + handler.getMap([ip['address'] for ip in tor_data['relay']]))
-			print('Exit  Map: ' + handler.getMap([ip['address'] for ip in tor_data['exit']]))
-		except ipinfo.errors.AuthorizationError:
-			print('error: invalid ipinfo.io API key (https://ipinfo.io/signup)')
-		except Exception as ex:
-			print(f'error generating ipinfo map ({ex})')
-\ No newline at end of file
+		raise ImportError('missing optional library \'ipinfo\' (https://pypi.org/project/ipinfo/) for map visualization')
+	
+	try:
+		handler = ipinfo.getHandler('changeme') # put your ipinfo.io API key here
+		print('Relay Map: ' + handler.getMap([ip['address'] for ip in tor_data['relay']]))
+		print('Exit  Map: ' + handler.getMap([ip['address'] for ip in tor_data['exit']]))
+	except ipinfo.errors.AuthorizationError:
+		print('error: invalid ipinfo.io API key (https://ipinfo.io/signup)')
+	except Exception as ex:
+		print(f'error generating ipinfo map ({ex})')
+\ No newline at end of file
diff --git a/tortest.py b/tortest.py
@@ -19,8 +19,12 @@ EXIT_FINGERPRINT = '379FB450010D17078B3766C2273303C358C3A442' # https://metrics.
 SOCKS_PORT = 9050
 CONNECTION_TIMEOUT = 30  # timeout before we give up on a circuit
 
-def query(url):
-    ''' Uses pycurl to fetch a site using the proxy on the SOCKS_PORT. '''
+def query(url: str):
+    '''
+    Uses pycurl to fetch a site using the proxy on the SOCKS_PORT.
+    
+    :param url: the url to fetch
+    '''
     output = io.StringIO.StringIO()
     query = pycurl.Curl()
     query.setopt(pycurl.URL, url)
@@ -36,7 +40,12 @@ def query(url):
         raise ValueError("Unable to reach %s (%s)" % (url, exc))
 
 def scan(controller, path):
-    ''' Test the connection to a website through the given path of relays using the given controller '''
+    '''
+    Test the connection to a website through the given path of relays using the given controller.
+    
+    :param controller: the controller to use
+    :param path: a list of fingerprints, in order, to build a path through
+    '''
     circuit_id = controller.new_circuit(path, await_build = True)
     def attach_stream(stream):
         if stream.status == 'NEW':
@@ -54,12 +63,13 @@ def scan(controller, path):
         controller.reset_conf('__LeaveStreamsUnattached')
 
 # Main
-with stem.control.Controller.from_port(port=9056) as controller:
-    controller.authenticate('loldongs')
-    relay_fingerprints = [desc.fingerprint for desc in controller.get_network_statuses()]
-    for fingerprint in relay_fingerprints:
-        try:
-            time_taken = scan(controller, [fingerprint, EXIT_FINGERPRINT])
-            print('%s => %0.2f seconds' % (fingerprint, time_taken))
-        except Exception as exc:
-            print('%s => %s' % (fingerprint, exc))
-\ No newline at end of file
+if __name__ == '__main__':
+    with stem.control.Controller.from_port(port=9056) as controller:
+        controller.authenticate('CHANGEME') # Change this to your Tor control password
+        relay_fingerprints = [desc.fingerprint for desc in controller.get_network_statuses()]
+        for fingerprint in relay_fingerprints:
+            try:
+                time_taken = scan(controller, [fingerprint, EXIT_FINGERPRINT])
+                print('%s => %0.2f seconds' % (fingerprint, time_taken))
+            except Exception as exc:
+                print('%s => %s' % (fingerprint, exc))
+\ No newline at end of file