Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix replication metrics when using redis #7325

Merged
merged 4 commits into from
Apr 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7325.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for running replication over Redis when using workers.
52 changes: 16 additions & 36 deletions synapse/replication/tcp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,7 @@
import fcntl
import logging
import struct
from collections import defaultdict
from typing import TYPE_CHECKING, DefaultDict, List

from six import iteritems
from typing import TYPE_CHECKING, List

from prometheus_client import Counter

Expand Down Expand Up @@ -86,6 +83,18 @@
"synapse_replication_tcp_protocol_close_reason", "", ["reason_type"]
)

tcp_inbound_commands_counter = Counter(
"synapse_replication_tcp_protocol_inbound_commands",
"Number of commands received from replication, by command and name of process connected to",
["command", "name"],
)

tcp_outbound_commands_counter = Counter(
"synapse_replication_tcp_protocol_outbound_commands",
"Number of commands sent to replication, by command and name of process connected to",
["command", "name"],
)

# A list of all connected protocols. This allows us to send metrics about the
# connections.
connected_connections = []
Expand Down Expand Up @@ -151,9 +160,6 @@ def __init__(self, clock: Clock, handler: "ReplicationCommandHandler"):
# The LoopingCall for sending pings.
self._send_ping_loop = None

self.inbound_commands_counter = defaultdict(int) # type: DefaultDict[str, int]
self.outbound_commands_counter = defaultdict(int) # type: DefaultDict[str, int]

def connectionMade(self):
logger.info("[%s] Connection established", self.id())

Expand Down Expand Up @@ -224,9 +230,7 @@ def lineReceived(self, line: bytes):

self.last_received_command = self.clock.time_msec()

self.inbound_commands_counter[cmd.NAME] = (
self.inbound_commands_counter[cmd.NAME] + 1
)
tcp_inbound_commands_counter.labels(cmd.NAME, self.name).inc()

# Now lets try and call on_<CMD_NAME> function
run_as_background_process(
Expand Down Expand Up @@ -292,9 +296,8 @@ def send_command(self, cmd, do_buffer=True):
self._queue_command(cmd)
return

self.outbound_commands_counter[cmd.NAME] = (
self.outbound_commands_counter[cmd.NAME] + 1
)
tcp_outbound_commands_counter.labels(cmd.NAME, self.name).inc()

string = "%s %s" % (cmd.NAME, cmd.to_line())
if "\n" in string:
raise Exception("Unexpected newline in command: %r", string)
Expand Down Expand Up @@ -546,26 +549,3 @@ def transport_kernel_read_buffer_size(protocol, read=True):
for p in connected_connections
},
)


tcp_inbound_commands = LaterGauge(
"synapse_replication_tcp_protocol_inbound_commands",
"",
["command", "name"],
lambda: {
(k, p.name): count
for p in connected_connections
for k, count in iteritems(p.inbound_commands_counter)
},
)

tcp_outbound_commands = LaterGauge(
"synapse_replication_tcp_protocol_outbound_commands",
"",
["command", "name"],
lambda: {
(k, p.name): count
for p in connected_connections
for k, count in iteritems(p.outbound_commands_counter)
},
)
14 changes: 13 additions & 1 deletion synapse/replication/tcp/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@
ReplicateCommand,
parse_command_from_line,
)
from synapse.replication.tcp.protocol import AbstractConnection
from synapse.replication.tcp.protocol import (
AbstractConnection,
tcp_inbound_commands_counter,
tcp_outbound_commands_counter,
)

if TYPE_CHECKING:
from synapse.replication.tcp.handler import ReplicationCommandHandler
Expand Down Expand Up @@ -79,6 +83,10 @@ def messageReceived(self, pattern: str, channel: str, message: str):
)
return

# We use "redis" as the name here as we don't have 1:1 connections to
# remote instances.
tcp_inbound_commands_counter.labels(cmd.NAME, "redis").inc()

# Now lets try and call on_<CMD_NAME> function
run_as_background_process(
"replication-" + cmd.get_logcontext_id(), self.handle_command, cmd
Expand Down Expand Up @@ -126,6 +134,10 @@ def send_command(self, cmd: Command):

encoded_string = string.encode("utf-8")

# We use "redis" as the name here as we don't have 1:1 connections to
# remote instances.
tcp_outbound_commands_counter.labels(cmd.NAME, "redis").inc()

async def _send():
with PreserveLoggingContext():
# Note that we use the other connection as we can't send
Expand Down