scroll - IRC bot to play ASCII art |
git clone git://git.acid.vegas/scroll.git |
Log | Files | Refs | Archive | README | LICENSE |
scroll.py (14416B)
#!/usr/bin/env python
# Scroll IRC Art Bot - Developed by acidvegas in Python (https://git.acid.vegas/scroll)

import asyncio
import json
import math
import random
import re
import socket
import ssl
import sys
import time
import urllib.request


class connection:
    server = 'irc.server.com'
    port = 6697
    ipv6 = False
    ssl = True
    vhost = None  # Must be in ('ip', port) format
    channel = '#chats'
    key = None
    modes = 'BdDg'


class identity:
    nickname = 'scroll'
    username = 'scroll'
    realname = 'git.acid.vegas/scroll'
    nickserv = None


# Settings
admin = 'acidvegas!*@*'  # Can use wildcards (Must be in nick!user@host format)

# Formatting Control Characters / Color Codes
bold = '\x02'
italic = '\x1D'
underline = '\x1F'
reverse = '\x16'
reset = '\x0f'
white = '00'
black = '01'
blue = '02'
green = '03'
red = '04'
brown = '05'
purple = '06'
orange = '07'
yellow = '08'
light_green = '09'
cyan = '10'
light_cyan = '11'
light_blue = '12'
pink = '13'
grey = '14'
light_grey = '15'


def color(msg, foreground, background=None):
    """Wrap msg in an IRC color code (optionally with a background) followed by a reset."""
    if background:
        return f'\x03{foreground},{background}{msg}{reset}'
    return f'\x03{foreground}{msg}{reset}'


def debug(data):
    """Print a timestamped debug line to stdout."""
    print('{0} | [~] - {1}'.format(time.strftime('%I:%M:%S'), data))


def error(data, reason=None):
    """Print a timestamped error line to stdout, appending the reason when one is given."""
    if reason:
        print('{0} | [!] - {1} ({2})'.format(time.strftime('%I:%M:%S'), data, str(reason)))
    else:
        print('{0} | [!] - {1}'.format(time.strftime('%I:%M:%S'), data))


def get_url(url, git=False):
    """Open url with a browser User-Agent and a 10 second timeout.

    When git is true, the GitHub v3 JSON Accept header is added.
    Returns the response object from urllib.request.urlopen (blocking call).
    """
    data = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.47 Safari/537.36'}
    if git:
        data['Accept'] = 'application/vnd.github.v3+json'
    req = urllib.request.Request(url, headers=data)
    return urllib.request.urlopen(req, timeout=10)


def is_admin(ident):
    """Return a match object if ident (nick!user@host) matches the admin mask, else None.

    Only '*' is treated as a wildcard; every other character of the mask is
    matched literally and the whole ident must match, so a nick that merely
    contains the admin nick is not accepted.
    """
    pattern = '.*'.join(re.escape(part) for part in admin.split('*'))
    return re.fullmatch(pattern, ident)


def ssl_ctx():
    """Return an SSL context for the IRC connection.

    NOTE(review): hostname checking and certificate verification are both
    disabled here, so the TLS link is not authenticated. Left unchanged
    because it is the existing behavior; enable verification if the target
    network has a valid certificate.
    """
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    return ctx


class Bot():
    """IRC client that plays ASCII art from the ircart repository into channels."""

    def __init__(self):
        self.db = None            # {directory: [art names]}; None until the first successful sync
        self.last = time.time()   # time the last server line was handled (used by the flood check)
        self.loops = dict()       # channel -> asyncio task currently playing art
        self.host = ''
        self.playing = False
        self.settings = {
            'flood': 1,
            'ignore': 'big,birds,doc,gorf,hang,nazi,pokemon',
            'lines': 500,
            'msg': 0.03,
            'paste': True,
            'png_palette': 'RGB99',
            'png_quantize': 99,
            'png_width': 80,
            'results': 25,
        }
        self.slow = False
        self.reader = None
        self.writer = None

    async def raw(self, data):
        """Send one protocol line, limited to 510 bytes plus CRLF."""
        payload = data.encode('utf-8')[:510]
        # The byte cut may land inside a multi-byte character; drop the partial tail.
        payload = payload.decode('utf-8', 'ignore').encode('utf-8')
        self.writer.write(payload + b'\r\n')
        await self.writer.drain()

    async def action(self, chan, msg):
        """Send a CTCP ACTION to chan."""
        await self.sendmsg(chan, f'\x01ACTION {msg}\x01')

    async def sendmsg(self, target, msg):
        """Send a PRIVMSG to target."""
        await self.raw(f'PRIVMSG {target} :{msg}')

    async def irc_error(self, chan, msg, reason=None):
        """Send a formatted error message to chan, with the reason in grey when given."""
        if reason:
            await self.sendmsg(chan, '[{0}] {1} {2}'.format(color('ERROR', red), msg, color(f'({reason})', grey)))
        else:
            await self.sendmsg(chan, '[{0}] {1}'.format(color('ERROR', red), msg))

    async def connect(self):
        """Connect, register and listen; on any disconnect clean up, wait 30 seconds and retry."""
        while True:
            try:
                options = {
                    'host': connection.server,
                    'port': connection.port,
                    'limit': 1024,
                    'ssl': ssl_ctx() if connection.ssl else None,
                    # Use the platform's constants; the numeric value of AF_INET6 is not portable.
                    'family': socket.AF_INET6 if connection.ipv6 else socket.AF_INET,
                    'local_addr': connection.vhost,
                }
                self.reader, self.writer = await asyncio.wait_for(asyncio.open_connection(**options), 15)
                await self.raw(f'USER {identity.username} 0 * :{identity.realname}')
                await self.raw('NICK ' + identity.nickname)
            except Exception as ex:
                error('failed to connect to ' + connection.server, ex)
            else:
                await self.listen()
            finally:
                for task in self.loops.values():
                    if task:
                        task.cancel()
                self.loops = dict()
                self.playing = False
                self.slow = False
                if self.writer is not None:
                    # Release the old transport before reconnecting.
                    self.writer.close()
                    self.writer = None
                await asyncio.sleep(30)

    async def sync(self):
        """Rebuild the art database from the GitHub API.

        The new database replaces self.db only when the whole fetch succeeds;
        on failure the previous database is kept and the error is reported.
        Returns True on success and False on failure.
        """
        try:
            with get_url('https://api.github.com/repos/ircart/ircart/contents', True) as response:
                listing = json.loads(response.read().decode('utf-8'))
            sha = [item['sha'] for item in listing if item['path'] == 'ircart'][0]
            with get_url(f'https://api.github.com/repos/ircart/ircart/git/trees/{sha}?recursive=true', True) as response:
                files = json.loads(response.read().decode('utf-8'))['tree']
            db = {'root': list()}
            for file in files:
                if file['type'] != 'tree':
                    path = file['path'][:-4]  # drop the 4-character file extension
                    if '/' in path:
                        folder, name = path.split('/')[:2]
                        db.setdefault(folder, []).append(name)
                    else:
                        db['root'].append(path)
            self.db = db
            return True
        except Exception as ex:
            try:
                await self.irc_error(connection.channel, 'failed to sync database', ex)
            except Exception:
                error('failed to sync database', ex)
            return False

    async def play(self, chan, name, img=False, paste=False):
        """Fetch a piece of art and send it to chan line by line.

        name is an art path inside the ircart repository, or a full URL when
        img or paste is set. A truthy img is passed through to
        img2irc.convert along with the png_* settings. self.playing is
        cleared when playback ends for any reason, including cancellation.
        """
        try:
            if img or paste:
                url = name
            else:
                url = f'https://raw.githubusercontent.com/ircart/ircart/master/ircart/{name}.txt'
            lines = None
            with get_url(url) as response:
                status = response.getcode()
                if status == 200:
                    if img:
                        lines = img2irc.convert(response.read(), img, int(self.settings['png_width']), self.settings['png_palette'], int(self.settings['png_quantize']))
                    else:
                        lines = response.readlines()
            if status != 200:
                if not img and not paste:
                    await self.irc_error(chan, 'invalid name', name)
                else:
                    await self.irc_error(chan, 'invalid url', name)
            elif len(lines) > int(self.settings['lines']) and chan != '#scroll':
                await self.irc_error(chan, 'file is too big', f'take those {len(lines):,} lines to #scroll')
            else:
                if not img and not paste:
                    await self.action(chan, 'the ascii gods have chosen... ' + color(name, cyan))
                for line in lines:
                    if isinstance(line, bytes):
                        try:
                            line = line.decode()
                        except UnicodeError:
                            # chardet may report no encoding at all; fall back to lossy UTF-8.
                            encoding = chardet.detect(line)['encoding'] or 'utf-8'
                            line = line.decode(encoding, 'replace')
                    line = line.replace('\n', '').replace('\r', '')
                    await self.sendmsg(chan, line + reset)
                    await asyncio.sleep(self.settings['msg'])
        except Exception as ex:
            try:
                await self.irc_error(chan, 'error in play function', ex)
            except Exception:
                error('error in play function', ex)
        finally:
            self.playing = False

    async def _join(self, chan):
        """Join chan, sending the configured key only for the main channel."""
        if chan == connection.channel and connection.key:
            await self.raw(f'JOIN {chan} {connection.key}')
        else:
            await self.raw('JOIN ' + chan)

    def _start(self, chan, name, **kwargs):
        """Mark the bot as playing and schedule play() for chan as a cancellable task."""
        self.playing = True
        self.loops[chan] = asyncio.create_task(self.play(chan, name, **kwargs))

    @staticmethod
    def _art_path(folder, name):
        """Return the repository path of an art file; 'root' entries have no directory prefix."""
        return name if folder == 'root' else f'{folder}/{name}'

    async def _ascii_command(self, chan, ident, msg, args, line):
        """Run one '.ascii <subcommand>' request (args[0] is '.ascii', len(args) >= 2)."""
        # Before the first successful sync there is no database; treat it as empty.
        db = self.db if self.db is not None else {'root': list()}
        if msg == '.ascii dirs':
            for number, folder in enumerate(db, 1):
                await self.sendmsg(chan, '[{0}] {1}{2}'.format(color(str(number).zfill(2), pink), folder.ljust(10), color('('+str(len(db[folder]))+')', grey)))
                await asyncio.sleep(self.settings['msg'])
        elif args[1] == 'img' and len(args) == 3:
            url = args[2]
            if url.startswith(('https://', 'http://')):
                # NOTE(review): derived from the length of the raw line's prefix; its exact
                # meaning depends on img2irc.convert, which is not visible here.
                width = 512 - len(line.split(' :')[0])+4
                self._start(chan, url, img=width)
        elif msg == '.ascii list':
            await self.sendmsg(chan, underline + color('https://raw.githubusercontent.com/ircart/ircart/master/ircart/.list', light_blue))
        elif args[1] == 'random' and len(args) in (2, 3):
            if len(args) == 3:
                query = args[2]
            else:
                # 'ignore' is a comma separated list of directory names, not a substring filter.
                ignored = set(self.settings['ignore'].split(','))
                choices = [folder for folder in db if folder not in ignored and db[folder]]
                if not choices:
                    await self.irc_error(chan, 'no results found')
                    return
                query = random.choice(choices)
            if db.get(query):
                self._start(chan, self._art_path(query, random.choice(db[query])))
            else:
                matches = [(folder, name) for folder in db for name in db[folder] if query in name]
                if matches:
                    folder, name = random.choice(matches)
                    self._start(chan, self._art_path(folder, name))
                else:
                    await self.irc_error(chan, 'invalid directory name or search query', query)
        elif msg == '.ascii sync' and is_admin(ident):
            # sync() reports its own failure; only announce success when it really happened.
            if await self.sync():
                await self.sendmsg(chan, bold + color('database synced', light_green))
        elif args[1] == 'play' and len(args) == 3 and self.settings['paste']:
            url = args[2]
            if url.startswith('https://pastebin.com/raw/'):
                self._start(chan, url, paste=True)
            else:
                await self.irc_error(chan, 'invalid pastebin url', url)
        elif args[1] == 'search' and len(args) == 3:
            query = args[2]
            matches = [(folder, name) for folder in db for name in db[folder] if query in name]
            if matches:
                for number, (folder, name) in enumerate(matches[:int(self.settings['results'])], 1):
                    if folder == 'root':
                        await self.sendmsg(chan, '[{0}] {1}'.format(color(str(number).zfill(2), pink), name))
                    else:
                        await self.sendmsg(chan, '[{0}] {1} {2}'.format(color(str(number).zfill(2), pink), name, color('('+folder+')', grey)))
                    await asyncio.sleep(self.settings['msg'])
            else:
                await self.irc_error(chan, 'no results found', query)
        elif args[1] == 'settings':
            if len(args) == 2:
                for item in self.settings:
                    await self.sendmsg(chan, color(item.ljust(13), yellow) + color(str(self.settings[item]), grey))
            elif len(args) == 4 and is_admin(ident):
                setting = args[2]
                option = args[3]
                if setting in self.settings:
                    if setting in ('flood', 'lines', 'msg', 'png_quantize', 'png_width', 'results'):
                        try:
                            value = float(option)
                            if not math.isfinite(value):
                                # nan/inf parse as floats but break int() and sleep() later on.
                                raise ValueError(option)
                        except ValueError:
                            await self.irc_error(chan, 'invalid option', 'must be a float or int')
                        else:
                            self.settings[setting] = value
                            await self.sendmsg(chan, color('OK', light_green))
                    elif setting == 'paste':
                        if option == 'on':
                            self.settings[setting] = True
                            await self.sendmsg(chan, color('OK', light_green))
                        elif option == 'off':
                            self.settings[setting] = False
                            await self.sendmsg(chan, color('OK', light_green))
                        else:
                            await self.irc_error(chan, 'invalid option', 'must be on or off')
                else:
                    await self.irc_error(chan, 'invalid setting', setting)
        elif len(args) == 2:
            query = args[1]
            matches = [(folder, name) for folder in db for name in db[folder] if query == name]
            if matches:
                self._start(chan, self._art_path(*matches[0]))
            else:
                await self.irc_error(chan, 'no results found', query)

    async def _handle_message(self, chan, ident, msg, line):
        """Handle one channel message: the '@scroll' banner and '.ascii' commands."""
        args = msg.split()
        if msg == '@scroll':
            await self.sendmsg(chan, bold + 'Scroll IRC Art Bot - Developed by acidvegas in Python - https://git.acid.vegas/scroll')
        elif args and args[0] == '.ascii':  # args is empty for a blank message
            if msg == '.ascii stop':
                if self.playing and chan in self.loops:
                    self.loops[chan].cancel()
            elif time.time() - self.last < self.settings['flood']:
                if not self.slow:
                    if not self.playing:
                        await self.irc_error(chan, 'slow down nerd')
                    self.slow = True
            elif len(args) >= 2 and not self.playing:
                self.slow = False
                await self._ascii_command(chan, ident, msg, args, line)

    async def listen(self):
        """Read and dispatch server lines until EOF, a 600 second read timeout or an error."""
        while True:
            try:
                if self.reader.at_eof():
                    break
                data = await asyncio.wait_for(self.reader.readuntil(b'\r\n'), 600)
                line = data.decode('utf-8').strip()
                args = line.split()
                debug(line)
                if line.startswith('ERROR :Closing Link:'):
                    raise Exception('Connection has closed.')
                if len(args) < 2:
                    continue  # nothing below can handle a line with fewer than two tokens
                if args[0] == 'PING':
                    await self.raw('PONG '+args[1][1:])
                elif args[1] == '001':
                    if connection.modes:
                        await self.raw(f'MODE {identity.nickname} +{connection.modes}')
                    if identity.nickserv:
                        await self.sendmsg('NickServ', f'IDENTIFY {identity.nickname} {identity.nickserv}')
                    await self._join(connection.channel)
                    await self.raw('JOIN #scroll')
                    await self.sync()
                elif args[1] == '311' and len(args) >= 6:  # RPL_WHOISUSER
                    nick = args[2]
                    host = args[5]
                    if nick == identity.nickname:
                        self.host = host
                elif args[1] == '433':
                    error('The bot is already running or nick is in use.')
                elif args[1] == 'INVITE' and len(args) == 4:
                    invited = args[2]
                    chan = args[3].lstrip(':')
                    if invited == identity.nickname and chan in (connection.channel, '#scroll'):
                        await self._join(chan)
                elif args[1] == 'JOIN' and len(args) >= 3:
                    nick = args[0].split('!')[0][1:]
                    host = args[0].split('@')[1]
                    if nick == identity.nickname:
                        self.host = host
                elif args[1] == 'KICK' and len(args) >= 4:
                    chan = args[2]
                    kicked = args[3]
                    if kicked == identity.nickname and chan in (connection.channel, '#scroll'):
                        await asyncio.sleep(3)
                        await self._join(chan)
                elif args[1] == 'PRIVMSG' and len(args) >= 4:
                    ident = args[0][1:]
                    chan = args[2]
                    msg = ' '.join(args[3:])[1:]
                    if chan in (connection.channel, '#scroll'):
                        await self._handle_message(chan, ident, msg, line)
            except (UnicodeDecodeError, UnicodeEncodeError):
                pass  # deliberately skip lines that cannot be decoded or encoded
            except Exception as ex:
                error('fatal error occured', ex)
                break
            finally:
                self.last = time.time()


# Main
if __name__ == '__main__':
    print('#'*56)
    print('#{:^54}#'.format(''))
    print('#{:^54}#'.format('Scroll IRC Art Bot'))
    print('#{:^54}#'.format('Developed by acidvegas in Python'))
    print('#{:^54}#'.format('https://git.acid.vegas/scroll'))
    print('#{:^54}#'.format(''))
    print('#'*56)
    try:
        import chardet
    except ImportError:
        raise SystemExit('missing required \'chardet\' library (https://pypi.org/project/chardet/)')
    try:
        import img2irc
    except ImportError:
        raise SystemExit('missing required \'img2irc\' file (https://github.com/ircart/scroll/blob/master/img2irc.py)')
    asyncio.run(Bot().connect())