apv- 🐍 advanced python logging 📔 |
git clone git://git.acid.vegas/apv.git |
Log | Files | Refs | Archive | README | LICENSE |
cloudwatch.py (3628B)
import json
import logging
import threading

try:
    import boto3
    from botocore.exceptions import ClientError
except ImportError:
    # boto3 is optional: keep the module importable so that the helpful
    # install hint below can actually be raised instead of a bare import error.
    boto3 = None

    class ClientError(Exception):
        '''Stand-in so ``except ClientError`` clauses stay valid without botocore.'''


_BOTO3_REQUIRED = 'boto3 is required for CloudWatch logging. (pip install boto3)'

# Per-thread flag set while a CloudWatch emit is running, so that records
# logged *during* the API call are dropped instead of recursing into emit().
_emit_state = threading.local()


class CloudWatchHandler(logging.Handler):
    '''
    Logging handler that sends every record to one AWS CloudWatch Logs stream.

    Each record is delivered with its own ``put_log_events`` call. The upload
    sequence token is looked up once and then carried forward from each
    response rather than being re-read before every record.
    '''

    # Class-level defaults for the token cache (the instance overrides them on use).
    _sequence_token = None
    _token_loaded   = False

    def __init__(self, group_name, stream_name):
        '''
        Create the handler and make sure its log group and log stream exist.

        :param group_name: Name of the CloudWatch log group
        :param stream_name: Name of the CloudWatch log stream
        :raises ImportError: If boto3 is not installed
        '''
        if boto3 is None:
            raise ImportError(_BOTO3_REQUIRED)

        super().__init__()
        self.group_name  = group_name
        self.stream_name = stream_name
        self.client      = boto3.client('logs')
        self._initialize_log_group_and_stream()

    @staticmethod
    def _create_if_missing(create, **kwargs):
        '''
        Call a ``create_*`` client method, ignoring "already exists" errors.

        :param create: Bound client method to call
        :param kwargs: Keyword arguments forwarded to the client method
        :raises ClientError: For any error other than ResourceAlreadyExistsException
        '''
        try:
            create(**kwargs)
        except ClientError as e:
            if e.response['Error']['Code'] != 'ResourceAlreadyExistsException':
                raise

    def _initialize_log_group_and_stream(self):
        '''Create the log group and then the log stream if they do not exist yet.'''
        self._create_if_missing(
            self.client.create_log_group,
            logGroupName=self.group_name
        )
        self._create_if_missing(
            self.client.create_log_stream,
            logGroupName=self.group_name,
            logStreamName=self.stream_name
        )

    def _get_sequence_token(self):
        '''
        Look up the current upload sequence token of the stream.

        Best effort: any failure is reported as "no token" (None).
        '''
        try:
            response = self.client.describe_log_streams(
                logGroupName=self.group_name,
                logStreamNamePrefix=self.stream_name,
                limit=1
            )
            log_streams = response.get('logStreams', [])
            return log_streams[0].get('uploadSequenceToken') if log_streams else None
        except Exception:
            return None

    def _put_log_events(self, kwargs):
        '''
        Send one ``put_log_events`` request and keep the cached token current.

        :param kwargs: Request arguments (group, stream and events); the
                       ``sequenceToken`` entry is managed here
        :raises ClientError: If the request fails for a reason other than a stale token
        '''
        # logging.Handler.handle() holds the handler lock around emit(), so
        # this per-instance cache is not touched by two threads at once.
        if not self._token_loaded:
            self._sequence_token = self._get_sequence_token()
            self._token_loaded   = True

        for attempt in range(2):
            if self._sequence_token:
                kwargs['sequenceToken'] = self._sequence_token
            else:
                kwargs.pop('sequenceToken', None)

            try:
                response = self.client.put_log_events(**kwargs)
            except ClientError as e:
                code = e.response.get('Error', {}).get('Code')
                if code == 'DataAlreadyAcceptedException':
                    # The service already has this batch; just resync next time.
                    self._token_loaded = False
                    return
                if code != 'InvalidSequenceTokenException' or attempt:
                    self._token_loaded = False
                    raise
                # Another writer advanced the stream: re-read the token and retry once.
                self._sequence_token = self._get_sequence_token()
            else:
                self._sequence_token = response.get('nextSequenceToken')
                return

    def emit(self, record):
        '''
        Format the record and upload it as a single CloudWatch log event.

        Records produced on this thread while an upload is already in progress
        (for example log output of the AWS SDK itself) are dropped, because
        uploading them would re-enter this method without end. Failures are
        passed to ``handleError`` and never raised to the logging caller.

        :param record: The log record to send
        '''
        if getattr(_emit_state, 'active', False):
            return

        _emit_state.active = True
        try:
            kwargs = {
                'logGroupName'  : self.group_name,
                'logStreamName' : self.stream_name,
                'logEvents'     : [{
                    'timestamp' : int(record.created * 1000), # CloudWatch expects epoch milliseconds
                    'message'   : self.format(record)
                }]
            }
            self._put_log_events(kwargs)
        except Exception:
            self.handleError(record)
        finally:
            _emit_state.active = False


class JsonFormatter(logging.Formatter):
    '''Formatter that renders each record as one JSON object.'''

    def format(self, record):
        '''
        Serialize the record to a JSON string.

        :param record: The log record to format
        '''
        log_record = {
            'time'        : self.formatTime(record, self.datefmt),
            'level'       : record.levelname,
            'module'      : record.module,
            'function'    : record.funcName,
            'line'        : record.lineno,
            'message'     : record.getMessage(),
            'name'        : record.name,
            'filename'    : record.filename,
            'threadName'  : record.threadName,
            'processName' : record.processName,
        }

        # Keep the traceback of logger.exception(...) instead of dropping it.
        if record.exc_info:
            log_record['exception'] = self.formatException(record.exc_info)

        return json.dumps(log_record)


def setup_cloudwatch_handler(level_num: int, group_name: str, stream_name: str, date_format: str):
    '''
    Attach a JSON-formatted CloudWatch handler to the root logger.

    If the group or stream name is empty, an error is logged and nothing is attached.

    :param level_num: Minimum level (numeric) the handler will send
    :param group_name: Name of the CloudWatch log group
    :param stream_name: Name of the CloudWatch log stream
    :param date_format: strftime format used for the ``time`` field
    :raises ImportError: If boto3 is not installed
    '''
    if boto3 is None:
        raise ImportError(_BOTO3_REQUIRED)

    if not group_name or not stream_name:
        logging.error('CloudWatch log group and log stream must be specified for CloudWatch handler.')
        return

    cloudwatch_handler = CloudWatchHandler(group_name, stream_name)
    cloudwatch_handler.setLevel(level_num)
    cloudwatch_handler.setFormatter(JsonFormatter(datefmt=date_format))
    logging.getLogger().addHandler(cloudwatch_handler)