apv - 🐍 advanced python logging 📔
git clone git://git.acid.vegas/apv.git
apv.py (16828B)
#!/usr/bin/env python3
# Advanced Python Logging - Developed by acidvegas in Python (https://git.acid.vegas/apv)
# apv.py

import gzip
import json
import logging
import logging.handlers
import os
import socket


class LogColors:
    '''ANSI color codes for log messages.'''

    RESET     = '\033[0m'
    DATE      = '\033[90m'         # Dark Grey
    DEBUG     = '\033[96m'         # Cyan
    INFO      = '\033[92m'         # Green
    WARNING   = '\033[93m'         # Yellow
    ERROR     = '\033[91m'         # Red
    CRITICAL  = '\033[97m\033[41m' # White on Red
    FATAL     = '\033[97m\033[41m' # Same as CRITICAL
    NOTSET    = '\033[97m'         # White text
    SEPARATOR = '\033[90m'         # Dark Grey
    MODULE    = '\033[95m'         # Pink
    FUNCTION  = '\033[94m'         # Blue
    LINE      = '\033[33m'         # Orange


class GZipRotatingFileHandler(logging.handlers.RotatingFileHandler):
    '''RotatingFileHandler that compresses old log files using gzip.'''

    # Overriding rotation_filename/rotate (rather than doRollover) lets the base
    # class's rename cascade shift the compressed backups too; compressing after
    # doRollover would overwrite app.log.1.gz on every rotation and keep only
    # one backup.

    def rotation_filename(self, name):
        '''Name rotated backups with a .gz suffix (app.log.1.gz, app.log.2.gz, ...).'''

        return name + '.gz'

    def rotate(self, source, dest):
        '''Compress the freshly rotated log file using gzip instead of renaming it.'''

        with open(source, 'rb') as f_in:
            with gzip.open(dest, 'wb') as f_out:
                f_out.writelines(f_in)
        os.remove(source)
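# Illustrative sketch (not part of the module): the handler drops in wherever a
# logging.handlers.RotatingFileHandler would; the path and sizes are hypothetical.
#
#   handler = GZipRotatingFileHandler('logs/app.log', maxBytes=1024*1024, backupCount=3)
#   logging.getLogger().addHandler(handler)
#   # Rotated backups appear as logs/app.log.1.gz, logs/app.log.2.gz, ...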
class LoggerSetup:
    def __init__(self, level='INFO', date_format='%Y-%m-%d %H:%M:%S',
                 log_to_disk=False, max_log_size=10*1024*1024,
                 max_backups=7, log_file_name='app', json_log=False,
                 ecs_log=False, show_details=False, compress_backups=False,
                 enable_graylog=False, graylog_host=None, graylog_port=None,
                 enable_cloudwatch=False, cloudwatch_group_name=None, cloudwatch_stream_name=None):
        '''
        Initialize the LoggerSetup with the provided parameters.

        :param level: The logging level (e.g., 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL').
        :param date_format: The date format for log messages.
        :param log_to_disk: Whether to log to disk.
        :param max_log_size: The maximum size of log files before rotation.
        :param max_backups: The maximum number of backup log files to keep.
        :param log_file_name: The base name of the log file.
        :param json_log: Whether to log in JSON format.
        :param ecs_log: Whether to log in Elastic Common Schema (ECS) format.
        :param show_details: Whether to show detailed log messages (module, function, and line number).
        :param compress_backups: Whether to compress old log files using gzip.
        :param enable_graylog: Whether to enable Graylog logging.
        :param graylog_host: The Graylog host.
        :param graylog_port: The Graylog port.
        :param enable_cloudwatch: Whether to enable CloudWatch logging.
        :param cloudwatch_group_name: The CloudWatch log group name.
        :param cloudwatch_stream_name: The CloudWatch log stream name.
        '''

        self.level = level
        self.date_format = date_format
        self.log_to_disk = log_to_disk
        self.max_log_size = max_log_size
        self.max_backups = max_backups
        self.log_file_name = log_file_name
        self.json_log = json_log
        self.ecs_log = ecs_log
        self.show_details = show_details
        self.compress_backups = compress_backups
        self.enable_graylog = enable_graylog
        self.graylog_host = graylog_host
        self.graylog_port = graylog_port
        self.enable_cloudwatch = enable_cloudwatch
        self.cloudwatch_group_name = cloudwatch_group_name
        self.cloudwatch_stream_name = cloudwatch_stream_name


    def setup(self):
        '''Set up logging with various handlers and options.'''

        # Clear existing handlers
        logging.getLogger().handlers.clear()
        logging.getLogger().setLevel(logging.DEBUG) # Capture all logs at the root level

        # Convert the level string to a logging level number
        level_num = getattr(logging, self.level.upper(), logging.INFO)

        self.setup_console_handler(level_num)

        if self.log_to_disk:
            self.setup_file_handler(level_num)

        if self.enable_graylog:
            self.setup_graylog_handler(level_num)

        if self.enable_cloudwatch:
            self.setup_cloudwatch_handler(level_num)
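    # Illustrative sketch (not part of the module): LoggerSetup can be driven
    # directly rather than through the setup_logging() helper at the bottom of
    # this file; the option values here are hypothetical.
    #
    #   LoggerSetup(level='DEBUG', log_to_disk=True, compress_backups=True).setup()
    #   logging.info('Root logger is now configured.')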
    def setup_console_handler(self, level_num: int):
        '''
        Set up the console handler with colored output.

        :param level_num: The logging level number.
        '''

        # Define the colored formatter
        class ColoredFormatter(logging.Formatter):
            def __init__(self, datefmt=None, show_details=False):
                super().__init__(datefmt=datefmt)
                self.show_details = show_details
                self.LEVEL_COLORS = {
                    'NOTSET'   : LogColors.NOTSET,
                    'DEBUG'    : LogColors.DEBUG,
                    'INFO'     : LogColors.INFO,
                    'WARNING'  : LogColors.WARNING,
                    'ERROR'    : LogColors.ERROR,
                    'CRITICAL' : LogColors.CRITICAL,
                    'FATAL'    : LogColors.FATAL
                }

            def format(self, record):
                log_level = record.levelname
                message   = record.getMessage()
                asctime   = self.formatTime(record, self.datefmt)
                color     = self.LEVEL_COLORS.get(log_level, LogColors.RESET)
                separator = f'{LogColors.SEPARATOR} ┃ {LogColors.RESET}'
                if self.show_details:
                    module    = record.module
                    line_no   = record.lineno
                    func_name = record.funcName
                    formatted = (
                        f'{LogColors.DATE}{asctime}{LogColors.RESET}'
                        f'{separator}'
                        f'{color}{log_level:<8}{LogColors.RESET}'
                        f'{separator}'
                        f'{LogColors.MODULE}{module}{LogColors.RESET}'
                        f'{separator}'
                        f'{LogColors.FUNCTION}{func_name}{LogColors.RESET}'
                        f'{separator}'
                        f'{LogColors.LINE}{line_no}{LogColors.RESET}'
                        f'{separator}'
                        f'{message}'
                    )
                else:
                    formatted = (
                        f'{LogColors.DATE}{asctime}{LogColors.RESET}'
                        f'{separator}'
                        f'{color}{log_level:<8}{LogColors.RESET}'
                        f'{separator}'
                        f'{message}'
                    )
                return formatted

        # Create console handler with colored output
        console_handler = logging.StreamHandler()
        console_handler.setLevel(level_num)
        console_formatter = ColoredFormatter(datefmt=self.date_format, show_details=self.show_details)
        console_handler.setFormatter(console_formatter)
        logging.getLogger().addHandler(console_handler)


    def setup_file_handler(self, level_num: int):
        '''
        Set up the file handler for logging to disk.

        :param level_num: The logging level number.
        '''

        # Create 'logs' directory if it doesn't exist
        logs_dir = os.path.join(os.getcwd(), 'logs')
        os.makedirs(logs_dir, exist_ok=True)

        # Use the specified log file name and set the extension based on json_log
        file_extension = '.json' if self.json_log else '.log'
        log_file_path = os.path.join(logs_dir, f'{self.log_file_name}{file_extension}')

        # Create the rotating file handler
        if self.compress_backups:
            file_handler = GZipRotatingFileHandler(log_file_path, maxBytes=self.max_log_size, backupCount=self.max_backups)
        else:
            file_handler = logging.handlers.RotatingFileHandler(log_file_path, maxBytes=self.max_log_size, backupCount=self.max_backups)
        file_handler.setLevel(level_num)

        if self.ecs_log:
            try:
                import ecs_logging
            except ImportError:
                raise ImportError("The 'ecs-logging' library is required for ECS logging. Install it with 'pip install ecs-logging'.")
            file_formatter = ecs_logging.StdlibFormatter()
        elif self.json_log:
            # Create the JSON formatter
            class JsonFormatter(logging.Formatter):
                def format(self, record):
                    log_record = {
                        'time'        : self.formatTime(record, self.datefmt),
                        'level'       : record.levelname,
                        'module'      : record.module,
                        'function'    : record.funcName,
                        'line'        : record.lineno,
                        'message'     : record.getMessage(),
                        'name'        : record.name,
                        'filename'    : record.filename,
                        'threadName'  : record.threadName,
                        'processName' : record.processName,
                    }
                    return json.dumps(log_record)
            file_formatter = JsonFormatter(datefmt=self.date_format)
        else:
            file_formatter = logging.Formatter(fmt='%(asctime)s ┃ %(levelname)-8s ┃ %(module)s ┃ %(funcName)s ┃ %(lineno)d ┃ %(message)s', datefmt=self.date_format)

        file_handler.setFormatter(file_formatter)
        logging.getLogger().addHandler(file_handler)
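    # Illustrative record produced by the JSON formatter above (values are made up):
    #
    #   {"time": "2024-01-01 12:00:00", "level": "INFO", "module": "app",
    #    "function": "main", "line": 42, "message": "hello", "name": "root",
    #    "filename": "app.py", "threadName": "MainThread", "processName": "MainProcess"}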
    def setup_graylog_handler(self, level_num: int):
        '''
        Set up the Graylog handler.

        :param level_num: The logging level number.
        '''

        graylog_host = self.graylog_host
        graylog_port = self.graylog_port
        if graylog_host is None or graylog_port is None:
            logging.error('Graylog host and port must be specified for the Graylog handler.')
            return

        class GraylogHandler(logging.Handler):
            def __init__(self, graylog_host, graylog_port):
                super().__init__()
                self.graylog_host = graylog_host
                self.graylog_port = graylog_port
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

                # Mapping from Python logging levels to Graylog (syslog) levels
                self.level_mapping = {
                    logging.CRITICAL : 2, # Critical
                    logging.ERROR    : 3, # Error
                    logging.WARNING  : 4, # Warning
                    logging.INFO     : 6, # Informational
                    logging.DEBUG    : 7, # Debug
                    logging.NOTSET   : 7  # Default to Debug
                }

            def emit(self, record):
                try:
                    log_entry = self.format(record)
                    graylog_level = self.level_mapping.get(record.levelno, 7) # Default to Debug
                    gelf_message = {
                        'version'       : '1.1',
                        'host'          : socket.gethostname(),
                        'short_message' : record.getMessage(),
                        'full_message'  : log_entry,
                        'timestamp'     : record.created,
                        'level'         : graylog_level,
                        '_logger_name'  : record.name,
                        '_file'         : record.pathname,
                        '_line'         : record.lineno,
                        '_function'     : record.funcName,
                        '_module'       : record.module,
                    }
                    gelf_json = json.dumps(gelf_message).encode('utf-8')
                    self.sock.sendto(gelf_json, (self.graylog_host, self.graylog_port))
                except Exception:
                    self.handleError(record)

        graylog_handler = GraylogHandler(graylog_host, graylog_port)
        graylog_handler.setLevel(level_num)

        graylog_formatter = logging.Formatter(fmt='%(message)s')
        graylog_handler.setFormatter(graylog_formatter)
        logging.getLogger().addHandler(graylog_handler)
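    # Illustrative GELF 1.1 payload as sent over UDP by the handler above
    # (host, path, and values are made up):
    #
    #   {"version": "1.1", "host": "example-host", "short_message": "hello",
    #    "full_message": "hello", "timestamp": 1700000000.0, "level": 6,
    #    "_logger_name": "root", "_file": "/srv/app.py", "_line": 10,
    #    "_function": "main", "_module": "app"}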
    def setup_cloudwatch_handler(self, level_num: int):
        '''
        Set up the CloudWatch handler.

        :param level_num: The logging level number.
        '''

        try:
            import boto3
            from botocore.exceptions import ClientError
        except ImportError:
            raise ImportError("The 'boto3' library is required for CloudWatch logging. Install it with 'pip install boto3'.")

        log_group_name = self.cloudwatch_group_name
        log_stream_name = self.cloudwatch_stream_name
        if not log_group_name or not log_stream_name:
            logging.error('CloudWatch log group and log stream must be specified for the CloudWatch handler.')
            return

        class CloudWatchHandler(logging.Handler):
            def __init__(self, log_group_name, log_stream_name):
                super().__init__()
                self.log_group_name = log_group_name
                self.log_stream_name = log_stream_name
                self.client = boto3.client('logs')

                # Create the log group if it doesn't exist
                try:
                    self.client.create_log_group(logGroupName=self.log_group_name)
                except ClientError as e:
                    if e.response['Error']['Code'] != 'ResourceAlreadyExistsException':
                        raise e

                # Create the log stream if it doesn't exist
                try:
                    self.client.create_log_stream(logGroupName=self.log_group_name, logStreamName=self.log_stream_name)
                except ClientError as e:
                    if e.response['Error']['Code'] != 'ResourceAlreadyExistsException':
                        raise e

            def _get_sequence_token(self):
                try:
                    response = self.client.describe_log_streams(
                        logGroupName=self.log_group_name,
                        logStreamNamePrefix=self.log_stream_name,
                        limit=1
                    )
                    log_streams = response.get('logStreams', [])
                    if log_streams:
                        return log_streams[0].get('uploadSequenceToken')
                    else:
                        return None
                except Exception:
                    return None

            def emit(self, record):
                try:
                    log_entry = self.format(record)
                    timestamp = int(record.created * 1000)
                    event = {
                        'timestamp': timestamp,
                        'message': log_entry
                    }
                    sequence_token = self._get_sequence_token()
                    kwargs = {
                        'logGroupName': self.log_group_name,
                        'logStreamName': self.log_stream_name,
                        'logEvents': [event]
                    }
                    if sequence_token:
                        kwargs['sequenceToken'] = sequence_token
                    self.client.put_log_events(**kwargs)
                except Exception:
                    self.handleError(record)

        cloudwatch_handler = CloudWatchHandler(log_group_name, log_stream_name)
        cloudwatch_handler.setLevel(level_num)

        # Log as JSON
        class JsonFormatter(logging.Formatter):
            def format(self, record):
                log_record = {
                    'time'        : self.formatTime(record, self.datefmt),
                    'level'       : record.levelname,
                    'module'      : record.module,
                    'function'    : record.funcName,
                    'line'        : record.lineno,
                    'message'     : record.getMessage(),
                    'name'        : record.name,
                    'filename'    : record.filename,
                    'threadName'  : record.threadName,
                    'processName' : record.processName,
                }
                return json.dumps(log_record)

        cloudwatch_formatter = JsonFormatter(datefmt=self.date_format)
        cloudwatch_handler.setFormatter(cloudwatch_formatter)
        logging.getLogger().addHandler(cloudwatch_handler)


def setup_logging(**kwargs):
    '''Set up logging with various handlers and options.'''

    logger_setup = LoggerSetup(**kwargs)
    logger_setup.setup()
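A minimal usage sketch (illustrative, assuming apv.py is importable from the
working directory; every keyword maps onto a LoggerSetup parameter documented
above):

    import logging
    from apv import setup_logging

    # Console output plus compressed, rotating JSON logs on disk
    setup_logging(level='DEBUG', log_to_disk=True, max_backups=3,
                  log_file_name='app', json_log=True, compress_backups=True,
                  show_details=True)

    logging.debug('Debug message')
    logging.info('Info message')
    logging.error('Error message')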