Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix replication metrics when using redis #7325

Merged
merged 4 commits into from
Apr 22, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7325.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for running replication over Redis when using workers.
48 changes: 12 additions & 36 deletions synapse/replication/tcp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,7 @@
import fcntl
import logging
import struct
from collections import defaultdict
from typing import TYPE_CHECKING, DefaultDict, List

from six import iteritems
from typing import TYPE_CHECKING, List

from prometheus_client import Counter

Expand Down Expand Up @@ -86,6 +83,14 @@
"synapse_replication_tcp_protocol_close_reason", "", ["reason_type"]
)

tcp_inbound_commands = Counter(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't mind if these had a name which reflected that they were counters, now that they're becoming part of the public interface to this module. In other words: can you rename them to tcp_*_commands_counter?

(I'll also remind you that the second field is meant to be a description, but happy if you want to leave that for now)

"synapse_replication_tcp_protocol_inbound_commands", "", ["command", "name"],
)

tcp_outbound_commands = Counter(
"synapse_replication_tcp_protocol_outbound_commands", "", ["command", "name"],
)

# A list of all connected protocols. This allows us to send metrics about the
# connections.
connected_connections = []
Expand Down Expand Up @@ -151,9 +156,6 @@ def __init__(self, clock: Clock, handler: "ReplicationCommandHandler"):
# The LoopingCall for sending pings.
self._send_ping_loop = None

self.inbound_commands_counter = defaultdict(int) # type: DefaultDict[str, int]
self.outbound_commands_counter = defaultdict(int) # type: DefaultDict[str, int]

def connectionMade(self):
logger.info("[%s] Connection established", self.id())

Expand Down Expand Up @@ -224,9 +226,7 @@ def lineReceived(self, line: bytes):

self.last_received_command = self.clock.time_msec()

self.inbound_commands_counter[cmd.NAME] = (
self.inbound_commands_counter[cmd.NAME] + 1
)
tcp_inbound_commands.labels(cmd.NAME, self.name).inc()

# Now let's try to call the on_<CMD_NAME> function
run_as_background_process(
Expand Down Expand Up @@ -292,9 +292,8 @@ def send_command(self, cmd, do_buffer=True):
self._queue_command(cmd)
return

self.outbound_commands_counter[cmd.NAME] = (
self.outbound_commands_counter[cmd.NAME] + 1
)
tcp_outbound_commands.labels(cmd.NAME, self.name).inc()

string = "%s %s" % (cmd.NAME, cmd.to_line())
if "\n" in string:
raise Exception("Unexpected newline in command: %r", string)
Expand Down Expand Up @@ -546,26 +545,3 @@ def transport_kernel_read_buffer_size(protocol, read=True):
for p in connected_connections
},
)


tcp_inbound_commands = LaterGauge(
"synapse_replication_tcp_protocol_inbound_commands",
"",
["command", "name"],
lambda: {
(k, p.name): count
for p in connected_connections
for k, count in iteritems(p.inbound_commands_counter)
},
)

tcp_outbound_commands = LaterGauge(
"synapse_replication_tcp_protocol_outbound_commands",
"",
["command", "name"],
lambda: {
(k, p.name): count
for p in connected_connections
for k, count in iteritems(p.outbound_commands_counter)
},
)
14 changes: 13 additions & 1 deletion synapse/replication/tcp/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@
ReplicateCommand,
parse_command_from_line,
)
from synapse.replication.tcp.protocol import AbstractConnection
from synapse.replication.tcp.protocol import (
AbstractConnection,
tcp_inbound_commands,
tcp_outbound_commands,
)

if TYPE_CHECKING:
from synapse.replication.tcp.handler import ReplicationCommandHandler
Expand Down Expand Up @@ -79,6 +83,10 @@ def messageReceived(self, pattern: str, channel: str, message: str):
)
return

# We use "redis" as the name here as we don't have 1:1 connections to
# remote instances.
tcp_inbound_commands.labels(cmd.NAME, "redis").inc()

# Now let's try to call the on_<CMD_NAME> function
run_as_background_process(
"replication-" + cmd.get_logcontext_id(), self.handle_command, cmd
Expand Down Expand Up @@ -126,6 +134,10 @@ def send_command(self, cmd: Command):

encoded_string = string.encode("utf-8")

# We use "redis" as the name here as we don't have 1:1 connections to
# remote instances.
tcp_outbound_commands.labels(cmd.NAME, "redis").inc()

async def _send():
with PreserveLoggingContext():
# Note that we use the other connection as we can't send
Expand Down