-
Notifications
You must be signed in to change notification settings - Fork 407
/
Copy pathformatter.py
483 lines (383 loc) · 18.2 KB
/
formatter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
from __future__ import annotations
import inspect
import json
import logging
import os
import time
import traceback
from abc import ABCMeta, abstractmethod
from contextlib import contextmanager
from contextvars import ContextVar
from datetime import datetime, timezone
from functools import partial
from typing import TYPE_CHECKING, Any, Callable, Generator, Iterable
from aws_lambda_powertools.shared import constants
from aws_lambda_powertools.shared.functions import powertools_dev_is_set
if TYPE_CHECKING:
from aws_lambda_powertools.logging.types import LogRecord, LogStackTrace
# Key names the formatter treats as reserved.
# `_extract_log_keys` uses this tuple in two ways:
#   * record attributes with these names are NOT copied into the log as "extra" keys;
#   * a truthy value configured under one of these names must be a `str` and is
#     %-interpolated against the record's attributes (e.g. "%(levelname)s").
# NOTE(review): the list largely mirrors `logging.LogRecord` attribute names, plus
# formatter-level keys such as "level", "location" and "timestamp" - confirm before extending.
RESERVED_LOG_ATTRS = (
    "name",
    "msg",
    "args",
    "level",
    "levelname",
    "levelno",
    "pathname",
    "filename",
    "module",
    "exc_info",
    "exc_text",
    "stack_info",
    "lineno",
    "funcName",
    "created",
    "msecs",
    "relativeCreated",
    "thread",
    "threadName",
    "processName",
    "process",
    "asctime",
    "location",
    "timestamp",
)
class BasePowertoolsFormatter(logging.Formatter, metaclass=ABCMeta):
    """Interface for formatters that carry a mutable set of logging keys.

    Only `append_keys` and `clear_state` are abstract. The remaining methods
    ship with fallbacks: getters return an empty dict, the context manager
    yields without touching any state, and the other mutators raise
    `NotImplementedError` until a subclass overrides them.
    """

    @abstractmethod
    def append_keys(self, **additional_keys) -> None:
        """Add `additional_keys` to the formatter's logging keys."""
        raise NotImplementedError

    def get_current_keys(self) -> dict[str, Any]:
        """Return the current logging keys; the base implementation has none."""
        return {}

    def remove_keys(self, keys: Iterable[str]) -> None:
        """Remove the given logging keys."""
        raise NotImplementedError

    @abstractmethod
    def clear_state(self) -> None:
        """Removes any previously added logging keys"""
        raise NotImplementedError

    @contextmanager
    def append_context_keys(self, **additional_keys: Any) -> Generator[None, None, None]:
        """Context manager hook for temporary keys; the base version is a no-op."""
        yield

    # These specific thread-safe methods are necessary to manage shared context in concurrent environments.
    # They prevent race conditions and ensure data consistency across multiple threads.
    def thread_safe_append_keys(self, **additional_keys) -> None:
        """Add keys to the per-thread context."""
        raise NotImplementedError

    def thread_safe_get_current_keys(self) -> dict[str, Any]:
        """Return the per-thread keys; the base implementation has none."""
        return {}

    def thread_safe_remove_keys(self, keys: Iterable[str]) -> None:
        """Remove the given keys from the per-thread context."""
        raise NotImplementedError

    def thread_safe_clear_keys(self) -> None:
        """Removes any previously added logging keys in a specific thread"""
        raise NotImplementedError
class LambdaPowertoolsFormatter(BasePowertoolsFormatter):
"""Powertools for AWS Lambda (Python) Logging formatter.
Formats the log message as a JSON encoded string. If the message is a
dict it will be used directly.
"""
default_time_format = "%Y-%m-%d %H:%M:%S,%F%z" # '2021-04-17 18:19:57,656+0200'
custom_ms_time_directive = "%F"
RFC3339_ISO8601_FORMAT = "%Y-%m-%dT%H:%M:%S.%F%z" # '2022-10-27T16:27:43.738+02:00'
def __init__(
    self,
    json_serializer: Callable[[LogRecord], str] | None = None,
    json_deserializer: Callable[[dict | str | bool | int | float], str] | None = None,
    json_default: Callable[[Any], Any] | None = None,
    datefmt: str | None = None,
    use_datetime_directive: bool = False,
    log_record_order: list[str] | None = None,
    utc: bool = False,
    use_rfc3339: bool = False,
    serialize_stacktrace: bool = True,
    **kwargs,
) -> None:
    """Return a LambdaPowertoolsFormatter instance.

    `log_record_order` fixes the order of the leading keys in the structured
    JSON log. Any extra keyword argument becomes a key in every log message;
    string values for reserved attributes are treated as %-style format strings.

    Parameters
    ----------
    json_serializer : Callable, optional
        function to serialize `obj` to a JSON formatted `str`, by default json.dumps
    json_deserializer : Callable, optional
        function to deserialize a `str` containing a JSON document to a Python `obj`,
        by default json.loads
    json_default : Callable, optional
        function to coerce unserializable values, by default str.
        Only used when no custom `json_serializer` is set
    datefmt : str, optional
        String directives (strftime) to format log timestamp.
        See https://docs.python.org/3/library/time.html#time.strftime
    use_datetime_directive : bool, optional
        Interpret `datefmt` as a format string for `datetime.datetime.strftime`, rather than
        `time.strftime` - only useful when used alongside `datefmt`.
        See https://docs.python.org/3/library/datetime.html#strftime-strptime-behavior .
        The custom %F directive for milliseconds is supported in both modes.
    log_record_order : list, optional
        set order of log keys when logging, by default ["level", "location", "message", "timestamp"]
    utc : bool, optional
        set logging timestamp to UTC, by default False to continue to use local time as per stdlib
    use_rfc3339 : bool, optional
        Whether to use a popular dateformat that complies with both RFC3339 and ISO8601.
        e.g., 2022-10-27T16:27:43.738+02:00.
    serialize_stacktrace : bool, optional
        Whether `format` adds a structured `stack_trace` key for records carrying
        exception info, by default True
    kwargs
        Key-value to be included in log messages
    """
    self.json_deserializer = json_deserializer or json.loads
    self.json_default = json_default or str

    # Pretty-printed JSON in dev mode (e.g. AWS SAM Local), compact otherwise
    if powertools_dev_is_set():
        self.json_indent = constants.PRETTY_INDENT
    else:
        self.json_indent = constants.COMPACT_INDENT

    if json_serializer:
        self.json_serializer = json_serializer
    else:
        self.json_serializer = partial(
            json.dumps,
            default=self.json_default,
            separators=(",", ":"),
            indent=self.json_indent,
            ensure_ascii=False,  # see #3474
        )

    self.datefmt = datefmt
    self.use_datetime_directive = use_datetime_directive
    self.utc = utc
    self.log_record_order = log_record_order or ["level", "location", "message", "timestamp"]
    # Seeding with None values pins the insertion order of the leading keys
    self.log_format = dict.fromkeys(self.log_record_order)
    self.update_formatter = self.append_keys  # alias to old method
    self.use_rfc3339_iso8601 = use_rfc3339
    self.converter = time.gmtime if self.utc else time.localtime

    # Defaults first so that caller-supplied kwargs win on conflicts
    combined = dict(self._build_default_keys())
    combined.update(kwargs)
    self.keys_combined = combined
    self.log_format.update(**self.keys_combined)
    self.serialize_stacktrace = serialize_stacktrace

    super().__init__(datefmt=self.datefmt)
def serialize(self, log: LogRecord) -> str:
"""Serialize structured log dict to JSON str"""
return self.json_serializer(log)
def format(self, record: logging.LogRecord) -> str:  # noqa: A003
    """Build the structured log for `record` and return it as a JSON string.

    Keys whose value ends up as None are dropped before serialization.
    """
    structured = self._extract_log_keys(log_record=record)
    structured["message"] = self._extract_log_message(log_record=record)

    # `exception` / `exception_name` supplied as extra keys take precedence
    # over whatever is derived from the record's exc_info
    exc_text, exc_name = self._extract_log_exception(log_record=record)
    structured.setdefault("exception", exc_text)
    structured.setdefault("exception_name", exc_name)

    if self.serialize_stacktrace:
        # Structured traceback built with the traceback library
        structured["stack_trace"] = self._serialize_stacktrace(log_record=record)

    structured["xray_trace_id"] = self._get_latest_trace_id()

    without_nones = self._strip_none_records(records=structured)
    return self.serialize(log=without_nones)
def formatTime(self, record: logging.LogRecord, datefmt: str | None = None) -> str:
# As of Py3.7, we can infer milliseconds directly from any datetime
# saving processing time as we can shortcircuit early
# Maintenance: In V3, we (and Java) should move to this format by default
# since we've provided enough time for those migrating from std logging
if self.use_rfc3339_iso8601:
if self.utc:
ts_as_datetime = datetime.fromtimestamp(record.created, tz=timezone.utc)
else:
ts_as_datetime = datetime.fromtimestamp(record.created).astimezone()
return ts_as_datetime.isoformat(timespec="milliseconds") # 2022-10-27T17:42:26.841+0200
# converts to local/UTC TZ as struct time
record_ts = self.converter(record.created)
if datefmt is None: # pragma: no cover, it'll always be None in std logging, but mypy
datefmt = self.datefmt
# NOTE: Python `time.strftime` doesn't provide msec directives
# so we create a custom one (%F) and replace logging record_ts
# Reason 2 is that std logging doesn't support msec after TZ
msecs = "%03d" % record.msecs # noqa UP031
# Datetime format codes is a superset of time format codes
# therefore we only honour them if explicitly asked
# by default, those migrating from std logging will use time format codes
# https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes
if self.use_datetime_directive and datefmt:
# record.msecs are microseconds, divide by 1000 to get milliseconds
timestamp = record.created + record.msecs / 1000
if self.utc:
dt = datetime.fromtimestamp(timestamp, tz=timezone.utc)
else:
dt = datetime.fromtimestamp(timestamp).astimezone()
custom_fmt = datefmt.replace(self.custom_ms_time_directive, msecs)
return dt.strftime(custom_fmt)
# Only time format codes being used
elif datefmt:
custom_fmt = datefmt.replace(self.custom_ms_time_directive, msecs)
return time.strftime(custom_fmt, record_ts)
# Use default fmt: 2021-05-03 10:20:19,650+0200
custom_fmt = self.default_time_format.replace(self.custom_ms_time_directive, msecs)
return time.strftime(custom_fmt, record_ts)
def append_keys(self, **additional_keys) -> None:
self.log_format.update(additional_keys)
def get_current_keys(self) -> dict[str, Any]:
return self.log_format
def remove_keys(self, keys: Iterable[str]) -> None:
for key in keys:
self.log_format.pop(key, None)
def clear_state(self) -> None:
self.log_format = dict.fromkeys(self.log_record_order)
self.log_format.update(**self.keys_combined)
@contextmanager
def append_context_keys(self, **additional_keys: Any) -> Generator[None, None, None]:
"""
Context manager to temporarily add logging keys.
Parameters:
-----------
**keys: Any
Key-value pairs to include in the log context during the lifespan of the context manager.
Example:
--------
>>> logger = Logger(service="example_service")
>>> with logger.append_context_keys(user_id="123", operation="process"):
>>> logger.info("Log with context")
>>> logger.info("Log without context")
"""
# Add keys to the context
self.append_keys(**additional_keys)
try:
yield
finally:
# Remove the keys after exiting the context
self.remove_keys(additional_keys.keys())
# These specific thread-safe methods are necessary to manage shared context in concurrent environments.
# They prevent race conditions and ensure data consistency across multiple threads.
def thread_safe_append_keys(self, **additional_keys) -> None:
    """Add key-value pairs to the context-local keys.

    Delegates to the module-level `set_context_keys`, which stores the keys
    in a `ContextVar` rather than on the formatter instance.
    """
    set_context_keys(**additional_keys)
def thread_safe_get_current_keys(self) -> dict[str, Any]:
    """Return the keys stored in the context-local `ContextVar` for the current context."""
    context = _get_context()
    return context.get()
def thread_safe_remove_keys(self, keys: Iterable[str]) -> None:
    """Remove the given keys from the context-local keys.

    Delegates to the module-level `remove_context_keys`.
    """
    remove_context_keys(keys)
def thread_safe_clear_keys(self) -> None:
    """Discard every context-local key for the current context.

    Delegates to the module-level `clear_context_keys`.
    """
    clear_context_keys()
@staticmethod
def _build_default_keys() -> dict[str, str]:
return {
"level": "%(levelname)s",
"location": "%(funcName)s:%(lineno)d",
"timestamp": "%(asctime)s",
}
def _get_latest_trace_id(self) -> str | None:
xray_trace_id_key = self.log_format.get("xray_trace_id", "")
if xray_trace_id_key is None:
# key is explicitly disabled; ignore it. e.g., Logger(xray_trace_id=None)
return None
xray_trace_id = os.getenv(constants.XRAY_TRACE_ID_ENV)
return xray_trace_id.split(";")[0].replace("Root=", "") if xray_trace_id else None
def _extract_log_message(self, log_record: logging.LogRecord) -> dict[str, Any] | str | bool | Iterable:
"""Extract message from log record and attempt to JSON decode it if str
Parameters
----------
log_record : logging.LogRecord
Log record to extract message from
Returns
-------
message: dict[str, Any] | str | bool | Iterable
Extracted message
"""
message = log_record.msg
if isinstance(message, dict):
return message
if log_record.args: # logger.info("foo %s", "bar") requires formatting
return log_record.getMessage()
if isinstance(message, str): # could be a JSON string
try:
message = self.json_deserializer(message)
except (json.decoder.JSONDecodeError, TypeError, ValueError):
pass
return message
def _serialize_stacktrace(self, log_record: logging.LogRecord) -> LogStackTrace | None:
if log_record.exc_info:
exception_info: LogStackTrace = {
"type": log_record.exc_info[0].__name__, # type: ignore
"value": log_record.exc_info[1], # type: ignore
"module": log_record.exc_info[1].__class__.__module__,
"frames": [],
}
exception_info["frames"] = [
{"file": fs.filename, "line": fs.lineno, "function": fs.name, "statement": fs.line}
for fs in traceback.extract_tb(log_record.exc_info[2])
]
return exception_info
return None
def _extract_log_exception(self, log_record: logging.LogRecord) -> tuple[str, str] | tuple[None, None]:
"""Format traceback information, if available
Parameters
----------
log_record : logging.LogRecord
Log record to extract message from
Returns
-------
log_record: tuple[str, str] | tuple[None, None]
Log record with constant traceback info and exception name
"""
if log_record.exc_info:
return self.formatException(log_record.exc_info), log_record.exc_info[0].__name__ # type: ignore
return None, None
def _extract_log_keys(self, log_record: logging.LogRecord) -> dict[str, Any]:
    """Extract and parse custom and reserved log keys

    Parameters
    ----------
    log_record : logging.LogRecord
        Log record to extract keys from

    Returns
    -------
    formatted_log: dict[str, Any]
        Structured log as dictionary

    Raises
    ------
    ValueError
        When a truthy, non-str value is configured under a reserved attribute name
    """
    attributes = log_record.__dict__.copy()
    attributes["asctime"] = self.formatTime(record=log_record)
    # Anything on the record that is not a reserved attribute is a user-supplied extra
    extras = {name: value for name, value in attributes.items() if name not in RESERVED_LOG_ATTRS}

    formatted_log: dict[str, Any] = {}

    # Formatter-level keys come first, then context-local keys (which win on conflicts).
    # A truthy value stored under a reserved name is a %-style template,
    # e.g. '%(levelname)s' -> 'INFO', '%(process)d' -> '4773'; the modulo operator
    # only works on strings, hence the type check. Every other value is copied verbatim.
    for source in (self.log_format, _get_context().get()):
        for key, value in source.items():
            if not (value and key in RESERVED_LOG_ATTRS):
                formatted_log[key] = value
                continue

            if not isinstance(value, str):
                raise ValueError(
                    "Logging keys that override reserved log attributes need to be type 'str', "
                    f"instead got '{type(value).__name__}'",
                )

            formatted_log[key] = value % attributes

    # Extras are applied last, so they override keys of the same name
    formatted_log.update(**extras)
    return formatted_log
@staticmethod
def _strip_none_records(records: dict[str, Any]) -> dict[str, Any]:
"""Remove any key with None as value"""
return {k: v for k, v in records.items() if v is not None}
JsonFormatter = LambdaPowertoolsFormatter  # alias to previous formatter
# Fetch current and future parameters from PowertoolsFormatter that should be reserved.
# Derived by introspection so the list follows the constructor signature automatically;
# `[1:]` drops the first entry (presumably `self` - verify against inspect.getfullargspec).
RESERVED_FORMATTER_CUSTOM_KEYS: list[str] = inspect.getfullargspec(LambdaPowertoolsFormatter).args[1:]
# ContextVar holding the context-local logging keys.
# NOTE: `default_contextvar` is a single module-level dict used as the ContextVar default,
# so every context that has not called `.set()` gets this same object back from `.get()`.
# Treat the value returned by `.get()` as read-only and replace it via `.set()` instead.
default_contextvar: dict[str, Any] = {}
THREAD_LOCAL_KEYS: ContextVar[dict[str, Any]] = ContextVar("THREAD_LOCAL_KEYS", default=default_contextvar)
def _get_context() -> ContextVar[dict[str, Any]]:
    """Return the module-level `ContextVar` that stores the context-local logging keys."""
    context_var = THREAD_LOCAL_KEYS
    return context_var
def clear_context_keys() -> None:
    """Replace the context-local logging keys of the current context with a new empty dict."""
    context = _get_context()
    context.set({})
def set_context_keys(**kwargs: Any) -> None:
    """Merge `kwargs` into the context-local logging keys of the current context.

    A new dict is stored on every call (existing keys first, then `kwargs`,
    which win on conflicts), so the previously stored dict is never mutated.

    Parameters
    ----------
    **kwargs : Any
        Key-value pairs to add; each value may be of any type
    """
    context = _get_context()
    context.set({**context.get(), **kwargs})
def remove_context_keys(keys: Iterable[str]) -> None:
    """Remove the given keys from the context-local logging keys of the current context.

    Names that are not present are ignored. The stored dict is replaced by a
    filtered copy instead of being mutated in place: the dict object held by
    the ContextVar can be shared with other contexts (a copied context keeps a
    reference to the same object, and unset contexts share the default), so an
    in-place `pop` would leak the removal outside the current context.

    Parameters
    ----------
    keys : Iterable[str]
        Names of the keys to remove
    """
    context = _get_context()
    to_drop = set(keys)
    context.set({name: value for name, value in context.get().items() if name not in to_drop})