httpz

git clone git://git.acid.vegas/httpz.git

commit 709295bd4a7a53eb137f1f57cf9dc546f94200f6
parent e257a62dc81de4da0ac4d158084f81ec65a7e1da
Author: acidvegas <acid.vegas@acid.vegas>
Date: Sat, 16 Dec 2023 23:26:23 -0500

Updated (THATS RIGHT)

Diffstat:
M httpz.py | 81 +++++++++++++++++++++++++++++++++++++++++++++++++++++----------------------------

1 file changed, 52 insertions(+), 29 deletions(-)

diff --git a/httpz.py b/httpz.py
@@ -27,9 +27,12 @@ except ImportError:
     exit(1)
 
 # ANSI escape codes for colors
+BLUE = '\033[34m'
+CYAN = '\033[36m'
 RED = '\033[91m'
 GREEN = '\033[92m'
 DARK_GREY = '\033[90m'
+YELLOW = '\033[93m'
 RESET = '\033[0m'
 
 # Globals
@@ -46,24 +49,55 @@ def vlog(msg: str):
         logging.info(msg)
 
 
+def create_session(user_agent: str, timeout: int, proxy: str = None) -> dict:
+    '''
+    Create a custom aiohttp session
+
+    :param user_agent: User agent to use for HTTP requests
+    :param timeout: Timeout for HTTP requests
+    :param proxy: Optional proxy to use for HTTP requests
+    '''
+    # Permissive TLS: don't fail on self-signed or expired certificates
+    ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+    ssl_context.check_hostname = False
+    ssl_context.verify_mode = ssl.CERT_NONE
+
+    headers = {'User-Agent': user_agent}
+    connector = aiohttp.TCPConnector(ssl=ssl_context)
+
+    session_params = {
+        'connector': connector,
+        'headers': headers,
+        'timeout': aiohttp.ClientTimeout(total=timeout)
+    }
+
+    if proxy:
+        session_params['proxy'] = proxy
+
+    return session_params
+
+
 def get_dns_servers() -> dict:
     '''Get a list of DNS servers to use for lookups.'''
     with urllib.request.urlopen('https://public-dns.info/nameservers.txt') as source:
         results = source.read().decode().split('\n')
+
     v4_servers = [server for server in results if ':' not in server]
     v6_servers = [server for server in results if ':'     in server]
+
     return {'4': v4_servers, '6': v6_servers}
 
 
-async def dns_lookup(domain: str, record_type: str, timeout: int) -> list:
+async def dns_lookup(domain: str, record_type: str, timeout: int, retry: int) -> list:
     '''
     Resolve DNS information from a domain
 
     :param domain: Domain name to resolve
     :param record_type: DNS record type to resolve
     :param timeout: Timeout for DNS request
+    :param retry: Number of times to retry failed requests
     '''
-    for i in range(args.retry):
+    for i in range(retry):
         try:
             version = '4' if record_type == 'A' else '6' if record_type == 'AAAA' else random.choice(['4','6'])
             nameserver = random.choice(DNS_SERVERS[version])
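
The dict returned by create_session() is meant to be unpacked straight into aiohttp.ClientSession. A minimal sketch of that call pattern, assuming create_session() from this commit is importable (the URL and user agent below are placeholders, not values from the script):

    import asyncio
    import aiohttp

    async def demo():
        # one session is reused for many requests; check_url() does the same
        session_params = create_session('Mozilla/5.0 (compatible; demo)', timeout=10)
        async with aiohttp.ClientSession(**session_params) as session:
            async with session.get('https://example.com') as response:
                print(response.status)

    asyncio.run(demo())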
@@ -82,7 +111,7 @@ async def get_body(source: str, preview: int) -> str:
     :param source: HTML source of the webpage
     :param preview: Number of bytes to preview
     '''
-    body_content = re.search(r'<body.*?>(.*?)</body>', source, re.DOTALL | re.IGNORECASE)
+    body_content = re.search(r'<body.*?>(.*?)</body>', source[:5000], re.DOTALL | re.IGNORECASE)
     processed_content = body_content.group(1) if body_content else source
     clean_content = re.sub(r'<[^>]+>', '', processed_content)
     return clean_content[:preview]
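
Limiting the search to source[:5000] bounds the regex scan on very large pages; the <body> match (or the whole source as a fallback) is then stripped of tags and truncated to the preview size. A self-contained illustration of the same two regexes (the sample HTML is invented):

    import re

    html = '<html><head><title>x</title></head><body><h1>Hello</h1><p>world</p></body></html>'
    match = re.search(r'<body.*?>(.*?)</body>', html[:5000], re.DOTALL | re.IGNORECASE)
    content = match.group(1) if match else html
    print(re.sub(r'<[^>]+>', '', content))  # prints 'Helloworld'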
@@ -95,9 +124,9 @@ async def get_title(session: aiohttp.ClientSession, domain: str):
     :param session: aiohttp session
     :param domain: URL to get the title of
     '''
+    title = None
     body = None
     status_code = None
-    title = None
 
     try:
         async with session.get(domain, timeout=args.timeout, allow_redirects=False) as response:
@@ -106,21 +135,24 @@ async def get_title(session: aiohttp.ClientSession, domain: str):
                 html_content = await response.text()
                 match = re.search(r'<title>(.*?)</title>', html_content, re.IGNORECASE | re.DOTALL)
                 title = match.group(1).strip() if match else None
+                title = bytes(title, 'utf-8').decode('unicode_escape') if title else None
                 title = re.sub(r'[\r\n]+', ' ', title)[:300] if title else None # Fix this ugly shit
                 body = await get_body(html_content, args.preview)
+                body = re.sub(r'\s+', ' ', body).strip() if body else None
             elif status_code in (301, 302, 303, 307, 308) and args.retry > 0: # Need to implement a max redirect limit
                 redirect_url = response.headers.get('Location')
                 if redirect_url:
+                    vlog(f'{YELLOW}[WARN]{RESET} {domain} -> {redirect_url} {DARK_GREY}({status_code}){RESET}')
                     return await get_title(session, redirect_url)
                 else:
-                    vlog(f'{RED}[ERROR]{RESET} {domain} - No redirect URL found for {status_code} status code')
+                    vlog(f'{RED}[ERROR]{RESET} No redirect URL found for {domain} {DARK_GREY}({status_code}){RESET}')
             else:
                 vlog(f'{RED}[ERROR]{RESET} {domain} - Invalid status code {DARK_GREY}{status_code}{RESET}')
     except asyncio.TimeoutError:
         vlog(f'{RED}[ERROR]{RESET} {domain} - HTTP request timed out')
     except Exception as e:
         vlog(f'{RED}[ERROR]{RESET} Failed to get title for {domain} {DARK_GREY}({e}){RESET}')
-    return title, body, status_code
+    return title, body, status_code # Fix this ugly shit
 
 
 async def check_url(session: aiohttp.ClientSession, domain: str):
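
The redirect branch above still recurses without a depth cap, as the in-line comment notes. One way to add the missing limit, sketched here as a hypothetical helper rather than anything in this commit:

    import aiohttp

    async def follow(session: aiohttp.ClientSession, url: str, depth: int = 0, max_redirects: int = 5):
        '''Follow redirects manually, giving up after max_redirects hops.'''
        if depth >= max_redirects:
            return None  # bail out instead of recursing forever
        async with session.get(url, allow_redirects=False) as response:
            if response.status in (301, 302, 303, 307, 308):
                location = response.headers.get('Location')
                if location:
                    return await follow(session, location, depth + 1, max_redirects)
            return response.status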
@@ -133,11 +165,11 @@ async def check_url(session: aiohttp.ClientSession, domain: str):
     dns_records = {}
 
     for record_type in ('A', 'AAAA'):
-        records = await dns_lookup(domain, record_type, args.timeout)
+        records = await dns_lookup(domain, record_type, args.timeout, args.retry)
         if records:
             dns_records[record_type] = records
     if not dns_records:
-        cname_record = await dns_lookup(domain, 'CNAME', args.timeout)
+        cname_record = await dns_lookup(domain, 'CNAME', args.timeout, args.retry)
         if cname_record:
             dns_records['CNAME'] = cname_record
             domain = cname_record
@@ -150,7 +182,11 @@ async def check_url(session: aiohttp.ClientSession, domain: str):
         title, body, status_code = await get_title(session, f'http://{domain}')
 
     if title or body:
-        logging.info(f'[{GREEN}SUCCESS{RESET}] {domain} - {title} - {body}')
+        if status_code in (200, 201):
+            status_code = f'[{GREEN}{status_code}{RESET}]'
+        elif status_code in (301, 302, 303, 307, 308):
+            status_code = f'[{YELLOW}{status_code}{RESET}]'
+        logging.info(f'{domain} {status_code} [{CYAN}{title}{RESET}] - [{BLUE}{body}{RESET}]')
         return domain, 'https', title, body, dns_records, status_code
     else:
         vlog(f'{RED}[ERROR]{RESET} {domain} - Failed to retrieve title')
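
With the new formatting, each hit is logged as the domain followed by a color-coded status, title, and body preview. A small stand-alone rendering of that log line (the values are invented; the constants match the top of the diff):

    GREEN, YELLOW, CYAN, BLUE, RESET = '\033[92m', '\033[93m', '\033[36m', '\033[34m', '\033[0m'

    status_code = 200
    if status_code in (200, 201):
        status_code = f'[{GREEN}{status_code}{RESET}]'
    elif status_code in (301, 302, 303, 307, 308):
        status_code = f'[{YELLOW}{status_code}{RESET}]'
    print(f'example.com {status_code} [{CYAN}Example Domain{RESET}] - [{BLUE}Example body text{RESET}]')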
@@ -162,20 +198,8 @@ async def process_file():
     '''
     Process a list of domains from file
     '''
-    counter = 0
-    ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
-    ssl_context.check_hostname = False
-    ssl_context.verify_mode = ssl.CERT_NONE
-    headers = {'User-Agent': args.user_agent}
-    connector = aiohttp.TCPConnector(ssl=ssl_context)
-
-    session_params = {
-        'connector': connector,
-        'headers': headers,
-        'timeout': aiohttp.ClientTimeout(total=args.timeout)
-    }
-    if args.proxy:
-        session_params['proxy'] = args.proxy
+
+    session_params = create_session(args.user_agent, args.timeout, args.proxy)
 
     async with aiohttp.ClientSession(**session_params) as session:
         tasks = set()
@@ -192,10 +216,7 @@ async def process_file():
                             domain, protocol, title, body, dns_records, status_code = task.result()
                             if title or body or dns_records:
                                 write_result_to_file(domain, protocol, title, body, dns_records, status_code)
-                                counter += 1
 
-                                if counter % args.memory_limit == 0:
-                                    logging.info(f'Processed {counter} domains')
 
         if tasks:
             done, _ = await asyncio.wait(tasks)
@@ -205,6 +226,8 @@ async def process_file():
                     write_result_to_file(domain, protocol, title, body, dns_records, status_code)
 
 
+
+
 def write_result_to_file(domain, protocol, title, body, dns_records, status_code):
     '''
     Write a single domain result to file
@@ -236,10 +259,10 @@ def main():
     parser.add_argument('-c', '--concurrency', type=int, default=10, help='Number of concurrent requests')
     parser.add_argument('-m', '--memory_limit', type=int, default=1000, help='Number of results to store in memory before syncing to file')
     parser.add_argument('-o', '--output', default='results.json', help='Output file')
-    parser.add_argument('-t', '--timeout', type=int, default=5, help='Timeout for HTTP requests')
+    parser.add_argument('-t', '--timeout', type=int, default=10, help='Timeout for HTTP requests')
     parser.add_argument('-u', '--user_agent', default='Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)', help='User agent to use for HTTP requests')
-    parser.add_argument('-x', '--proxy', help='Proxy to use for HTTP requests')
-    parser.add_argument('-r', '--retry', type=int, default=3, help='Number of times to retry failed requests')
+    parser.add_argument('-x', '--proxy', type=str, help='Proxy to use for HTTP requests')
+    parser.add_argument('-r', '--retry', type=int, default=2, help='Number of times to retry failed requests')
     parser.add_argument('-v', '--verbose', action='store_true', help='Increase output verbosity')
     parser.add_argument('-p', '--preview', type=int, default=500, help='Preview size in bytes for body & title (default: 500)')
     args = parser.parse_args()
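
The argparse changes double the default timeout to 10 seconds, drop the default retry count to 2, and type --proxy explicitly as a string. A quick self-check of the new defaults (only the changed options are reproduced here):

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('-t', '--timeout', type=int, default=10)
    parser.add_argument('-r', '--retry', type=int, default=2)
    parser.add_argument('-x', '--proxy', type=str)
    args = parser.parse_args([])
    assert (args.timeout, args.retry, args.proxy) == (10, 2, None)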