apv

- 🐍 advanced python logging 📔
git clone git://git.acid.vegas/apv.git
Log | Files | Refs | Archive | README | LICENSE

cloudwatch.py (3628B)

      1 import logging
      2 import json
      3 import boto3
      4 from botocore.exceptions import ClientError
      5 
      6 class CloudWatchHandler(logging.Handler):
      7     def __init__(self, group_name, stream_name):
      8         super().__init__()
      9         self.group_name = group_name
     10         self.stream_name = stream_name
     11         self.client = boto3.client('logs')
     12         self._initialize_log_group_and_stream()
     13 
     14     def _initialize_log_group_and_stream(self):
     15         # Create log group if it doesn't exist
     16         try:
     17             self.client.create_log_group(logGroupName=self.group_name)
     18         except ClientError as e:
     19             if e.response['Error']['Code'] != 'ResourceAlreadyExistsException':
     20                 raise e
     21 
     22         # Create log stream if it doesn't exist
     23         try:
     24             self.client.create_log_stream(
     25                 logGroupName=self.group_name,
     26                 logStreamName=self.stream_name
     27             )
     28         except ClientError as e:
     29             if e.response['Error']['Code'] != 'ResourceAlreadyExistsException':
     30                 raise e
     31 
     32     def _get_sequence_token(self):
     33         try:
     34             response = self.client.describe_log_streams(
     35                 logGroupName=self.group_name,
     36                 logStreamNamePrefix=self.stream_name,
     37                 limit=1
     38             )
     39             log_streams = response.get('logStreams', [])
     40             return log_streams[0].get('uploadSequenceToken') if log_streams else None
     41         except Exception:
     42             return None
     43 
     44     def emit(self, record):
     45         try:
     46             log_entry = self.format(record)
     47             timestamp = int(record.created * 1000)
     48             
     49             event = {
     50                 'timestamp': timestamp,
     51                 'message': log_entry
     52             }
     53             
     54             kwargs = {
     55                 'logGroupName': self.group_name,
     56                 'logStreamName': self.stream_name,
     57                 'logEvents': [event]
     58             }
     59             
     60             sequence_token = self._get_sequence_token()
     61             if sequence_token:
     62                 kwargs['sequenceToken'] = sequence_token
     63                 
     64             self.client.put_log_events(**kwargs)
     65         except Exception:
     66             self.handleError(record) 
     67 
     68 def setup_cloudwatch_handler(level_num: int, group_name: str, stream_name: str, date_format: str):
     69     '''Set up the CloudWatch handler.'''
     70     try:
     71         import boto3
     72     except ImportError:
     73         raise ImportError('boto3 is required for CloudWatch logging. (pip install boto3)')
     74 
     75     if not group_name or not stream_name:
     76         logging.error('CloudWatch log group and log stream must be specified for CloudWatch handler.')
     77         return
     78 
     79     cloudwatch_handler = CloudWatchHandler(group_name, stream_name)
     80     cloudwatch_handler.setLevel(level_num)
     81     
     82     class JsonFormatter(logging.Formatter):
     83         def format(self, record):
     84             log_record = {
     85                 'time'        : self.formatTime(record, date_format),
     86                 'level'       : record.levelname,
     87                 'module'      : record.module,
     88                 'function'    : record.funcName,
     89                 'line'        : record.lineno,
     90                 'message'     : record.getMessage(),
     91                 'name'        : record.name,
     92                 'filename'    : record.filename,
     93                 'threadName'  : record.threadName,
     94                 'processName' : record.processName,
     95             }
     96             return json.dumps(log_record)
     97     
     98     cloudwatch_formatter = JsonFormatter(datefmt=date_format)
     99     cloudwatch_handler.setFormatter(cloudwatch_formatter)
    100     logging.getLogger().addHandler(cloudwatch_handler)