apv

- 🐍 advanced python logging 📔
git clone git://git.acid.vegas/apv.git
Log | Files | Refs | Archive | README

apv.py (16828B)

      1 #!/usr/bin/env python3
      2 # Advanced Python Logging - Developed by acidvegas in Python (https://git.acid.vegas/apv)
      3 # apv.py
      4 
      5 import gzip
      6 import json
      7 import logging
      8 import logging.handlers
      9 import os
     10 import socket
     11 
     12 
     13 class LogColors:
     14     '''ANSI color codes for log messages.'''
     15 
     16     RESET     = '\033[0m'
     17     DATE      = '\033[90m'         # Dark Grey
     18     DEBUG     = '\033[96m'         # Cyan
     19     INFO      = '\033[92m'         # Green
     20     WARNING   = '\033[93m'         # Yellow
     21     ERROR     = '\033[91m'         # Red
     22     CRITICAL  = '\033[97m\033[41m' # White on Red
     23     FATAL     = '\033[97m\033[41m' # Same as CRITICAL
     24     NOTSET    = '\033[97m'         # White text
     25     SEPARATOR = '\033[90m'         # Dark Grey
     26     MODULE    = '\033[95m'         # Pink
     27     FUNCTION  = '\033[94m'         # Blue
     28     LINE      = '\033[33m'         # Orange
     29 
     30 
     31 class GZipRotatingFileHandler(logging.handlers.RotatingFileHandler):
     32     '''RotatingFileHandler that compresses old log files using gzip.'''
     33 
     34     def doRollover(self):
     35         '''Compress old log files using gzip.'''
     36 
     37         super().doRollover()
     38         if self.backupCount > 0:
     39             for i in range(self.backupCount, 0, -1):
     40                 sfn = f'{self.baseFilename}.{i}'
     41                 if os.path.exists(sfn):
     42                     with open(sfn, 'rb') as f_in:
     43                         with gzip.open(f'{sfn}.gz', 'wb') as f_out:
     44                             f_out.writelines(f_in)
     45                     os.remove(sfn)
     46 
     47 
     48 class LoggerSetup:
     49     def __init__(self, level='INFO', date_format='%Y-%m-%d %H:%M:%S',
     50                  log_to_disk=False, max_log_size=10*1024*1024,
     51                  max_backups=7, log_file_name='app', json_log=False,
     52                  ecs_log=False, show_details=False, compress_backups=False,
     53                  enable_graylog=False, graylog_host=None, graylog_port=None,
     54                  enable_cloudwatch=False, cloudwatch_group_name=None, cloudwatch_stream_name=None):
     55         '''
     56         Initialize the LoggerSetup with provided parameters.
     57         
     58         :param level: The logging level (e.g., 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL').
     59         :param date_format: The date format for log messages.
     60         :param log_to_disk: Whether to log to disk.
     61         :param max_log_size: The maximum size of log files before rotation.
     62         :param max_backups: The maximum number of backup log files to keep.
     63         :param log_file_name: The base name of the log file.
     64         :param json_log: Whether to log in JSON format.
     65         :param show_details: Whether to show detailed log messages.
     66         :param compress_backups: Whether to compress old log files using gzip.
     67         :param enable_graylog: Whether to enable Graylog logging.
     68         :param graylog_host: The Graylog host.
     69         :param graylog_port: The Graylog port.
     70         :param enable_cloudwatch: Whether to enable CloudWatch logging.
     71         :param cloudwatch_group_name: The CloudWatch log group name.
     72         :param cloudwatch_stream_name: The CloudWatch log stream name.
     73         '''
     74 
     75         self.level                  = level
     76         self.date_format            = date_format
     77         self.log_to_disk            = log_to_disk
     78         self.max_log_size           = max_log_size
     79         self.max_backups            = max_backups
     80         self.log_file_name          = log_file_name
     81         self.json_log               = json_log
     82         self.ecs_log                = ecs_log
     83         self.show_details           = show_details
     84         self.compress_backups       = compress_backups
     85         self.enable_graylog         = enable_graylog
     86         self.graylog_host           = graylog_host
     87         self.graylog_port           = graylog_port
     88         self.enable_cloudwatch      = enable_cloudwatch
     89         self.cloudwatch_group_name  = cloudwatch_group_name
     90         self.cloudwatch_stream_name = cloudwatch_stream_name
     91 
     92 
     93     def setup(self):
     94         '''Set up logging with various handlers and options.'''
     95 
     96         # Clear existing handlers
     97         logging.getLogger().handlers.clear()
     98         logging.getLogger().setLevel(logging.DEBUG)  # Capture all logs at the root level
     99 
    100         # Convert the level string to a logging level object
    101         level_num = getattr(logging, self.level.upper(), logging.INFO)
    102 
    103         self.setup_console_handler(level_num)
    104 
    105         if self.log_to_disk:
    106             self.setup_file_handler(level_num)
    107 
    108         if self.enable_graylog:
    109             self.setup_graylog_handler(level_num)
    110 
    111         if self.enable_cloudwatch:
    112             self.setup_cloudwatch_handler(level_num)
    113 
    114 
    115     def setup_console_handler(self, level_num: int):
    116         '''
    117         Set up the console handler with colored output.
    118         
    119         :param level_num: The logging level number.
    120         '''
    121 
    122         # Define the colored formatter
    123         class ColoredFormatter(logging.Formatter):
    124             def __init__(self, datefmt=None, show_details=False):
    125                 super().__init__(datefmt=datefmt)
    126                 self.show_details = show_details
    127                 self.LEVEL_COLORS = {
    128                     'NOTSET'   : LogColors.NOTSET,
    129                     'DEBUG'    : LogColors.DEBUG,
    130                     'INFO'     : LogColors.INFO,
    131                     'WARNING'  : LogColors.WARNING,
    132                     'ERROR'    : LogColors.ERROR,
    133                     'CRITICAL' : LogColors.CRITICAL,
    134                     'FATAL'    : LogColors.FATAL
    135                 }
    136 
    137             def format(self, record):
    138                 log_level = record.levelname
    139                 message   = record.getMessage()
    140                 asctime   = self.formatTime(record, self.datefmt)
    141                 color     = self.LEVEL_COLORS.get(log_level, LogColors.RESET)
    142                 separator = f'{LogColors.SEPARATOR} ┃ {LogColors.RESET}'
    143                 if self.show_details:
    144                     module    = record.module
    145                     line_no   = record.lineno
    146                     func_name = record.funcName
    147                     formatted = (
    148                         f'{LogColors.DATE}{asctime}{LogColors.RESET}'
    149                         f'{separator}'
    150                         f'{color}{log_level:<8}{LogColors.RESET}'
    151                         f'{separator}'
    152                         f'{LogColors.MODULE}{module}{LogColors.RESET}'
    153                         f'{separator}'
    154                         f'{LogColors.FUNCTION}{func_name}{LogColors.RESET}'
    155                         f'{separator}'
    156                         f'{LogColors.LINE}{line_no}{LogColors.RESET}'
    157                         f'{separator}'
    158                         f'{message}'
    159                     )
    160                 else:
    161                     formatted = (
    162                         f'{LogColors.DATE}{asctime}{LogColors.RESET}'
    163                         f'{separator}'
    164                         f'{color}{log_level:<8}{LogColors.RESET}'
    165                         f'{separator}'
    166                         f'{message}'
    167                     )
    168                 return formatted
    169 
    170         # Create console handler with colored output
    171         console_handler = logging.StreamHandler()
    172         console_handler.setLevel(level_num)
    173         console_formatter = ColoredFormatter(datefmt=self.date_format, show_details=self.show_details)
    174         console_handler.setFormatter(console_formatter)
    175         logging.getLogger().addHandler(console_handler)
    176 
    177 
    178     def setup_file_handler(self, level_num: int):
    179         '''
    180         Set up the file handler for logging to disk.
    181         
    182         :param level_num: The logging level number.
    183         '''
    184 
    185         # Create 'logs' directory if it doesn't exist
    186         logs_dir = os.path.join(os.getcwd(), 'logs')
    187         os.makedirs(logs_dir, exist_ok=True)
    188 
    189         # Use the specified log file name and set extension based on json_log
    190         file_extension = '.json' if self.json_log else '.log'
    191         log_file_path = os.path.join(logs_dir, f'{self.log_file_name}{file_extension}')
    192 
    193         # Create the rotating file handler
    194         if self.compress_backups:
    195             file_handler = GZipRotatingFileHandler(log_file_path, maxBytes=self.max_log_size, backupCount=self.max_backups)
    196         else:
    197             file_handler = logging.handlers.RotatingFileHandler(log_file_path, maxBytes=self.max_log_size, backupCount=self.max_backups)
    198         file_handler.setLevel(level_num)
    199 
    200         if self.ecs_log:
    201             try:
    202                 import ecs_logging
    203             except ImportError:
    204                 raise ImportError("The 'ecs-logging' library is required for ECS logging. Install it with 'pip install ecs-logging'.")
    205             file_formatter = ecs_logging.StdlibFormatter()
    206         elif self.json_log:
    207             # Create the JSON formatter
    208             class JsonFormatter(logging.Formatter):
    209                 def format(self, record):
    210                     log_record = {
    211                         'time'        : self.formatTime(record, self.datefmt),
    212                         'level'       : record.levelname,
    213                         'module'      : record.module,
    214                         'function'    : record.funcName,
    215                         'line'        : record.lineno,
    216                         'message'     : record.getMessage(),
    217                         'name'        : record.name,
    218                         'filename'    : record.filename,
    219                         'threadName'  : record.threadName,
    220                         'processName' : record.processName,
    221                     }
    222                     return json.dumps(log_record)
    223             file_formatter = JsonFormatter(datefmt=self.date_format)
    224         else:
    225             file_formatter = logging.Formatter(fmt='%(asctime)s ┃ %(levelname)-8s ┃ %(module)s ┃ %(funcName)s ┃ %(lineno)d ┃ %(message)s', datefmt=self.date_format)
    226 
    227         file_handler.setFormatter(file_formatter)
    228         logging.getLogger().addHandler(file_handler)
    229 
    230 
    231     def setup_graylog_handler(self, level_num: int):
    232         '''
    233         Set up the Graylog handler.
    234         
    235         :param level_num: The logging level number.
    236         '''
    237 
    238         graylog_host = self.graylog_host
    239         graylog_port = self.graylog_port
    240         if graylog_host is None or graylog_port is None:
    241             logging.error('Graylog host and port must be specified for Graylog handler.')
    242             return
    243 
    244         class GraylogHandler(logging.Handler):
    245             def __init__(self, graylog_host, graylog_port):
    246                 super().__init__()
    247                 self.graylog_host = graylog_host
    248                 self.graylog_port = graylog_port
    249                 self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    250 
    251                 # Mapping from Python logging levels to Graylog (syslog) levels
    252                 self.level_mapping = {
    253                     logging.CRITICAL : 2, # Critical
    254                     logging.ERROR    : 3, # Error
    255                     logging.WARNING  : 4, # Warning
    256                     logging.INFO     : 6, # Informational
    257                     logging.DEBUG    : 7, # Debug
    258                     logging.NOTSET   : 7  # Default to Debug
    259                 }
    260 
    261             def emit(self, record):
    262                 try:
    263                     log_entry = self.format(record)
    264                     graylog_level = self.level_mapping.get(record.levelno, 7)  # Default to Debug
    265                     gelf_message = {
    266                         'version'       : '1.1',
    267                         'host'          : socket.gethostname(),
    268                         'short_message' : record.getMessage(),
    269                         'full_message'  : log_entry,
    270                         'timestamp'     : record.created,
    271                         'level'         : graylog_level,
    272                         '_logger_name'  : record.name,
    273                         '_file'         : record.pathname,
    274                         '_line'         : record.lineno,
    275                         '_function'     : record.funcName,
    276                         '_module'       : record.module,
    277                     }
    278                     gelf_json = json.dumps(gelf_message).encode('utf-8')
    279                     self.sock.sendto(gelf_json, (self.graylog_host, self.graylog_port))
    280                 except Exception:
    281                     self.handleError(record)
    282 
    283         graylog_handler = GraylogHandler(graylog_host, graylog_port)
    284         graylog_handler.setLevel(level_num)
    285 
    286         graylog_formatter = logging.Formatter(fmt='%(message)s')
    287         graylog_handler.setFormatter(graylog_formatter)
    288         logging.getLogger().addHandler(graylog_handler)
    289 
    290 
    291     def setup_cloudwatch_handler(self, level_num: int):
    292         '''
    293         Set up the CloudWatch handler.
    294         
    295         :param level_num: The logging level number.
    296         '''
    297 
    298         try:
    299             import boto3
    300             from botocore.exceptions import ClientError
    301         except ImportError:
    302             raise ImportError('boto3 is required for CloudWatch logging. (pip install boto3)')
    303 
    304         log_group_name = self.cloudwatch_group_name
    305         log_stream_name = self.cloudwatch_stream_name
    306         if not log_group_name or not log_stream_name:
    307             logging.error('CloudWatch log group and log stream must be specified for CloudWatch handler.')
    308             return
    309 
    310         class CloudWatchHandler(logging.Handler):
    311             def __init__(self, log_group_name, log_stream_name):
    312                 super().__init__()
    313                 self.log_group_name = log_group_name
    314                 self.log_stream_name = log_stream_name
    315                 self.client = boto3.client('logs')
    316 
    317                 # Create log group if it doesn't exist
    318                 try:
    319                     self.client.create_log_group(logGroupName=self.log_group_name)
    320                 except ClientError as e:
    321                     if e.response['Error']['Code'] != 'ResourceAlreadyExistsException':
    322                         raise e
    323 
    324                 # Create log stream if it doesn't exist
    325                 try:
    326                     self.client.create_log_stream(logGroupName=self.log_group_name, logStreamName=self.log_stream_name)
    327                 except ClientError as e:
    328                     if e.response['Error']['Code'] != 'ResourceAlreadyExistsException':
    329                         raise e
    330 
    331             def _get_sequence_token(self):
    332                 try:
    333                     response = self.client.describe_log_streams(
    334                         logGroupName=self.log_group_name,
    335                         logStreamNamePrefix=self.log_stream_name,
    336                         limit=1
    337                     )
    338                     log_streams = response.get('logStreams', [])
    339                     if log_streams:
    340                         return log_streams[0].get('uploadSequenceToken')
    341                     else:
    342                         return None
    343                 except Exception:
    344                     return None
    345 
    346             def emit(self, record):
    347                 try:
    348                     log_entry = self.format(record)
    349                     timestamp = int(record.created * 1000)
    350                     event = {
    351                         'timestamp': timestamp,
    352                         'message': log_entry
    353                     }
    354                     sequence_token = self._get_sequence_token()
    355                     kwargs = {
    356                         'logGroupName': self.log_group_name,
    357                         'logStreamName': self.log_stream_name,
    358                         'logEvents': [event]
    359                     }
    360                     if sequence_token:
    361                         kwargs['sequenceToken'] = sequence_token
    362                     self.client.put_log_events(**kwargs)
    363                 except Exception:
    364                     self.handleError(record)
    365 
    366         cloudwatch_handler = CloudWatchHandler(log_group_name, log_stream_name)
    367         cloudwatch_handler.setLevel(level_num)
    368         
    369         # Log as JSON
    370         class JsonFormatter(logging.Formatter):
    371             def format(self, record):
    372                 log_record = {
    373                     'time'        : self.formatTime(record, self.datefmt),
    374                     'level'       : record.levelname,
    375                     'module'      : record.module,
    376                     'function'    : record.funcName,
    377                     'line'        : record.lineno,
    378                     'message'     : record.getMessage(),
    379                     'name'        : record.name,
    380                     'filename'    : record.filename,
    381                     'threadName'  : record.threadName,
    382                     'processName' : record.processName,
    383                 }
    384                 return json.dumps(log_record)
    385             
    386         cloudwatch_formatter = JsonFormatter(datefmt=self.date_format)
    387         cloudwatch_handler.setFormatter(cloudwatch_formatter)
    388         logging.getLogger().addHandler(cloudwatch_handler)
    389 
    390 
    391 
    392 def setup_logging(**kwargs):
    393     '''Set up logging with various handlers and options.'''
    394 
    395     logger_setup = LoggerSetup(**kwargs)
    396     logger_setup.setup()