proxytools - collection of scripts for harvesting & testing proxies |
git clone git://git.acid.vegas/proxytools.git |
Log | Files | Refs | Archive | README | LICENSE |
tortest.py (2983B)
#!/usr/bin/env python
# Tor Test - Developed by acidvegas in Python (https://git.acid.vegas/proxytools)

import io
import time

try:
    import pycurl
except ImportError:
    raise Exception('missing required library \'pycurl\' (https://pypi.org/project/pycurl/)')

try:
    import stem.control
except ImportError:
    raise Exception('missing required library \'stem\' (https://pypi.org/project/stem/)')

# Globals
EXIT_FINGERPRINT = '379FB450010D17078B3766C2273303C358C3A442' # https://metrics.torproject.org/rs.html#details/379FB450010D17078B3766C2273303C358C3A442
SOCKS_PORT = 9050
CONNECTION_TIMEOUT = 30 # timeout before we give up on a circuit


def query(url: str) -> str:
    '''
    Uses pycurl to fetch a site using the proxy on the SOCKS_PORT.

    :param url: the url to fetch
    :return: the response body decoded as text
    :raises ValueError: if pycurl fails to perform the request
    '''
    # pycurl hands the write callback raw bytes on Python 3, so collect the
    # body in a bytes buffer and decode it once at the end.
    output = io.BytesIO()
    curl = pycurl.Curl()
    try:
        curl.setopt(pycurl.URL, url)
        curl.setopt(pycurl.PROXY, 'localhost')
        curl.setopt(pycurl.PROXYPORT, SOCKS_PORT)
        # SOCKS5_HOSTNAME makes the proxy resolve the hostname instead of the
        # local resolver.
        curl.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS5_HOSTNAME)
        curl.setopt(pycurl.CONNECTTIMEOUT, CONNECTION_TIMEOUT)
        curl.setopt(pycurl.WRITEFUNCTION, output.write)
        try:
            curl.perform()
        except pycurl.error as exc:
            raise ValueError("Unable to reach %s (%s)" % (url, exc)) from exc
    finally:
        # Release the handle on every path; scan() calls this once per relay.
        curl.close()
    # Undecodable bytes are replaced rather than raising, so a stray byte in
    # the page cannot abort the content check done by the caller.
    return output.getvalue().decode('utf-8', errors='replace')


def scan(controller, path):
    '''
    Test the connection to a website through the given path of relays using the given controller.

    :param controller: the controller to use
    :param path: a list of fingerprints, in order, to build a path through
    :return: the number of seconds the check page took to fetch
    :raises ValueError: if the page is unreachable or lacks the expected text
    '''
    circuit_id = controller.new_circuit(path, await_build = True)

    def attach_stream(stream):
        # Route every newly created stream over the circuit built above.
        if stream.status == 'NEW':
            controller.attach_stream(stream.id, circuit_id)

    controller.add_event_listener(attach_stream, stem.control.EventType.STREAM)
    try:
        controller.set_conf('__LeaveStreamsUnattached', '1') # leave stream management to us
        start_time = time.time()
        check_page = query('https://check.torproject.org/')
        if 'Congratulations. This browser is configured to use Tor.' not in check_page:
            raise ValueError("Request didn't have the right content")
        return time.time() - start_time
    finally:
        # Always undo the listener and the config override, even on failure.
        controller.remove_event_listener(attach_stream)
        controller.reset_conf('__LeaveStreamsUnattached')


# Main
if __name__ == '__main__':
    # NOTE(review): 9056 is presumably the local Tor control port - confirm it matches your Tor configuration.
    with stem.control.Controller.from_port(port=9056) as controller:
        controller.authenticate('CHANGEME') # Change this to your Tor control password
        relay_fingerprints = [desc.fingerprint for desc in controller.get_network_statuses()]
        for fingerprint in relay_fingerprints:
            # Best-effort per relay: report the failure and move on to the next one.
            try:
                time_taken = scan(controller, [fingerprint, EXIT_FINGERPRINT])
                print('%s => %0.2f seconds' % (fingerprint, time_taken))
            except Exception as exc:
                print('%s => %s' % (fingerprint, exc))