Skip to content

Commit

Permalink
Add ECS support
Browse files Browse the repository at this point in the history
  • Loading branch information
andriilahuta committed Dec 25, 2023
1 parent c60580b commit 6a62035
Show file tree
Hide file tree
Showing 3 changed files with 569 additions and 59 deletions.
260 changes: 203 additions & 57 deletions logstash_async/formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,40 @@
import simplejson as json


class _LogstashMessageSchema:
    """Key names used in the message emitted by the Logstash formatters.

    Formatters resolve every key through their ``MessageSchema`` attribute,
    so a derived schema can rename a field by overriding the constant.
    """

    # envelope
    TIMESTAMP = '@timestamp'
    VERSION = '@version'
    METADATA = '@metadata'
    MESSAGE = 'message'
    MESSAGE_TYPE = 'type'
    TAGS = 'tags'

    # origin of the log event
    HOST = 'host'
    LOG_SOURCE = 'logsource'
    PROGRAM = 'program'
    PID = 'pid'
    PROCESS_NAME = 'process_name'
    THREAD_NAME = 'thread_name'

    # logger and code location
    LOG_LEVEL = 'level'
    LOGGER_NAME = 'logger_name'
    PATH = 'path'
    LINE = 'line'
    FUNC_NAME = 'func_name'

    # runtime details
    INTERPRETER = 'interpreter'
    INTERPRETER_VERSION = 'interpreter_version'
    LOGSTASH_ASYNC_VERSION = 'logstash_async_version'

    # exception details
    ERROR_TYPE = 'error_type'
    STACK_TRACE = 'stack_trace'


class LogstashFormatter(logging.Formatter):

# NOTE(review): presumably the value types that can be emitted without
# conversion; the code consuming this tuple is not visible here - confirm.
_basic_data_types = (type(None), bool, str, int, float)

# Field names that _remove_excluded_fields() deletes from the message and
# from the extra fields.
formatter_record_field_skip_list = constants.FORMATTER_RECORD_FIELD_SKIP_LIST
# Field names that _move_extra_record_fields_to_prefix() leaves at the top
# level of the message instead of moving them under the extra prefix.
formatter_logstash_message_field_list = constants.FORMATTER_LOGSTASH_MESSAGE_FIELD_LIST

# Schema class supplying the key names used when the message is built.
MessageSchema = _LogstashMessageSchema

# ----------------------------------------------------------------------
# pylint: disable=too-many-arguments
def __init__(
Expand Down Expand Up @@ -90,21 +120,22 @@ def _prefetch_program_name(self):

# ----------------------------------------------------------------------
def format(self, record):
Schema = self.MessageSchema
message = {
'@timestamp': self._format_timestamp(record.created),
'@version': '1',
'host': self._host,
'level': record.levelname,
'logsource': self._logsource,
'message': record.getMessage(),
'pid': record.process,
'program': self._program_name,
'type': self._message_type,
Schema.TIMESTAMP: self._format_timestamp(record.created),
Schema.VERSION: '1',
Schema.HOST: self._host,
Schema.LOG_LEVEL: record.levelname,
Schema.LOG_SOURCE: self._logsource,
Schema.MESSAGE: record.getMessage(),
Schema.PID: record.process,
Schema.PROGRAM: self._program_name,
Schema.MESSAGE_TYPE: self._message_type,
}
if self._metadata:
message['@metadata'] = self._metadata
message[Schema.METADATA] = self._metadata
if self._tags:
message['tags'] = self._tags
message[Schema.TAGS] = self._tags

# record fields
record_fields = self._get_record_fields(record)
Expand All @@ -113,14 +144,11 @@ def format(self, record):
extra_fields = self._get_extra_fields(record)
# remove all fields to be excluded
self._remove_excluded_fields(message, extra_fields)
# wrap extra fields in configurable namespace
if self._extra_prefix:
message[self._extra_prefix] = extra_fields
else:
message.update(extra_fields)
message.update(extra_fields)

# move existing extra record fields into the configured prefix
self._move_extra_record_fields_to_prefix(message)
self._post_process_message(message)

return self._serialize(message)

Expand Down Expand Up @@ -152,23 +180,25 @@ def _value_repr(self, value):

# ----------------------------------------------------------------------
def _get_extra_fields(self, record):
    """Collect the extra fields describing where ``record`` came from.

    The result is keyed by the names defined on ``self.MessageSchema`` and
    holds the code location, interpreter, process/thread and library
    version details. The static ``self._extra`` mapping is merged on top,
    and when the record carries exception information the error type and
    the formatted stack trace are added.

    :param record: the log record being formatted
    :return: dict with the extra fields
    """
    Schema = self.MessageSchema
    extra_fields = {
        Schema.FUNC_NAME: record.funcName,
        Schema.INTERPRETER: self._interpreter,
        Schema.INTERPRETER_VERSION: self._interpreter_version,
        Schema.LINE: record.lineno,
        Schema.LOGGER_NAME: record.name,
        Schema.LOGSTASH_ASYNC_VERSION: logstash_async.__version__,
        Schema.PATH: record.pathname,
        Schema.PROCESS_NAME: record.processName,
        Schema.THREAD_NAME: record.threadName,
    }
    # static extra fields
    if self._extra:
        extra_fields.update(self._extra)
    # exceptions
    exc_info = record.exc_info
    if exc_info:
        # Logging with ``exc_info=True`` while no exception is being handled
        # yields ``(None, None, None)``, and hand-built records may carry a
        # non-tuple value. Only report an error type when an exception class
        # is actually present, instead of failing on ``None.__name__``.
        exc_type = exc_info[0] if isinstance(exc_info, tuple) else None
        if exc_type is not None:
            extra_fields[Schema.ERROR_TYPE] = exc_type.__name__
        extra_fields[Schema.STACK_TRACE] = self._format_exception(exc_info)
    return extra_fields

# ----------------------------------------------------------------------
Expand All @@ -185,7 +215,7 @@ def _format_exception(self, exc_info):
def _remove_excluded_fields(self, message, extra_fields):
    """Delete every skip-listed key from ``message`` and ``extra_fields``.

    Both mappings are modified in place; the names to drop come from
    ``self.formatter_record_field_skip_list``.
    """
    skip_list = self.formatter_record_field_skip_list
    for container in (message, extra_fields):
        # collect first so the mapping is not resized while being iterated
        excluded = [name for name in container if name in skip_list]
        for name in excluded:
            del container[name]

# ----------------------------------------------------------------------
Expand All @@ -199,17 +229,68 @@ def _move_extra_record_fields_to_prefix(self, message):
if not self._extra_prefix:
return # early out if no prefix is configured

field_skip_list = constants.FORMATTER_LOGSTASH_MESSAGE_FIELD_LIST + [self._extra_prefix]
message.setdefault(self._extra_prefix, {})
field_skip_list = self.formatter_logstash_message_field_list + [self._extra_prefix]
for key in list(message):
if key not in field_skip_list:
message[self._extra_prefix][key] = message.pop(key)

# ----------------------------------------------------------------------
def _post_process_message(self, message):
    """Hook run by ``format()`` on the assembled message before serialization.

    The default implementation does nothing; override it to adjust
    ``message`` in place.
    """
    return None

# ----------------------------------------------------------------------
def _serialize(self, message):
    """Return ``message`` encoded as a JSON string.

    Non-ASCII characters are escaped when ``self._ensure_ascii`` is true.
    """
    ascii_only = self._ensure_ascii
    return json.dumps(message, ensure_ascii=ascii_only)


class LogstashEcsFormatter(LogstashFormatter):
    """Logstash formatter that emits ECS names for the core fields.

    The schema constants listed in ``__schema_dict`` are remapped to dotted
    ECS field names; every other constant is inherited unchanged from
    ``LogstashFormatter.MessageSchema``.
    """

    ecs_version = '8.11.0'

    # schema constant -> ECS field name
    __schema_dict = {
        'ECS_VERSION': 'ecs.version',
        'MESSAGE_TYPE': 'event.module',
        'HOST': 'host.hostname',
        'LOG_LEVEL': 'log.level',
        'LOGGER_NAME': 'log.logger',
        'LOG_SOURCE': 'log.syslog.hostname',
        'LINE': 'log.origin.file.line',
        'PATH': 'log.origin.file.name',
        'FUNC_NAME': 'log.origin.function',
        'STACK_TRACE': 'error.stack_trace',
        'ERROR_TYPE': 'error.type',
        'PROGRAM': 'process.executable',
        'PROCESS_NAME': 'process.name',
        'PID': 'process.pid',
        'THREAD_NAME': 'process.thread.name',
    }

    # the remapped names are added to the inherited message field list
    formatter_logstash_message_field_list = [
        *LogstashFormatter.formatter_logstash_message_field_list,
        *__schema_dict.values(),
    ]
    MessageSchema = type(
        'MessageSchema',
        (LogstashFormatter.MessageSchema,),
        __schema_dict,
    )

    def _post_process_message(self, message):
        """Run the inherited hook, then stamp the message with the ECS version."""
        super()._post_process_message(message)
        message[self.MessageSchema.ECS_VERSION] = self.ecs_version


class DjangoLogstashFormatter(LogstashFormatter):
class MessageSchema(LogstashFormatter.MessageSchema):
    """Base schema extended with the key names of the Django-specific fields."""

    DJANGO_VERSION = 'django_version'

    # response
    RESP_STATUS_CODE = 'status_code'

    # request
    REQ_METHOD = 'req_method'
    REQ_URI = 'req_uri'
    REQ_HOST = 'req_host'
    REQ_REMOTE_ADDRESS = 'req_remote_address'
    REQ_USER = 'req_user'
    REQ_USER_AGENT = 'req_useragent'
    REQ_REFERER = 'req_referer'
    REQ_FORWARDED_PROTO = 'req_forwarded_proto'
    REQ_FORWARDED_FOR = 'req_forwarded_for'

    # template debug information
    TMPL_NAME = 'tmpl_name'
    TMPL_LINE = 'tmpl_line'
    TMPL_MESSAGE = 'tmpl_message'
    TMPL_DURING = 'tmpl_during'

# ----------------------------------------------------------------------
def __init__(self, *args, **kwargs):
Expand All @@ -225,9 +306,10 @@ def _fetch_django_version(self):
# ----------------------------------------------------------------------
def _get_extra_fields(self, record):
extra_fields = super()._get_extra_fields(record)
Schema = self.MessageSchema

if hasattr(record, 'status_code'):
extra_fields['status_code'] = record.status_code
extra_fields[Schema.RESP_STATUS_CODE] = record.status_code

# Django's runserver command passes socketobject and WSGIRequest instances as "request".
# Hence the check for the META attribute.
Expand All @@ -236,34 +318,34 @@ def _get_extra_fields(self, record):
request = record.request

request_user = self._get_attribute_with_default(request, 'user', '')
extra_fields['django_version'] = self._django_version
extra_fields['req_useragent'] = request.META.get('HTTP_USER_AGENT', '<none>')
extra_fields['req_remote_address'] = request.META.get('REMOTE_ADDR', '<none>')
extra_fields['req_host'] = self._try_to_get_host_from_remote(request)
extra_fields['req_uri'] = self._try_to_get_full_request_uri(request)
extra_fields['req_user'] = str(request_user)
extra_fields['req_method'] = request.META.get('REQUEST_METHOD', '')
extra_fields['req_referer'] = request.META.get('HTTP_REFERER', '')
extra_fields[Schema.DJANGO_VERSION] = self._django_version
extra_fields[Schema.REQ_USER_AGENT] = request.META.get('HTTP_USER_AGENT', '<none>')
extra_fields[Schema.REQ_REMOTE_ADDRESS] = request.META.get('REMOTE_ADDR', '<none>')
extra_fields[Schema.REQ_HOST] = self._try_to_get_host_from_remote(request)
extra_fields[Schema.REQ_URI] = self._try_to_get_full_request_uri(request)
extra_fields[Schema.REQ_USER] = str(request_user)
extra_fields[Schema.REQ_METHOD] = request.META.get('REQUEST_METHOD', '')
extra_fields[Schema.REQ_REFERER] = request.META.get('HTTP_REFERER', '')

forwarded_proto = request.META.get('HTTP_X_FORWARDED_PROTO', None)
if forwarded_proto is not None:
extra_fields['req_forwarded_proto'] = forwarded_proto
extra_fields[Schema.REQ_FORWARDED_PROTO] = forwarded_proto

forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR', None)
if forwarded_for is not None:
# make it a list
forwarded_for_list = forwarded_for.replace(' ', '').split(',')
extra_fields['req_forwarded_for'] = forwarded_for_list
extra_fields[Schema.REQ_FORWARDED_FOR] = forwarded_for_list

# template debug
if isinstance(record.exc_info, tuple):
exc_value = record.exc_info[1]
template_info = getattr(exc_value, 'template_debug', None)
if template_info:
extra_fields['tmpl_name'] = template_info['name']
extra_fields['tmpl_line'] = template_info['line']
extra_fields['tmpl_message'] = template_info['message']
extra_fields['tmpl_during'] = template_info['during']
extra_fields[Schema.TMPL_NAME] = template_info['name']
extra_fields[Schema.TMPL_LINE] = template_info['line']
extra_fields[Schema.TMPL_MESSAGE] = template_info['message']
extra_fields[Schema.TMPL_DURING] = template_info['during']

return extra_fields

Expand Down Expand Up @@ -299,12 +381,49 @@ def _try_to_get_full_request_uri(self, request):
return None


class DjangoLogstashEcsFormatter(DjangoLogstashFormatter, LogstashEcsFormatter):
    """Django formatter whose request/response fields use ECS names.

    Combines the Django schema with the ECS schema and remaps the constants
    listed in ``__schema_dict`` on top of both.
    """

    # schema constant -> ECS field name
    __schema_dict = {
        'RESP_STATUS_CODE': 'http.response.status_code',
        'REQ_USER_AGENT': 'user_agent.original',
        'REQ_REMOTE_ADDRESS': 'client.ip',
        'REQ_HOST': 'client.domain',
        'REQ_URI': 'url.original',
        'REQ_USER': 'user.name',
        'REQ_METHOD': 'http.request.method',
        'REQ_REFERER': 'http.request.referrer',
    }

    # the remapped names are added to the inherited message field list
    formatter_logstash_message_field_list = [
        *LogstashEcsFormatter.formatter_logstash_message_field_list,
        *__schema_dict.values(),
    ]
    MessageSchema = type(
        'MessageSchema',
        (DjangoLogstashFormatter.MessageSchema, LogstashEcsFormatter.MessageSchema),
        __schema_dict,
    )

    def _remove_excluded_fields(self, message, extra_fields):
        """Drop a plain ``status_code`` entry, then apply the regular exclusion.

        NOTE(review): presumably the plain key would otherwise duplicate the
        value stored under the ECS name - confirm against the record fields.
        """
        if 'status_code' in message:
            del message['status_code']
        super()._remove_excluded_fields(message, extra_fields)


class FlaskLogstashFormatter(LogstashFormatter):
class MessageSchema(LogstashFormatter.MessageSchema):
    """Base schema extended with the key names of the Flask-specific fields."""

    FLASK_VERSION = 'flask_version'

    # response
    RESP_STATUS_CODE = 'status_code'

    # request
    REQ_ID = 'request_id'
    REQ_METHOD = 'req_method'
    REQ_URI = 'req_uri'
    REQ_HOST = 'req_host'
    REQ_REMOTE_ADDRESS = 'req_remote_address'
    REQ_USER = 'req_user'
    REQ_USER_AGENT = 'req_useragent'
    REQ_REFERER = 'req_referer'
    REQ_FORWARDED_PROTO = 'req_forwarded_proto'
    REQ_FORWARDED_FOR = 'req_forwarded_for'

# ----------------------------------------------------------------------
def __init__(self, *args, **kwargs):
    """Initialise the base formatter and look up the Flask version.

    All positional and keyword arguments are passed through unchanged to
    the parent constructor.
    """
    super().__init__(*args, **kwargs)
    # ``_get_extra_fields`` reads ``self._flask_version``, so that is the
    # attribute which needs a default before the lookup below runs.
    self._flask_version = None
    self._fetch_flask_version()

# ----------------------------------------------------------------------
Expand All @@ -317,34 +436,61 @@ def _get_extra_fields(self, record):
from flask import request # pylint: disable=import-error,import-outside-toplevel

extra_fields = super()._get_extra_fields(record)
Schema = self.MessageSchema

extra_fields['flask_version'] = self._flask_version
extra_fields[Schema.FLASK_VERSION] = self._flask_version
if request: # request might be unbound in other threads
extra_fields['req_useragent'] = str(request.user_agent) if request.user_agent else ''
extra_fields['req_remote_address'] = request.remote_addr
extra_fields['req_host'] = request.host.split(':', 1)[0]
extra_fields['req_uri'] = request.url
extra_fields['req_method'] = request.method
extra_fields['req_referer'] = request.referrer
extra_fields[Schema.REQ_USER_AGENT] = str(request.user_agent) if request.user_agent else ''
extra_fields[Schema.REQ_REMOTE_ADDRESS] = request.remote_addr
extra_fields[Schema.REQ_HOST] = request.host.split(':', 1)[0]
extra_fields[Schema.REQ_URI] = request.url
extra_fields[Schema.REQ_METHOD] = request.method
extra_fields[Schema.REQ_REFERER] = request.referrer
if 'X-Request-ID' in request.headers:
extra_fields['request_id'] = request.headers.get('X-Request-ID')
extra_fields[Schema.REQ_ID] = request.headers.get('X-Request-ID')
if request.remote_user:
extra_fields['req_user'] = request.remote_user
extra_fields[Schema.REQ_USER] = request.remote_user

forwarded_proto = request.headers.get('X-Forwarded-Proto', None)
if forwarded_proto is not None:
extra_fields['req_forwarded_proto'] = forwarded_proto
extra_fields[Schema.REQ_FORWARDED_PROTO] = forwarded_proto

forwarded_for = request.headers.get('X-Forwarded-For', None)
if forwarded_for is not None:
# make it a list
forwarded_for_list = forwarded_for.replace(' ', '').split(',')
extra_fields['req_forwarded_for'] = forwarded_for_list
extra_fields[Schema.REQ_FORWARDED_FOR] = forwarded_for_list

# check if we have a status code somewhere
if hasattr(record, 'status_code'):
extra_fields['status_code'] = record.status_code
extra_fields[Schema.RESP_STATUS_CODE] = record.status_code
if hasattr(record, 'response'):
extra_fields['status_code'] = record.response.status_code
extra_fields[Schema.RESP_STATUS_CODE] = record.response.status_code

return extra_fields


class FlaskLogstashEcsFormatter(FlaskLogstashFormatter, LogstashEcsFormatter):
    """Flask formatter whose request/response fields use ECS names.

    Combines the Flask schema with the ECS schema and remaps the constants
    listed in ``__schema_dict`` on top of both.
    """

    # schema constant -> ECS field name
    __schema_dict = {
        'RESP_STATUS_CODE': 'http.response.status_code',
        'REQ_USER_AGENT': 'user_agent.original',
        'REQ_REMOTE_ADDRESS': 'client.ip',
        'REQ_HOST': 'client.domain',
        'REQ_URI': 'url.original',
        'REQ_USER': 'user.name',
        'REQ_METHOD': 'http.request.method',
        'REQ_REFERER': 'http.request.referrer',
        'REQ_ID': 'http.request.id',
    }

    # the remapped names are added to the inherited message field list
    formatter_logstash_message_field_list = [
        *LogstashEcsFormatter.formatter_logstash_message_field_list,
        *__schema_dict.values(),
    ]
    MessageSchema = type(
        'MessageSchema',
        (FlaskLogstashFormatter.MessageSchema, LogstashEcsFormatter.MessageSchema),
        __schema_dict,
    )

    def _remove_excluded_fields(self, message, extra_fields):
        """Drop a plain ``status_code`` entry, then apply the regular exclusion.

        NOTE(review): presumably the plain key would otherwise duplicate the
        value stored under the ECS name - confirm against the record fields.
        """
        if 'status_code' in message:
            del message['status_code']
        super()._remove_excluded_fields(message, extra_fields)
Loading

0 comments on commit 6a62035

Please sign in to comment.