scroll- irc bot to play ascii art |
git clone git://git.acid.vegas/scroll.git |
Log | Files | Refs | Archive | README | LICENSE |
scroll.py (14638B)
1 #!/usr/bin/env python 2 # Scroll IRC Art Bot - Developed by acidvegas in Python (https://git.acid.vegas/scroll) 3 4 import asyncio 5 import random 6 import re 7 import ssl 8 import time 9 import urllib.request 10 import aiohttp 11 12 class connection: 13 server = 'irc.server.com' 14 port = 6697 15 ipv6 = False 16 ssl = True 17 vhost = None # Must in ('ip', port) format 18 channel = '#chats' 19 key = None 20 modes = 'BdDg' 21 22 class identity: 23 nickname = 'scroll' 24 username = 'scroll' 25 realname = 'git.acid.vegas/scroll' 26 nickserv = None 27 28 # Settings 29 admin = 'acidvegas!*@*' # Can use wildcards (Must be in nick!user@host format) 30 31 # Formatting Control Characters / Color Codes 32 bold = '\x02' 33 italic = '\x1D' 34 underline = '\x1F' 35 reverse = '\x16' 36 reset = '\x0f' 37 white = '00' 38 black = '01' 39 blue = '02' 40 green = '03' 41 red = '04' 42 brown = '05' 43 purple = '06' 44 orange = '07' 45 yellow = '08' 46 light_green = '09' 47 cyan = '10' 48 light_cyan = '11' 49 light_blue = '12' 50 pink = '13' 51 grey = '14' 52 light_grey = '15' 53 54 def color(msg, foreground, background=None): 55 return f'\x03{foreground},{background}{msg}{reset}' if background else f'\x03{foreground}{msg}{reset}' 56 57 def debug(data): 58 print('{0} | [~] - {1}'.format(time.strftime('%I:%M:%S'), data)) 59 60 def error(data, reason=None): 61 print('{0} | [!] - {1} ({2})'.format(time.strftime('%I:%M:%S'), data, str(reason))) if reason else print('{0} | [!] - {1}'.format(time.strftime('%I:%M:%S'), data)) 62 63 def get_url(url, git=False): 64 data = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.47 Safari/537.36'} 65 if git: 66 data['Accept'] = 'application/vnd.github.v3+json' 67 req = urllib.request.Request(url, headers=data) 68 return urllib.request.urlopen(req, timeout=10) 69 70 def is_admin(ident): 71 return re.compile(admin.replace('*','.*')).search(ident) 72 73 def ssl_ctx(): 74 ctx = ssl.create_default_context() 75 ctx.check_hostname = False 76 ctx.verify_mode = ssl.CERT_NONE 77 return ctx 78 79 class Bot(): 80 def __init__(self): 81 self.db = None 82 self.last = time.time() 83 self.loops = dict() 84 self.host = '' 85 self.playing = False 86 self.settings = { 87 'flood' : 1, 88 'ignore' : 'big,birds,doc,gorf,hang,nazi,pokemon', 89 'lines' : 500, 90 'msg' : 0.03, 91 'paste' : True, 92 'png_palette' : 'RGB99', 93 'png_quantize' : 99, 94 'png_width' : 80, 95 'results' : 25} 96 self.slow = False 97 self.reader = None 98 self.writer = None 99 100 async def raw(self, data): 101 self.writer.write(data[:510].encode('utf-8') + b'\r\n') 102 await self.writer.drain() 103 104 async def action(self, chan, msg): 105 await self.sendmsg(chan, f'\x01ACTION {msg}\x01') 106 107 async def sendmsg(self, target, msg): 108 await self.raw(f'PRIVMSG {target} :{msg}') 109 110 async def irc_error(self, chan, msg, reason=None): 111 await self.sendmsg(chan, '[{0}] {1} {2}'.format(color('ERROR', red), msg, color(f'({reason})', grey))) if reason else await self.sendmsg(chan, '[{0}] {1}'.format(color('ERROR', red), msg)) 112 113 async def connect(self): 114 while True: 115 try: 116 options = { 117 'host' : connection.server, 118 'port' : connection.port, 119 'limit' : 1024, 120 'ssl' : ssl_ctx() if connection.ssl else None, 121 'family' : 10 if connection.ipv6 else 2, 122 'local_addr' : connection.vhost 123 } 124 self.reader, self.writer = await asyncio.wait_for(asyncio.open_connection(**options), 15) 125 await self.raw(f'USER {identity.username} 0 * :{identity.realname}') 126 await self.raw('NICK ' + identity.nickname) 127 except Exception as ex: 128 error('failed to connect to ' + connection.server, ex) 129 else: 130 await self.listen() 131 finally: 132 for item in self.loops: 133 if self.loops[item]: 134 self.loops[item].cancel() 135 self.loops = dict() 136 self.playing = False 137 self.slow = False 138 await asyncio.sleep(30) 139 140 async def sync(self): 141 self.db = {'root': []} 142 page = 1 143 per_page = 1000 # Gitea's default per_page limit 144 145 while True: 146 try: 147 async with aiohttp.ClientSession() as session: 148 async with session.get(f'https://git.supernets.org/api/v1/repos/ircart/ircart/git/trees/master?recursive=1&page={page}&per_page={per_page}') as resp: 149 if resp.status != 200: 150 error('failed to sync database', await resp.text()) 151 return 152 files = await resp.json() 153 154 # Process files from this page 155 for file in files['tree']: 156 if file['path'].startswith('ircart/') and file['path'].endswith('.txt') and not file['path'].startswith('ircart/.'): 157 name = file['path'][7:-4] # Just strip 'ircart/' prefix and '.txt' suffix 158 if '/' in name: 159 dir, fname = name.split('/', 1) 160 self.db[dir] = self.db[dir]+[fname,] if dir in self.db else [fname,] 161 else: 162 self.db['root'].append(name) 163 164 # Check if we've processed all pages 165 if not files.get('truncated', False): 166 break 167 168 page += 1 169 170 except Exception as ex: 171 error('failed to sync database', ex) 172 return 173 174 async def play(self, chan, name, img=False, paste=False): 175 try: 176 if img or paste: 177 ascii = get_url(name) 178 else: 179 ascii = get_url(f'https://git.supernets.org/ircart/ircart/raw/branch/master/ircart/{name}.txt') 180 if ascii.getcode() == 200: 181 if img: 182 ascii = img2irc.convert(ascii.read(), img, int(self.settings['png_width']), self.settings['png_palette'], int(self.settings['png_quantize_colors'])) 183 else: 184 ascii = ascii.readlines() 185 if len(ascii) > int(self.settings['lines']) and chan != '#scroll': 186 await self.irc_error(chan, 'file is too big', f'take those {len(ascii):,} lines to #scroll') 187 else: 188 if not img and not paste: 189 await self.action(chan, 'the ascii gods have chosen... ' + color(name, cyan)) 190 for line in ascii: 191 if type(line) == bytes: 192 try: 193 line = line.decode() 194 except UnicodeError: 195 line = line.decode(chardet.detect(line)['encoding']).encode().decode() # TODO: Do we need to re-encode/decode in UTF-8? 196 line = line.replace('\n','').replace('\r','') 197 await self.sendmsg(chan, line + reset) 198 await asyncio.sleep(self.settings['msg']) 199 else: 200 await self.irc_error(chan, 'invalid name', name) if not img and not paste else await self.irc_error(chan, 'invalid url', name) 201 except Exception as ex: 202 try: 203 await self.irc_error(chan, 'error in play function', ex) 204 except: 205 error('error in play function', ex) 206 finally: 207 self.playing = False 208 209 async def listen(self): 210 while True: 211 try: 212 if self.reader.at_eof(): 213 break 214 data = await asyncio.wait_for(self.reader.readuntil(b'\r\n'), 600) 215 line = data.decode('utf-8').strip() 216 args = line.split() 217 debug(line) 218 if line.startswith('ERROR :Closing Link:'): 219 raise Exception('Connection has closed.') 220 elif args[0] == 'PING': 221 await self.raw('PONG '+args[1][1:]) 222 elif args[1] == '001': 223 if connection.modes: 224 await self.raw(f'MODE {identity.nickname} +{connection.modes}') 225 if identity.nickserv: 226 await self.sendmsg('NickServ', f'IDENTIFY {identity.nickname} {identity.nickserv}') 227 await self.raw(f'JOIN {connection.channel} {connection.key}') if connection.key else await self.raw('JOIN ' + connection.channel) 228 await self.raw('JOIN #scroll') 229 await self.sync() 230 elif args[1] == '311' and len(args) >= 6: # RPL_WHOISUSER 231 nick = args[2] 232 host = args[5] 233 if nick == identity.nickname: 234 self.host = host 235 elif args[1] == '433': 236 error('The bot is already running or nick is in use.') 237 elif args[1] == 'INVITE' and len(args) == 4: 238 invited = args[2] 239 chan = args[3][1:] 240 if invited == identity.nickname and chan in (connection.channel, '#scroll'): 241 await self.raw(f'JOIN {connection.channel} {connection.key}') if connection.key else await self.raw('JOIN ' + connection.channel) 242 elif args[1] == 'JOIN' and len(args) >= 3: 243 nick = args[0].split('!')[0][1:] 244 host = args[0].split('@')[1] 245 if nick == identity.nickname: 246 self.host = host 247 elif args[1] == 'KICK' and len(args) >= 4: 248 chan = args[2] 249 kicked = args[3] 250 if kicked == identity.nickname and chan in (connection.channel,'#scroll'): 251 await asyncio.sleep(3) 252 await self.raw(f'JOIN {connection.channel} {connection.key}') if connection.key else await self.raw('JOIN ' + connection.channel) 253 elif args[1] == 'PRIVMSG' and len(args) >= 4: 254 ident = args[0][1:] 255 nick = args[0].split('!')[0][1:] 256 chan = args[2] 257 msg = ' '.join(args[3:])[1:] 258 if chan in (connection.channel, '#scroll'): 259 args = msg.split() 260 if msg == '@scroll': 261 await self.sendmsg(chan, bold + 'Scroll IRC Art Bot - Developed by acidvegas in Python - https://git.acid.vegas/scroll') 262 elif args[0] == '.ascii': 263 if msg == '.ascii stop': 264 if self.playing: 265 if chan in self.loops: 266 self.loops[chan].cancel() 267 elif time.time() - self.last < self.settings['flood']: 268 if not self.slow: 269 if not self.playing: 270 await self.irc_error(chan, 'slow down nerd') 271 self.slow = True 272 elif len(args) >= 2 and not self.playing: 273 self.slow = False 274 if msg == '.ascii dirs': 275 for dir in self.db: 276 await self.sendmsg(chan, '[{0}] {1}{2}'.format(color(str(list(self.db).index(dir)+1).zfill(2), pink), dir.ljust(10), color('('+str(len(self.db[dir]))+')', grey))) 277 await asyncio.sleep(self.settings['msg']) 278 elif args[1] == 'img' and len(args) == 3: 279 url = args[2] 280 if url.startswith('https://') or url.startswith('http://'): 281 self.playing = True 282 width = 512 - len(line.split(' :')[0])+4 283 self.loops[chan] = asyncio.create_task(self.play(chan, url, img=width)) 284 elif msg == '.ascii list': 285 await self.sendmsg(chan, underline + color('https://git.supernets.org/ircart/ircart/src/branch/master/ircart/.list', light_blue)) 286 elif args[1] == 'random' and len(args) in (2,3): 287 if len(args) == 3: 288 query = args[2] 289 else: 290 query = random.choice([item for item in self.db if item not in self.settings['ignore']]) 291 if query in self.db: 292 ascii = f'{query}/{random.choice(self.db[query])}' 293 self.playing = True 294 self.loops[chan] = asyncio.create_task(self.play(chan, ascii)) 295 else: 296 results = [{'name':ascii,'dir':dir} for dir in self.db for ascii in self.db[dir] if query in ascii] 297 if results: 298 ascii = random.choice(results) 299 ascii = f'{ascii["dir"]}/{ascii["name"]}' 300 self.playing = True 301 self.loops[chan] = asyncio.create_task(self.play(chan, ascii)) 302 else: 303 await self.irc_error(chan, 'invalid directory name or search query', query) 304 elif msg == '.ascii sync' and is_admin(ident): 305 await self.sync() 306 await self.sendmsg(chan, bold + color('database synced', light_green)) 307 elif args[1] == 'play' and len(args) == 3 and self.settings['paste']: 308 url = args[2] 309 if url.startswith('https://pastebin.com/raw/') and len(url.split('raw/')) > 1: 310 self.loops[chan] = asyncio.create_task(self.play(chan, url, paste=True)) 311 else: 312 await self.irc_error(chan, 'invalid pastebin url', url) 313 elif args[1] == 'search' and len(args) == 3: 314 query = args[2] 315 results = [{'name':ascii,'dir':dir} for dir in self.db for ascii in self.db[dir] if query in ascii] 316 if results: 317 for item in results[:int(self.settings['results'])]: 318 if item['dir'] == 'root': 319 await self.sendmsg(chan, '[{0}] {1}'.format(color(str(results.index(item)+1).zfill(2), pink), item['name'])) 320 else: 321 await self.sendmsg(chan, '[{0}] {1} {2}'.format(color(str(results.index(item)+1).zfill(2), pink), item['name'], color('('+item['dir']+')', grey))) 322 await asyncio.sleep(self.settings['msg']) 323 else: 324 await self.irc_error(chan, 'no results found', query) 325 elif args[1] == 'settings': 326 if len(args) == 2: 327 for item in self.settings: 328 await self.sendmsg(chan, color(item.ljust(13), yellow) + color(str(self.settings[item]), grey)) 329 elif len(args) == 4 and is_admin(ident): 330 setting = args[2] 331 option = args[3] 332 if setting in self.settings: 333 if setting in ('flood','lines','msg','png_quantize','png_width','results'): 334 try: 335 option = float(option) 336 self.settings[setting] = option 337 await self.sendmsg(chan, color('OK', light_green)) 338 except ValueError: 339 await self.irc_error(chan, 'invalid option', 'must be a float or int') 340 elif setting == 'paste': 341 if option == 'on': 342 self.settings[setting] = True 343 await self.sendmsg(chan, color('OK', light_green)) 344 elif option == 'off': 345 self.settings[setting] = False 346 await self.sendmsg(chan, color('OK', light_green)) 347 else: 348 await self.irc_error(chan, 'invalid option', 'must be on or off') 349 else: 350 await self.irc_error(chan, 'invalid setting', setting) 351 elif len(args) == 2: 352 query = args[1] 353 results = [dir+'/'+ascii for dir in self.db for ascii in self.db[dir] if query == ascii] 354 if results: 355 results = results[0].replace('root/','') 356 self.playing = True 357 self.loops[chan] = asyncio.create_task(self.play(chan, results)) 358 else: 359 await self.irc_error(chan, 'no results found', query) 360 except (UnicodeDecodeError, UnicodeEncodeError): 361 pass 362 except Exception as ex: 363 error('fatal error occured', ex) 364 break 365 finally: 366 self.last = time.time() 367 368 # Main 369 print('#'*56) 370 print('#{:^54}#'.format('')) 371 print('#{:^54}#'.format('Scroll IRC Art Bot')) 372 print('#{:^54}#'.format('Developed by acidvegas in Python')) 373 print('#{:^54}#'.format('https://git.acid.vegas/scroll')) 374 print('#{:^54}#'.format('')) 375 print('#'*56) 376 try: 377 import chardet 378 except ImportError: 379 raise SystemExit('missing required \'chardet\' library (https://pypi.org/project/chardet/)') 380 try: 381 import img2irc 382 except ImportError: 383 raise SystemExit('missing required \'img2irc\' file (https://github.com/ircart/scroll/blob/master/img2irc.py)') 384 asyncio.run(Bot().connect())