diff --git a/proxytools/floodbl.py b/proxytools/floodbl.py
@@ -2,35 +2,38 @@
# FloodBL - Developed by acidvegas in Python (https://git.acid.vegas/proxytools)
'''
-This script will test proxies against a set of Domain Name System-based Blackhole Lists (DNSBL) or Real-time Blackhole Lists (RBL)
+
+Test proxies against a set of Domain Name System-based Blackhole Lists (DNSBL) or Real-time Blackhole Lists (RBL)
+
'''
+import asyncio
import argparse
-import concurrent.futures
import os
import re
import socket
-dnsbls = ('dnsbl.dronebl.org','rbl.efnetrbl.org','torexit.dan.me.uk')
-
-def dnsbl_check(proxy):
- global good
- bad = False
- ip = proxy.split(':')[0]
- formatted_ip = '.'.join(ip.split('.')[::-1])
- for dnsbl in dnsbls:
- try:
- socket.gethostbyname(f'{formatted_ip}.{dnsbl}')
- except socket.gaierror:
- pass
- else:
- bad = True
- break
- if bad:
- print('BAD | ' + ip)
- else:
- good.append(proxy)
- print('GOOD | ' + ip)
+blackholes = ('dnsbl.dronebl.org','rbl.efnetrbl.org','torexit.dan.me.uk')
+
+async def check(sema, proxy):
+ async with sema:
+ ip = proxy.split(':')[0]
+ formatted_ip = '.'.join(ip.split('.')[::-1])
+ for dnsbl in blackholes:
+ try:
+ socket.gethostbyname(f'{formatted_ip}.{dnsbl}')
+ print('\033[1;32mGOOD\033[0m \033[30m|\033[0m ' + ip)
+ good.append(proxy)
+ except socket.gaierror:
+ print('\033[1;31mBAD\033[0m \033[30m|\033[0m ' + ip.ljust(22) + f'\033[30m({dnsbl})\033[0m')
+ break
+
+async def main(proxies, threads):
+ sema = asyncio.BoundedSemaphore(threads) # B O U N D E D S E M A P H O R E G A N G
+ jobs = list()
+ for proxy in proxies:
+ jobs.append(asyncio.ensure_future(check(sema, proxy)))
+ await asyncio.gather(*jobs)
# Main
parser = argparse.ArgumentParser(usage='%(prog)s <input> <output> [options]')
@@ -43,14 +46,10 @@ if not os.path.isfile(args.input):
proxies = re.findall('[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+:[0-9]+', open(args.input).read(), re.MULTILINE)
if not proxies:
raise SystemExit('no proxies found from input file')
-good = list()
-with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor:
- checks = {executor.submit(dnsbl_check, proxy): proxy for proxy in proxies}
- for future in concurrent.futures.as_completed(checks):
- checks[future]
+asyncio.run(main(proxies, args.threads))
good.sort()
with open(args.output, 'w') as output_file:
output_file.write('\n'.join(good))
-print('Total : ' + format(len(proxies), ',d'))
-print('Good : ' + format(len(good), ',d'))
-print('Bad : ' + format(len(proxies)-len(good), ',d'))
+print('\n\033[34mTotal\033[0m : ' + format(len(proxies), ',d'))
+print('\033[34mGood\033[0m : ' + format(len(good), ',d'))
+print('\033[34mBad\033[0m : ' + format(len(proxies)-len(good), ',d'))
+\ No newline at end of file
|