-
Notifications
You must be signed in to change notification settings - Fork 4
/
loki_logger.py
172 lines (141 loc) · 6.08 KB
/
loki_logger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.
# Original LokiEmitter/LokiHandler implementation from
# https://github.com/GreyZmeem/python-logging-loki (MIT licensed)
"""Loki logger."""
import copy
import functools
import json
import logging
import string
import time
import urllib.error
from logging.config import ConvertingDict
from typing import Any, Dict, Optional, Tuple, cast
from urllib import request
# Module-level logger; the emitter below uses it to report failed pushes.
logger = logging.getLogger("loki-logger")
# prevent infinite recursion because on failure urllib3 will push more logs
# https://github.com/GreyZmeem/python-logging-loki/issues/18
# Setting the "urllib3" logger to INFO drops the DEBUG records it would emit.
# NOTE(review): this file sends requests through urllib.request, not urllib3 —
# confirm whether this guard is still needed.
logging.getLogger("urllib3").setLevel(logging.INFO)
class LokiEmitter:
    """Callable that pushes one log line at a time to a Loki push endpoint.

    An instance is called with a ``logging.LogRecord`` and its already
    formatted text; it wraps them in a Loki "streams" JSON document and POSTs
    that document to ``url``.
    """

    #: Success HTTP status code from Loki API.
    success_response_code: int = 204
    #: Label name indicating logging level.
    level_label: str = "severity"
    #: Label name indicating logger name.
    logger_label: str = "logger"
    #: String contains chars that can be used in label names in LogQL.
    label_allowed_chars: str = "".join((string.ascii_letters, string.digits, "_"))
    #: A list of pairs of characters to replace in the label name.
    label_replace_with: Tuple[Tuple[str, str], ...] = (
        ("'", ""),
        ('"', ""),
        (" ", "_"),
        (".", "_"),
        ("-", "_"),
    )

    def __init__(
        self, url: str, labels: Optional[Dict[str, str]] = None, cert: Optional[str] = None
    ):
        """Create new Loki emitter.

        Arguments:
            url: Endpoint used to send log entries to Loki (e.g.
                `https://my-loki-instance/loki/api/v1/push`).
            labels: Default labels added to every log record.
            cert: Absolute path to a CA certificate file (or a directory of
                hashed CA certificates) used to verify the server over TLS.
        """
        #: Tags that will be added to all records handled by this handler.
        self.labels: Dict[str, str] = labels or {}
        #: Loki JSON push endpoint (e.g `http://127.0.0.1/loki/api/v1/push`)
        self.url = url
        #: Optional cert for TLS auth
        self.cert = cert
        #: only notify once on push failure, to avoid spamming error logs
        self._error_notified_once = False

    def _send_request(self, req: request.Request, jsondata_encoded: bytes):
        """Open ``req`` with ``jsondata_encoded`` as the body and return the response.

        When ``self.cert`` is set, the server certificate is verified against it
        through an explicit SSL context. The ``capath``/``cafile`` keyword
        arguments of ``urlopen`` are not used because they were removed in
        Python 3.13.
        """
        if not self.cert:
            return request.urlopen(req, jsondata_encoded)

        # Imported locally so that the module-level import block stays untouched.
        import os
        import ssl

        # ``capath`` must be a directory of hashed certificates, while a single
        # CA bundle has to be passed as ``cafile``; accept either.
        if os.path.isdir(self.cert):
            context = ssl.create_default_context(capath=self.cert)
        else:
            context = ssl.create_default_context(cafile=self.cert)
        return request.urlopen(req, jsondata_encoded, context=context)

    def __call__(self, record: logging.LogRecord, line: str) -> None:
        """Send log record to Loki.

        Arguments:
            record: The record being logged; used to derive the stream labels.
            line: The formatted log text to push.

        Raises:
            ValueError: If the request succeeds but the response status differs
                from ``success_response_code``.

        An ``urllib.error.HTTPError`` is not propagated: it is logged once per
        emitter and then swallowed. Any other exception raised while sending
        propagates to the caller.
        """
        payload = self.build_payload(record, line)
        req = request.Request(self.url, method="POST")
        req.add_header("Content-Type", "application/json; charset=utf-8")
        jsondata_encoded = json.dumps(payload).encode("utf-8")
        try:
            resp = self._send_request(req, jsondata_encoded)
        except urllib.error.HTTPError as e:
            if not self._error_notified_once:
                logger.error(f"error pushing logs to {self.url}: {e.status, e.reason}")  # type: ignore
                self._error_notified_once = True
            return

        # Read the status first, then always release the connection.
        try:
            status = resp.getcode()
        finally:
            resp.close()

        if status != self.success_response_code:
            raise ValueError("Unexpected Loki API response status code: {0}".format(status))

    def build_payload(self, record: logging.LogRecord, line: str) -> Dict[str, Any]:
        """Build JSON payload with a log entry.

        The payload holds a single stream with a single value: the current
        time in nanoseconds since the epoch (as a string) paired with ``line``.
        """
        labels = self.build_labels(record)
        ns = 1e9
        ts = str(int(time.time() * ns))
        stream = {
            "stream": labels,
            "values": [[ts, line]],
        }
        return {"streams": [stream]}

    # NOTE: the cache is keyed on ``(self, label)``, so it holds a reference to
    # every emitter that has formatted a label until the entry is evicted.
    @functools.lru_cache(256)
    def format_label(self, label: str) -> str:
        """Build label to match prometheus format.

        Applies the ``label_replace_with`` substitutions in order, then drops
        every character that is not in ``label_allowed_chars``.

        `Label format <https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels>`_
        """
        for char_from, char_to in self.label_replace_with:
            label = label.replace(char_from, char_to)
        return "".join(char for char in label if char in self.label_allowed_chars)

    def build_labels(self, record: logging.LogRecord) -> Dict[str, str]:
        """Return labels that must be sent to Loki with a log record.

        The result is a fresh dict (the emitter's default labels are deep-copied,
        never mutated) containing, in increasing priority: the default labels,
        the level and logger-name labels, and any string-to-string entries found
        in ``record.labels``.
        """
        # A ConvertingDict (produced by logging.config) is turned into a plain
        # dict before it is deep-copied.
        labels: Dict[str, str] = (
            dict(self.labels) if isinstance(self.labels, ConvertingDict) else self.labels
        )
        labels = cast(Dict[str, Any], copy.deepcopy(labels))
        labels[self.level_label] = record.levelname.lower()
        labels[self.logger_label] = record.name

        # if the user implemented a logrecord subclass with a .labels attributes, attempt to
        # respect it and add those labels on top of those registered on the LokiEmitter class.
        extra_labels: Any = getattr(record, "labels", {})
        if not isinstance(extra_labels, dict):
            return labels

        label_name: Any
        label_value: Any
        for label_name, label_value in extra_labels.items():
            # Skip only the malformed entry; valid labels that come after it
            # must still be applied regardless of dict ordering.
            if not isinstance(label_name, str) or not isinstance(label_value, str):
                continue
            cleared_name = self.format_label(label_name)
            if cleared_name:
                labels[cleared_name] = label_value
        return labels
class LokiHandler(logging.Handler):
    """Logging handler that forwards each record to Loki through a LokiEmitter.

    `Loki API <https://github.com/grafana/loki/blob/master/docs/api.md>`  # wokeignore:rule=master
    """

    def __init__(
        self,
        url: str,
        labels: Optional[Dict[str, str]] = None,
        cert: Optional[str] = None,
    ):
        """Create new Loki logging handler.

        Arguments:
            url: Endpoint used to send log entries to Loki (e.g.
                `https://my-loki-instance/loki/api/v1/push`).
            labels: Default labels added to every log record.
            cert: Optional absolute path to cert file for TLS auth.
        """
        super().__init__()
        # All the push logic lives in the emitter; the handler only adapts it
        # to the logging.Handler interface.
        self.emitter = LokiEmitter(url, labels, cert)

    def emit(self, record: logging.LogRecord):
        """Format ``record`` and hand it to the emitter.

        Any exception raised while formatting or sending is routed to
        ``handleError`` instead of propagating to the code that logged.
        """
        # noinspection PyBroadException
        try:
            formatted_line = self.format(record)
            self.emitter(record, formatted_line)
        except Exception:
            self.handleError(record)