From 8a40c3d3e0e282837d1946297f8be6d1dbdab614 Mon Sep 17 00:00:00 2001 From: "Amber H. Brown" Date: Tue, 5 Nov 2019 05:22:42 +1100 Subject: [PATCH 1/7] bring over logging improvements --- synapse/logging/_structured.py | 9 +++- synapse/logging/_terse_json.py | 75 +++++++++++++++++++++++++--------- tests/server.py | 2 + 3 files changed, 66 insertions(+), 20 deletions(-) diff --git a/synapse/logging/_structured.py b/synapse/logging/_structured.py index 334ddaf39a56..55c0adbb7a78 100644 --- a/synapse/logging/_structured.py +++ b/synapse/logging/_structured.py @@ -261,6 +261,13 @@ def parse_drain_configs( ) +class StoppableLogPublisher(LogPublisher): + def stop(self): + for obs in self._observers: + if hasattr(obs, "stop"): + obs.stop() + + def setup_structured_logging( hs, config, @@ -336,7 +343,7 @@ def setup_structured_logging( # We should never get here, but, just in case, throw an error. raise ConfigError("%s drain type cannot be configured" % (observer.type,)) - publisher = LogPublisher(*observers) + publisher = StoppableLogPublisher(*observers) log_filter = LogLevelFilterPredicate() for namespace, namespace_config in log_config.get( diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py index 0ebbde06f217..b8e8ea500350 100644 --- a/synapse/logging/_terse_json.py +++ b/synapse/logging/_terse_json.py @@ -17,6 +17,7 @@ Log formatters that output terse JSON. """ +import json import sys from collections import deque from ipaddress import IPv4Address, IPv6Address, ip_address @@ -24,7 +25,6 @@ from typing import IO import attr -from simplejson import dumps from zope.interface import implementer from twisted.application.internet import ClientService @@ -33,9 +33,11 @@ TCP4ClientEndpoint, TCP6ClientEndpoint, ) +from twisted.internet.interfaces import IPushProducer from twisted.internet.protocol import Factory, Protocol from twisted.logger import FileLogObserver, ILogObserver, Logger -from twisted.python.failure import Failure + +_encoder = json.JSONEncoder(ensure_ascii=False, separators=(",", ":")) def flatten_event(event: dict, metadata: dict, include_time: bool = False): @@ -141,11 +143,40 @@ def TerseJSONToConsoleLogObserver(outFile: IO[str], metadata: dict) -> FileLogOb def formatEvent(_event: dict) -> str: flattened = flatten_event(_event, metadata) - return dumps(flattened, ensure_ascii=False, separators=(",", ":")) + "\n" + return _encoder.encode(flattened) + "\n" return FileLogObserver(outFile, formatEvent) +@attr.s +@implementer(IPushProducer) +class LogProducer(object): + + _buffer = attr.ib() + _transport = attr.ib() + paused = attr.ib(default=False) + + def pauseProducing(self): + self.paused = True + + def stopProducing(self): + self.paused = True + self._buffer = None + + def resumeProducing(self): + self.paused = False + + while self.paused is False and (self._buffer and self._transport.connected): + try: + event = self._buffer.popleft() + self._transport.write(_encoder.encode(event).encode("utf8")) + self._transport.write(b"\n") + except Exception: + import traceback + + traceback.print_exc(file=sys.__stderr__) + + @attr.s @implementer(ILogObserver) class TerseJSONToTCPLogObserver(object): @@ -167,6 +198,7 @@ class TerseJSONToTCPLogObserver(object): _buffer = attr.ib(default=attr.Factory(deque), type=deque) _writer = attr.ib(default=None) _logger = attr.ib(default=attr.Factory(Logger)) + _producer = attr.ib(default=None) def start(self) -> None: @@ -188,6 +220,9 @@ def start(self) -> None: self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor()) self._service.startService() + def stop(self): + self._service.stopService() + def _write_loop(self) -> None: """ Implement the write loop. @@ -195,27 +230,29 @@ def _write_loop(self) -> None: if self._writer: return + if self._producer and self._producer._transport.connected: + self._producer.resumeProducing() + return + self._writer = self._service.whenConnected() - @self._writer.addBoth + @self._writer.addErrback + def fail(r): + r.printTraceback(file=sys.__stderr__) + self._writer = None + self.hs.get_reactor().callLater(1, self._write_loop) + return + + @self._writer.addCallback def writer(r): - if isinstance(r, Failure): - r.printTraceback(file=sys.__stderr__) - self._writer = None + def connectionLost(_self, reason): + self._producer.pauseProducing() + self._producer = None self.hs.get_reactor().callLater(1, self._write_loop) - return - try: - for event in self._buffer: - r.transport.write( - dumps(event, ensure_ascii=False, separators=(",", ":")).encode( - "utf8" - ) - ) - r.transport.write(b"\n") - self._buffer.clear() - except Exception as e: - sys.__stderr__.write("Failed writing out logs with %s\n" % (str(e),)) + self._producer = LogProducer(self._buffer, r.transport) + r.transport.registerProducer(self._producer, True) + self._producer.resumeProducing() self._writer = False self.hs.get_reactor().callLater(1, self._write_loop) diff --git a/tests/server.py b/tests/server.py index f878aeaada35..2b7cf4242e4c 100644 --- a/tests/server.py +++ b/tests/server.py @@ -379,6 +379,7 @@ class FakeTransport(object): disconnecting = False disconnected = False + connected = True buffer = attr.ib(default=b"") producer = attr.ib(default=None) autoflush = attr.ib(default=True) @@ -402,6 +403,7 @@ def loseConnection(self, reason=None): "FakeTransport: Delaying disconnect until buffer is flushed" ) else: + self.connected = False self.disconnected = True def abortConnection(self): From aa48af2196817c66a648b64a994fa3fed9dec4cd Mon Sep 17 00:00:00 2001 From: "Amber H. Brown" Date: Tue, 5 Nov 2019 05:40:13 +1100 Subject: [PATCH 2/7] more cleanups --- synapse/logging/_structured.py | 5 ++++ synapse/logging/_terse_json.py | 44 +++++++++++++++++++--------------- 2 files changed, 30 insertions(+), 19 deletions(-) diff --git a/synapse/logging/_structured.py b/synapse/logging/_structured.py index 55c0adbb7a78..ffa7b20ca863 100644 --- a/synapse/logging/_structured.py +++ b/synapse/logging/_structured.py @@ -262,6 +262,11 @@ def parse_drain_configs( class StoppableLogPublisher(LogPublisher): + """ + A log publisher that can tell its observers to shut down any external + communications. + """ + def stop(self): for obs in self._observers: if hasattr(obs, "stop"): diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py index b8e8ea500350..878bfe2f938d 100644 --- a/synapse/logging/_terse_json.py +++ b/synapse/logging/_terse_json.py @@ -19,21 +19,23 @@ import json import sys +import traceback from collections import deque from ipaddress import IPv4Address, IPv6Address, ip_address from math import floor -from typing import IO +from typing import IO, Optional import attr from zope.interface import implementer from twisted.application.internet import ClientService +from twisted.internet import Deferred from twisted.internet.endpoints import ( HostnameEndpoint, TCP4ClientEndpoint, TCP6ClientEndpoint, ) -from twisted.internet.interfaces import IPushProducer +from twisted.internet.interfaces import IPushProducer, ITransport from twisted.internet.protocol import Factory, Protocol from twisted.logger import FileLogObserver, ILogObserver, Logger @@ -151,30 +153,39 @@ def formatEvent(_event: dict) -> str: @attr.s @implementer(IPushProducer) class LogProducer(object): + """ + An IPushProducer that writes logs from its buffer to its transport when it + is resumed. + + Args: + buffer: Log buffer to read logs from. + transport: Transport to write to. + """ - _buffer = attr.ib() - _transport = attr.ib() - paused = attr.ib(default=False) + _buffer = attr.ib(type=deque) + _transport = attr.ib(type=ITransport) + _paused = attr.ib(default=False, type=bool, init=False) def pauseProducing(self): - self.paused = True + self._paused = True def stopProducing(self): - self.paused = True + self._paused = True self._buffer = None def resumeProducing(self): - self.paused = False + self._paused = False - while self.paused is False and (self._buffer and self._transport.connected): + while self._paused is False and (self._buffer and self._transport.connected): try: event = self._buffer.popleft() self._transport.write(_encoder.encode(event).encode("utf8")) self._transport.write(b"\n") except Exception: - import traceback - + # Something has gone wrong writing to the transport -- log it + # and break out of the while. traceback.print_exc(file=sys.__stderr__) + break @attr.s @@ -196,9 +207,9 @@ class TerseJSONToTCPLogObserver(object): metadata = attr.ib(type=dict) maximum_buffer = attr.ib(type=int) _buffer = attr.ib(default=attr.Factory(deque), type=deque) - _writer = attr.ib(default=None) + _writer = attr.ib(default=None, type=Optional[Deferred]) _logger = attr.ib(default=attr.Factory(Logger)) - _producer = attr.ib(default=None) + _producer = attr.ib(default=None, type=Optional[LogProducer]) def start(self) -> None: @@ -245,16 +256,11 @@ def fail(r): @self._writer.addCallback def writer(r): - def connectionLost(_self, reason): - self._producer.pauseProducing() - self._producer = None - self.hs.get_reactor().callLater(1, self._write_loop) - self._producer = LogProducer(self._buffer, r.transport) r.transport.registerProducer(self._producer, True) self._producer.resumeProducing() - self._writer = False + self._writer = None self.hs.get_reactor().callLater(1, self._write_loop) def _handle_pressure(self) -> None: From cfd06a2d968effa87b8f4fd4a61c1de2110d08d8 Mon Sep 17 00:00:00 2001 From: "Amber H. Brown" Date: Tue, 5 Nov 2019 05:40:43 +1100 Subject: [PATCH 3/7] changelog --- changelog.d/6322.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/6322.misc diff --git a/changelog.d/6322.misc b/changelog.d/6322.misc new file mode 100644 index 000000000000..70ef36ca806b --- /dev/null +++ b/changelog.d/6322.misc @@ -0,0 +1 @@ +Improve the performance of outputting structured logging. From aa34532ecdc66accbe8bb566f1f806721624fcdc Mon Sep 17 00:00:00 2001 From: "Amber H. Brown" Date: Tue, 5 Nov 2019 10:02:03 +1100 Subject: [PATCH 4/7] fix --- synapse/logging/_terse_json.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py index 878bfe2f938d..0bcb4bcd5f6d 100644 --- a/synapse/logging/_terse_json.py +++ b/synapse/logging/_terse_json.py @@ -29,7 +29,7 @@ from zope.interface import implementer from twisted.application.internet import ClientService -from twisted.internet import Deferred +from twisted.internet.defer import Deferred from twisted.internet.endpoints import ( HostnameEndpoint, TCP4ClientEndpoint, From 3a8ad2c86d238aacb92c2158fb46185ce530feb6 Mon Sep 17 00:00:00 2001 From: "Amber H. Brown" Date: Fri, 15 Nov 2019 02:27:08 +1100 Subject: [PATCH 5/7] remove the requirement for a loop --- synapse/logging/_terse_json.py | 50 +++++++++++++++++++--------------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py index 6164b3283368..3a1975ee39c1 100644 --- a/synapse/logging/_terse_json.py +++ b/synapse/logging/_terse_json.py @@ -37,6 +37,7 @@ ) from twisted.internet.interfaces import IPushProducer, ITransport from twisted.internet.protocol import Factory, Protocol +from twisted.internet.task import LoopingCall from twisted.logger import FileLogObserver, ILogObserver, Logger _encoder = json.JSONEncoder(ensure_ascii=False, separators=(",", ":")) @@ -162,8 +163,8 @@ class LogProducer(object): transport: Transport to write to. """ + transport = attr.ib(type=ITransport) _buffer = attr.ib(type=deque) - _transport = attr.ib(type=ITransport) _paused = attr.ib(default=False, type=bool, init=False) def pauseProducing(self): @@ -176,11 +177,11 @@ def stopProducing(self): def resumeProducing(self): self._paused = False - while self._paused is False and (self._buffer and self._transport.connected): + while self._paused is False and (self._buffer and self.transport.connected): try: event = self._buffer.popleft() - self._transport.write(_encoder.encode(event).encode("utf8")) - self._transport.write(b"\n") + self.transport.write(_encoder.encode(event).encode("utf8")) + self.transport.write(b"\n") except Exception: # Something has gone wrong writing to the transport -- log it # and break out of the while. @@ -207,7 +208,7 @@ class TerseJSONToTCPLogObserver(object): metadata = attr.ib(type=dict) maximum_buffer = attr.ib(type=int) _buffer = attr.ib(default=attr.Factory(deque), type=deque) - _writer = attr.ib(default=None, type=Optional[Deferred]) + _connection_waiter = attr.ib(default=None, type=Optional[Deferred]) _logger = attr.ib(default=attr.Factory(Logger)) _producer = attr.ib(default=None, type=Optional[LogProducer]) @@ -230,38 +231,43 @@ def start(self) -> None: factory = Factory.forProtocol(Protocol) self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor()) self._service.startService() + self._connect() def stop(self): self._service.stopService() - def _write_loop(self) -> None: + def _connect(self) -> None: """ Implement the write loop. """ - if self._writer: + if self._connection_waiter: return - if self._producer and self._producer._transport.connected: - self._producer.resumeProducing() - return - - self._writer = self._service.whenConnected() + self._connection_waiter = self._service.whenConnected(failAfterFailures=1) - @self._writer.addErrback + @self._connection_waiter.addErrback def fail(r): r.printTraceback(file=sys.__stderr__) - self._writer = None - self.hs.get_reactor().callLater(1, self._write_loop) - return + self._connection_waiter = None + self._connect() - @self._writer.addCallback + @self._connection_waiter.addCallback def writer(r): - self._producer = LogProducer(self._buffer, r.transport) + # We have a connection. If we already have a producer, and its + # transport is the same, just trigger a resumeProducing. + if self._producer and r.transport is self._producer.transport: + self._producer.resumeProducing() + return + + # If the producer is still producing, stop it. + if self._producer: + self._producer.stopProducing() + + # Make a new producer and start it. + self._producer = LogProducer(buffer=self._buffer, transport=r.transport) r.transport.registerProducer(self._producer, True) self._producer.resumeProducing() - - self._writer = None - self.hs.get_reactor().callLater(1, self._write_loop) + self._connection_waiter = None def _handle_pressure(self) -> None: """ @@ -320,4 +326,4 @@ def __call__(self, event: dict) -> None: self._logger.failure("Failed clearing backpressure") # Try and write immediately. - self._write_loop() + self._connect() From 465ad02a53eccfffaf469b7ca17803b45b9d8da8 Mon Sep 17 00:00:00 2001 From: "Amber H. Brown" Date: Fri, 15 Nov 2019 02:46:29 +1100 Subject: [PATCH 6/7] remove the requirement for a loop --- synapse/logging/_terse_json.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py index 3a1975ee39c1..4e7edaeab31d 100644 --- a/synapse/logging/_terse_json.py +++ b/synapse/logging/_terse_json.py @@ -37,7 +37,6 @@ ) from twisted.internet.interfaces import IPushProducer, ITransport from twisted.internet.protocol import Factory, Protocol -from twisted.internet.task import LoopingCall from twisted.logger import FileLogObserver, ILogObserver, Logger _encoder = json.JSONEncoder(ensure_ascii=False, separators=(",", ":")) From a6bb7bbd34974573fee55eeabdecb98ccc45e908 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Tue, 26 Nov 2019 03:03:15 +1100 Subject: [PATCH 7/7] Update synapse/logging/_terse_json.py Co-Authored-By: Erik Johnston --- synapse/logging/_terse_json.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py index 4e7edaeab31d..05fc64f409f3 100644 --- a/synapse/logging/_terse_json.py +++ b/synapse/logging/_terse_json.py @@ -237,7 +237,7 @@ def stop(self): def _connect(self) -> None: """ - Implement the write loop. + Triggers an attempt to connect then write to the remote if not already writing. """ if self._connection_waiter: return