Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Improve the performance of structured logging #6322

Merged
merged 10 commits into from
Nov 25, 2019
1 change: 1 addition & 0 deletions changelog.d/6322.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve the performance of outputting structured logging.
14 changes: 13 additions & 1 deletion synapse/logging/_structured.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,18 @@ def parse_drain_configs(
)


class StoppableLogPublisher(LogPublisher):
    """
    A ``LogPublisher`` whose observers can be told to shut down any
    external communications (e.g. close network connections) when the
    publisher itself is stopped.
    """

    def stop(self):
        # Forward the shutdown request to each observer that supports it;
        # observers without a stop() method are left untouched.
        for observer in self._observers:
            if not hasattr(observer, "stop"):
                continue
            observer.stop()


def setup_structured_logging(
hs,
config,
Expand Down Expand Up @@ -336,7 +348,7 @@ def setup_structured_logging(
# We should never get here, but, just in case, throw an error.
raise ConfigError("%s drain type cannot be configured" % (observer.type,))

publisher = LogPublisher(*observers)
publisher = StoppableLogPublisher(*observers)
log_filter = LogLevelFilterPredicate()

for namespace, namespace_config in log_config.get(
Expand Down
106 changes: 77 additions & 29 deletions synapse/logging/_terse_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,29 @@
Log formatters that output terse JSON.
"""

import json
import sys
import traceback
from collections import deque
from ipaddress import IPv4Address, IPv6Address, ip_address
from math import floor
from typing import IO
from typing import IO, Optional

import attr
from simplejson import dumps
from zope.interface import implementer

from twisted.application.internet import ClientService
from twisted.internet.defer import Deferred
from twisted.internet.endpoints import (
HostnameEndpoint,
TCP4ClientEndpoint,
TCP6ClientEndpoint,
)
from twisted.internet.interfaces import IPushProducer, ITransport
from twisted.internet.protocol import Factory, Protocol
from twisted.logger import FileLogObserver, ILogObserver, Logger
from twisted.python.failure import Failure

_encoder = json.JSONEncoder(ensure_ascii=False, separators=(",", ":"))


def flatten_event(event: dict, metadata: dict, include_time: bool = False):
Expand Down Expand Up @@ -141,11 +145,49 @@ def TerseJSONToConsoleLogObserver(outFile: IO[str], metadata: dict) -> FileLogOb

def formatEvent(_event: dict) -> str:
flattened = flatten_event(_event, metadata)
return dumps(flattened, ensure_ascii=False, separators=(",", ":")) + "\n"
return _encoder.encode(flattened) + "\n"

return FileLogObserver(outFile, formatEvent)


@attr.s
@implementer(IPushProducer)
class LogProducer(object):
    """
    An IPushProducer that writes logs from its buffer to its transport when it
    is resumed.

    Args:
        buffer: Log buffer to read logs from.
        transport: Transport to write to.
    """

    transport = attr.ib(type=ITransport)
    _buffer = attr.ib(type=deque)
    _paused = attr.ib(default=False, type=bool, init=False)

    def pauseProducing(self):
        # The transport's send buffer is full: stop draining until resumed.
        self._paused = True

    def stopProducing(self):
        # Permanent stop: pause and drop the buffer so nothing further can
        # be written.
        self._paused = True
        self._buffer = None

    def resumeProducing(self):
        self._paused = False

        # Drain queued events until we are paused again, the buffer is
        # exhausted (or was dropped by stopProducing), or the transport
        # has disconnected.
        while not self._paused and self._buffer and self.transport.connected:
            try:
                next_event = self._buffer.popleft()
                self.transport.write(_encoder.encode(next_event).encode("utf8"))
                self.transport.write(b"\n")
            except Exception:
                # Something has gone wrong writing to the transport -- log it
                # on the real stderr and stop draining for now.
                traceback.print_exc(file=sys.__stderr__)
                break


@attr.s
@implementer(ILogObserver)
class TerseJSONToTCPLogObserver(object):
Expand All @@ -165,8 +207,9 @@ class TerseJSONToTCPLogObserver(object):
metadata = attr.ib(type=dict)
maximum_buffer = attr.ib(type=int)
_buffer = attr.ib(default=attr.Factory(deque), type=deque)
_writer = attr.ib(default=None)
_connection_waiter = attr.ib(default=None, type=Optional[Deferred])
_logger = attr.ib(default=attr.Factory(Logger))
_producer = attr.ib(default=None, type=Optional[LogProducer])

def start(self) -> None:

Expand All @@ -187,38 +230,43 @@ def start(self) -> None:
factory = Factory.forProtocol(Protocol)
self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor())
self._service.startService()
self._connect()

def _write_loop(self) -> None:
def stop(self):
    """Stop the underlying ClientService, tearing down the TCP connection."""
    self._service.stopService()

def _connect(self) -> None:
    """
    Triggers an attempt to connect then write to the remote if not already writing.
    """
    # Only one connection attempt may be in flight at a time.
    if self._connection_waiter:
        return

    self._connection_waiter = self._service.whenConnected(failAfterFailures=1)

    @self._connection_waiter.addErrback
    def fail(failure):
        # The attempt failed: report it, clear the in-flight marker and try
        # again (ClientService applies its own retry backoff between
        # connection attempts).
        failure.printTraceback(file=sys.__stderr__)
        self._connection_waiter = None
        self._connect()

    @self._connection_waiter.addCallback
    def writer(result):
        # Clear the in-flight marker up front so that EVERY exit path below
        # allows a later _connect() call to attempt a fresh connection.
        # Previously the early-return branch left it set, which permanently
        # blocked reconnection after a resume on an existing transport.
        self._connection_waiter = None

        # We have a connection. If we already have a producer, and its
        # transport is the same, just trigger a resumeProducing.
        if self._producer and result.transport is self._producer.transport:
            self._producer.resumeProducing()
            return

        # If the producer is still producing (on an old transport), stop it.
        if self._producer:
            self._producer.stopProducing()

        # Make a new producer for the fresh transport and start it.
        self._producer = LogProducer(buffer=self._buffer, transport=result.transport)
        result.transport.registerProducer(self._producer, True)
        self._producer.resumeProducing()

def _handle_pressure(self) -> None:
"""
Expand Down Expand Up @@ -277,4 +325,4 @@ def __call__(self, event: dict) -> None:
self._logger.failure("Failed clearing backpressure")

# Try and write immediately.
self._write_loop()
self._connect()
2 changes: 2 additions & 0 deletions tests/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ class FakeTransport(object):

disconnecting = False
disconnected = False
connected = True
buffer = attr.ib(default=b"")
producer = attr.ib(default=None)
autoflush = attr.ib(default=True)
Expand All @@ -402,6 +403,7 @@ def loseConnection(self, reason=None):
"FakeTransport: Delaying disconnect until buffer is flushed"
)
else:
self.connected = False
self.disconnected = True

def abortConnection(self):
Expand Down