
Replace HTTP replication with TCP replication #2069

Closed · wants to merge 11 commits from erikj/repl_tcp

Conversation

@erikjohnston (Member) commented on Mar 27, 2017

Apologies for the size of this PR; I hope, however, that each individual commit will be manageable and reviewable. A large portion of this PR should also be documentation of what's going on, as opposed to code.

In short, this PR replaces the HTTP long-poll based replication with a custom TCP protocol. The reason this isn't completely insane is that a TCP protocol better fits what we're trying to do:

  • everything is fire and forget, nothing is request/response, and nothing requires reliability (other than the streams themselves, which have built-in reliability due to the tokens)
  • the master wants to be able to poke each worker at the same time when it has updates to send, but currently the master has to wait for the workers to request the data, making it much harder to avoid duplicating work across all workers

The major things that we need to be careful of when creating a TCP protocol (see the sketch after this list):

  • Ensuring we quickly detect when the other side has gone away; this is done via periodic keep-alives
  • Handling upstream congestion: if the remote can't keep up, Twisted will buffer bytes in memory. This can be handled by ensuring that we knife the connection if the buffers become too large.
  • Making TCP reconnects relatively seamless; in particular, it's important that the client can reconnect quickly to the server and fetch any updates to streams it missed. (Slight) back-off and retries are important here.
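Here is that sketch: a minimal illustration of the keep-alive and congestion handling using Twisted's LineOnlyReceiver and push-producer APIs. The class name, constants and queueing details are illustrative assumptions, not the PR's actual protocol.py:

    import time

    from twisted.internet import task
    from twisted.protocols.basic import LineOnlyReceiver

    PING_INTERVAL = 5            # seconds between keep-alives (illustrative)
    PING_TIMEOUT = 30            # give up on a silent peer after this long
    MAX_QUEUED_COMMANDS = 10000  # knife the connection beyond this backlog

    class ReplicationProtocolSketch(LineOnlyReceiver):
        """Keep-alives plus a congestion guard, sketched with Twisted."""

        delimiter = b"\n"

        def connectionMade(self):
            self.last_received = time.time()
            self.paused = False
            self.queued = []
            # Register as a streaming (push) producer so Twisted tells us
            # when its outbound buffers fill up.
            self.transport.registerProducer(self, True)
            self.pinger = task.LoopingCall(self.check_liveness)
            self.pinger.start(PING_INTERVAL)

        def lineReceived(self, line):
            self.last_received = time.time()
            # ... dispatch PING / NAME / REPLICATE / RDATA etc. here ...

        def check_liveness(self):
            now = time.time()
            if now - self.last_received > PING_TIMEOUT:
                # The peer has gone away; don't wait for TCP to notice.
                self.transport.abortConnection()
            else:
                self.send_command(b"PING %d" % int(now * 1000))

        # IPushProducer callbacks: Twisted invokes these as its send
        # buffer fills up and drains again.
        def pauseProducing(self):
            self.paused = True

        def resumeProducing(self):
            self.paused = False
            queued, self.queued = self.queued, []
            for line in queued:
                self.send_command(line)

        def stopProducing(self):
            pass

        def send_command(self, line):
            if self.paused:
                # The remote can't keep up; queue, but only up to a point.
                self.queued.append(line)
                if len(self.queued) > MAX_QUEUED_COMMANDS:
                    self.transport.abortConnection()
            else:
                self.sendLine(line)

        def connectionLost(self, reason):
            if self.pinger.running:
                self.pinger.stop()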

We also use a different philosophy for what we send and how we send it. The HTTP API was designed to replicate entire "db rows" so that a new database could theoretically be constructed from the various streams; it was also self-describing, i.e. we sent the column names for each stream.
Conversely, the TCP protocol aims to send only the data that is needed, which is usually just enough to know which caches to invalidate. The format of the updates is also not self-describing: they are simply rows defined in synapse (as synapse only talks to itself, there is little reason to make the updates self-describing).
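For example, the caches row in the exchange below carries just a cache function name, the keys to invalidate and a timestamp; a worker could apply it roughly like this (a hypothetical helper, not the actual worker code):

    def process_caches_row(store, row):
        # Row layout as in the example exchange:
        # ["get_user_by_id", ["@01register-user:localhost:8823"], 1490197670513]
        cache_func_name, keys, invalidation_ts_ms = row
        getattr(store, cache_func_name).invalidate(tuple(keys))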

An example exchange:

    * connection established *
    > SERVER localhost:8823
    > PING 1490197665618
    < NAME synapse.app.appservice
    < PING 1490197665618
    < REPLICATE events 1
    < REPLICATE caches 1
    > POSITION events 1
    > POSITION caches 1
    > RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
    > RDATA events 14 ["$149019767112vOHxz:localhost:8823", "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]
    < PING 1490197675618
    > ERROR server stopping
    * connection closed by server *

(where the arrows indicate the direction of the data being sent, and are not part of the protocol)

A full description of the protocol can be found in synapse/replication/tcp/__init__.py, protocol.py and commands.py.
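To give a flavour of the line format those files describe, here is a stripped-down sketch of an RDATA command class (illustrative only; the real classes live in commands.py):

    import json

    class RdataCommand(object):
        """Sketch of a line-based command: "RDATA <stream> <token> <json row>"."""

        NAME = "RDATA"

        def __init__(self, stream_name, token, row):
            self.stream_name = stream_name
            self.token = token
            self.row = row

        @classmethod
        def from_line(cls, line):
            # `line` is everything after the command name on the wire.
            stream_name, token, row_json = line.split(" ", 2)
            return cls(stream_name, int(token), json.loads(row_json))

        def to_line(self):
            return "%s %s %s" % (self.stream_name, self.token, json.dumps(self.row))

    # e.g. parsing the caches line from the exchange above:
    cmd = RdataCommand.from_line(
        'caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]'
    )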

Commit messages from the branch:

  • The new replication protocol will keep all the streams separate, rather than muxing multiple streams into one.
  • This defines the low level TCP replication protocol.
  • The TCP replication protocol streams deltas of who has started or stopped syncing. This is different from the HTTP API, which periodically sends the full list of users who are syncing. This commit adds support for the new TCP style of sending deltas (illustrated below).
  • As the TCP replication uses a slightly different API and streams than the HTTP replication, this breaks HTTP replication.
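
As an illustration of those deltas on the wire, a fabricated snippet (the exact USER_SYNC layout is an assumption here; the authoritative format is defined in commands.py):

    < USER_SYNC @alice:localhost:8823 start 1490197665618
    < USER_SYNC @alice:localhost:8823 end 1490197670618
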
@richvdh (Member) left a comment:

this is too big a change for me to hold properly in my head, but generally it seems like a sane path.

"""This module implements the TCP replication protocol used by synapse to
communicate between the master process and its workers (when they're enabled).

The protocol is based on fire and forget, line based commands. An example flow
richvdh (Member):

I highly recommend moving the protocol docs out to separate rst files in docs. It'll be easier to find, and easier to read, there.


# Congestion

If the server sends messaegs faster than the client can consume them the server
richvdh (Member):

messaegs


If the server sends messaegs faster than the client can consume them the server
will first buffer a (fairly large) number of commands and then disconnect the
client. This ensure that we don't queue up an unbounded number of commands in
richvdh (Member):

ensures

since e.g. commands are not resent if the connection disappears.

The exception to that are the replication streams, i.e. RDATA commands, since
theses include tokens which can be used to restart the stream on connection
richvdh (Member):

theses

> POSITION caches 1
> RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
> RDATA events 14 ["$149019767112vOHxz:localhost:8823",
"!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]
richvdh (Member):

could you fabricate an example of a batched result? I'm still slightly unclear what they would look like at this point.
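
For illustration, a batched result could look something like this (a fabricated example, assuming a special batch token so that only the final row carries the real stream token):

    > RDATA caches batch ["get_user_by_id",["@test:localhost:8823"],1490197670513]
    > RDATA caches batch ["get_user_by_id",["@test2:localhost:8823"],1490197670513]
    > RDATA caches 54 ["get_user_by_id",["@test3:localhost:8823"],1490197670513]

Under that assumption, the client would not advance its caches position until it sees the final row.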

self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)

@defer.inlineCallbacks
def subscripe_to_stream(self, stream_name, token):
richvdh (Member):

subscripe

))
process_presence.add(user_id)
elif user_id in process_presence:
updates.append(prev_state.copy_and_replace(
richvdh (Member):

this looks incorrect for the case where is_syncing==False ?

erikjohnston (Member, Author):

No, as we don't immediately set a user's presence to offline if they "stopped" syncing, as they'll probably just come back. We instead set the last_user_sync_ts and time them out later.

OTOH, we might want to not send "stopped syncing" notifs from the synchrotron each time a /sync call finishes
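
A rough sketch of that timeout sweep (SYNC_TIMEOUT_MS, the state layout and the "offline" constant are hypothetical; only copy_and_replace and last_user_sync_ts appear in the diff):

    SYNC_TIMEOUT_MS = 30 * 1000  # hypothetical grace period

    def sweep_stopped_syncers(prev_states, currently_syncing, now_ms):
        """Mark users offline only once they have been quiet long enough."""
        updates = []
        for user_id, prev_state in prev_states.items():
            if user_id in currently_syncing:
                continue  # still syncing; leave them alone
            if now_ms - prev_state.last_user_sync_ts > SYNC_TIMEOUT_MS:
                updates.append(prev_state.copy_and_replace(state="offline"))
        return updates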

richvdh (Member):

shouldn't we be removing user_id from process_presence, though?

erikjohnston (Member, Author):

Oh, I got confused with the other comment about state = <offline> :/ Yes, we should remove it; that's done in the other PR.


yield self._update_states([
prev_state.copy_and_replace(
last_user_sync_ts=time_now_ms,
richvdh (Member):

do we not need to set state = <offline> here?

def get_currently_syncing_users(self):
"""Get the list of currently syncing users (if any). This is called
when a connection has been established and we need to send the
currently syncing users. (Overriden by the synchrotron's only)
richvdh (Member):

s/'//

)
self.get_account_data_for_user.invalidate((user_id,))
def process_replication_rows(self, stream_name, token, rows):
if stream_name == "tag_account_data":
richvdh (Member):

why is all this changing around?

erikjohnston (Member, Author):

What in particular? The rename of process_replication to process_replication_rows? Or the fact that I think I have split the account_data stream into two?

richvdh (Member):

we seem to have started with user_account_data, room_account_data, and tag_account_data (in that order), and have ended up with tag_account_data and account_data.

I guess account_data is a combination of user_account_data and room_account_data; but I'm also confused by the reordering.

This change might make sense, but it's the sort of thing I'd prefer to see happening independently of the change of protocol.

erikjohnston (Member, Author):

This was done as part of making it such that you only get the stream you asked for, as all three would come down if you asked for "account_data". I appreciate it would have been better to do it separately.

@erikjohnston (Member, Author) commented:

> this is too big a change for me to hold properly in my head, but generally it seems like a sane path.

Suggestions on how to make this better are welcome; given I had all the code, I didn't know the best way of splitting it up.

Would it perhaps make sense to split this so that we can land only the server-side portion of it? That may allow us to actually try it live, and would make it easier to review. I honestly don't know, though.

@erikjohnston (Member, Author) commented:

Closed in favour of #2082, which is just the server-side component.

@hawkowl deleted the erikj/repl_tcp branch on September 20, 2018.