fast multi-log sync #2

Open
pgte opened this issue Mar 15, 2018 · 23 comments

@pgte

pgte commented Mar 15, 2018

The setting

Even though peer-crdt is transport agnostic, it makes some assumptions about the transport that it's using underneath. Here are some of these assumptions:

  • Each CRDT has an independent append-only operation log.
  • In a CRDT, each local operation generates a data block that is stored on this append-only log.
  • An operation is content-addressable. When stored, an operation is identified by a unique string (from here on we'll call it Content ID, or CID for short).
  • Operations are back-linked to represent causal dependency. An operation that's appended to the log points to the operation (or operations) that preceded it.
  • When concurrent operations are detected, a special merge log entry containing no data is created, pointing to the concurrent operations as its parents.
  • At any given point in time, each replica has a HEAD CID, which is the CID of the latest log entry.
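
The assumptions above can be sketched as a small in-memory log. This is only an illustration: the `Log` class and `make_entry` helper are hypothetical names, and a SHA-256 hex digest of a JSON encoding stands in for a real CID.

```python
import hashlib
import json


def make_entry(payload, parents):
    """Build a log entry and derive its identifier from its content."""
    entry = {"payload": payload, "parents": sorted(parents)}
    encoded = json.dumps(entry, sort_keys=True).encode("utf-8")
    # Assumption: a SHA-256 hex digest stands in for a real CID.
    return hashlib.sha256(encoded).hexdigest(), entry


class Log:
    """Append-only log of back-linked, content-addressed entries."""

    def __init__(self):
        self.entries = {}  # CID -> entry
        self.head = None   # CID of the latest entry

    def append(self, payload, parents=None):
        if parents is None:
            parents = [self.head] if self.head else []
        cid, entry = make_entry(payload, parents)
        self.entries[cid] = entry
        self.head = cid
        return cid


log = Log()
o1 = log.append({"op": "add", "value": "a"})
o2 = log.append({"op": "add", "value": "b"})
# o3 is concurrent with o2: both point back to o1.
o3 = log.append({"op": "add", "value": "c"}, parents=[o1])
# A merge entry carries no data and points to both concurrent operations.
merge = log.append(None, parents=[o2, o3])
```

After the merge entry is appended, `log.head` is the merge entry's identifier, and walking `parents` from it reaches every operation in the log.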

The transport (peer-crdt-ipfs, for instance) is responsible for:

  • creating new log entries locally
  • broadcasting the local HEAD CID to other participating replicas
  • making log entries available for replication by request

The problem

Even though it's very IPFS-y, this structure poses some performance challenges, namely in replication:

To replicate a set of log entries, the receiving replica has to pull them one by one. For each log entry received, it has to inspect the entry and get its parent CIDs. For each parent, it has to check whether it exists locally. If it doesn't, it has to fetch it from the IPFS network. And so on, recursively, until all the missing log entries are replicated. This means that it has to do a series of sequential lookups to sync the local log with a remote log. This is very inefficient, because each lookup has to wait for the previous one to complete.

(One optimisation is in place: alongside the HEAD CID, a replica can also broadcast the parent content IDs, up to a count of MAX_PARENTS. This allows a replica to make a bunch of fetch requests in parallel. But if the count of missing log entries is greater than MAX_PARENTS, we're back to the very inefficient fetch-and-resolve-all-parents loop as before.)
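
To make the cost concrete, here is a rough sketch of the pull loop described above, without the MAX_PARENTS optimisation. The `pull_sync` function is hypothetical, a dictionary lookup stands in for a network fetch, and the returned number of rounds is only a proxy for sequential round trips.

```python
def pull_sync(remote, local, head_cid):
    """Walk parent links from the remote HEAD, fetching what is missing locally.

    Returns the number of fetch rounds. Each round can only start once the
    previous one has returned, because parent CIDs are only known after
    their child has been fetched and inspected.
    """
    rounds = 0
    frontier = [head_cid]
    while frontier:
        # De-duplicate and keep only the entries we do not have yet.
        missing = [cid for cid in dict.fromkeys(frontier) if cid not in local]
        if not missing:
            break
        rounds += 1
        next_frontier = []
        for cid in missing:
            entry = remote[cid]  # stands in for a network fetch
            local[cid] = entry
            next_frontier.extend(entry["parents"])
        frontier = next_frontier
    return rounds


# A linear remote log of five entries: e1 <- e2 <- e3 <- e4 <- e5.
remote = {}
prev = []
for i in range(1, 6):
    remote[f"e{i}"] = {"parents": prev}
    prev = [f"e{i}"]

local = {}
rounds = pull_sync(remote, local, "e5")
```

With an empty local log, the five-entry chain takes five dependent rounds, one per entry, which is the behaviour the push-based proposal below tries to avoid.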

Efficient replication requires knowing the remote replica state

I recently realised that this is inefficient because a replica doesn't know the replication state of another replica. If it did, it could send all the missing log entries in one burst.

Also, this happens because of content-addressing. A CID is made by hashing the block content, and so doesn't tell us anything about the replication state of the remote replica.

We need a solution that allows a replica to know exactly which log entries a remote replica needs, so that it can send them immediately and in one burst.

One solution

Vector clocks. (If you are not familiar with VCs, this is a good explanation).

Here I propose a solution where replicas keep track of the remote replica state, and push changes instead of expecting the remote to fetch them.

Here is, broadly, the main algorithm I propose:

  • Every time a replica creates an operation, it increments the vector clock for its replica id.
  • A replica frequently broadcasts the current vector clock to other replicas.
  • Each replica keeps track of the replication state of other remote replicas by keeping their HEAD vector clock in memory. This replica state is updated in these events:
    • every time a replica receives the remote HEAD vector clock it updates the replica state.
    • every time a replica sends log entries to a remote replica, it updates the replica state.
  • A replica does its best to keep remote replicas in sync. When it detects that a remote vector clock diverged (which happens when receiving a remote VC update or when local changes happen), it starts the sync protocol, which is simply:
    • is the local state ahead of, or divergent from, the remote replica state?
    • if so, send the missing log entries in one burst
    • and update the remote replica vector clock
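
The steps above can be sketched roughly as follows. This is a minimal in-memory illustration, not the peer-crdt implementation: the `Replica` class and its method names are hypothetical, log entries are identified by a `(replica id, sequence number)` pair, and delivery is assumed to be in order per replica.

```python
class Replica:
    def __init__(self, replica_id):
        self.id = replica_id
        self.clock = {}          # local HEAD vector clock
        self.log = []            # entries as (replica_id, seq, payload)
        self.remote_clocks = {}  # peer id -> last known vector clock

    def create_operation(self, payload):
        # Creating an operation increments this replica's own counter.
        self.clock[self.id] = self.clock.get(self.id, 0) + 1
        self.log.append((self.id, self.clock[self.id], payload))

    def on_remote_clock(self, peer_id, clock):
        # Called when a peer broadcasts its HEAD vector clock.
        self.remote_clocks[peer_id] = dict(clock)

    def entries_to_push(self, peer_id):
        known = self.remote_clocks.get(peer_id, {})
        return [e for e in self.log if e[1] > known.get(e[0], 0)]

    def push(self, peer):
        """Send everything the peer is believed to be missing, in one burst."""
        batch = self.entries_to_push(peer.id)
        if batch:
            peer.receive(batch)
            # The sender updates its view of the remote state after sending.
            known = self.remote_clocks.setdefault(peer.id, {})
            for rid, seq, _ in batch:
                known[rid] = max(known.get(rid, 0), seq)
        return len(batch)

    def receive(self, batch):
        for rid, seq, payload in batch:
            if seq > self.clock.get(rid, 0):  # ignore entries already held
                self.log.append((rid, seq, payload))
                self.clock[rid] = seq


a, b = Replica("A"), Replica("B")
for value in ("x", "y", "z"):
    a.create_operation(value)
a.on_remote_clock("B", b.clock)  # B announces an empty clock
sent = a.push(b)                 # A pushes all three entries at once
```

A second call to `a.push(b)` sends nothing, because A already recorded that B holds everything up to `{A: 3}`.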

Illustrated

A picture is worth a thousand words. Let's then illustrate how this could work with two replicas.

One writer, one reader

Replica A creates 3 operations, O1, O2 and O3.
Replica B does not create any operation.

Our goal here is simply to keep replica B in sync with the changes created by replica A.

untitled drawing-28

Replica A starts by creating each log entry. Each log entry has a vector clock. For the last log entry, O3, the vector clock is {A: 3}.

Now replica B comes online. Since it has no entries, its vector clock is empty. It broadcasts its VC to other listening nodes. Replica A receives that message, and now it knows about replica B's state.

fast-multilog-swap

Replica A realises that replica B's vector clock is smaller than the local vector clock. By doing some simple vector clock math, it realises that replica B is missing 3 log entries. It fetches those entries and sends them (O1, O2 and O3) to replica B in one burst, together with each log entry's corresponding vector clock.
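
The "simple vector clock math" can be sketched as a per-replica subtraction of counters. The `missing_ranges` function is a hypothetical helper, and it assumes each replica numbers its own operations 1, 2, 3, and so on.

```python
def missing_ranges(local_clock, remote_clock):
    """For each replica id, list the sequence numbers the remote has not seen."""
    missing = {}
    for replica_id, local_seq in local_clock.items():
        remote_seq = remote_clock.get(replica_id, 0)
        if local_seq > remote_seq:
            missing[replica_id] = list(range(remote_seq + 1, local_seq + 1))
    return missing


# Replica A is at {A: 3}; replica B has just announced an empty clock.
print(missing_ranges({"A": 3}, {}))  # {'A': [1, 2, 3]}
```

The three sequence numbers correspond to O1, O2 and O3, which is the batch replica A sends.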

fast-multilog-swap-2

Now that replica B has all the missing log entries, both replicas have converged:

fast-multilog-swap-3

1 more writer

Now replica B makes some changes to the CRDT, creating 2 new operations (O4 and O5). It also updates the vector clock for each operation:

fast-multilog-swap-5

Now replica B's head is {A:3, B:2}. Because replica B knows that the replication state of replica A is {A:3}, it realizes that replica A's vector clock is smaller than the current local vector clock. Because of that, replica B immediately realizes it needs to send replica A some entries to update its state.

And so replica B calculates the difference between its own vector clock and replica A's vector clock. Because of this, it realizes that replica A is missing 2 log entries with the following vector clocks: {A:3, B:1} and {A:3, B:2}, which correspond to operations O4 and O5. Replica B sends these entries to replica A:

fast-multilog-swap-6

Now, replica A has the missing operations, and is able to integrate them into its log:

fast-multilog-swap-7

The two replicas are in sync.

Two concurrent writers

Now let's see how our system can handle two concurrent writes, one done by each replica.

Let's continue with our example replicas:

Replica A creates a new operation and appends it to the local log. Since the VC for the previous head was {A:3, B:2}, the VC for the new operation (which we're calling O6) is going to be {A:4, B:2}.

At the same time, replica B also creates a new operation and appends it to the local log. Since the VC for the previous head was {A:3, B:2}, the VC for this new operation (which we're calling O7) is going to be {A:3, B:3}.

fast-multilog-swap-12

Now both replicas have different head vector clocks. Replica A's head VC is {A:4, B:2} and replica B's head VC is {A:3, B:3}.
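
One way to detect that two heads are concurrent, rather than one simply being behind the other, is an entry-wise comparison of the clocks. The `compare` function below is a hypothetical sketch; missing entries are treated as 0.

```python
def compare(a, b):
    """Compare two vector clocks given as dicts of replica id -> counter."""
    keys = set(a) | set(b)
    a_ahead = any(a.get(k, 0) > b.get(k, 0) for k in keys)
    b_ahead = any(b.get(k, 0) > a.get(k, 0) for k in keys)
    if a_ahead and b_ahead:
        return "concurrent"  # each side has seen something the other has not
    if a_ahead:
        return "after"
    if b_ahead:
        return "before"
    return "equal"


head_a = {"A": 4, "B": 2}  # after O6
head_b = {"A": 3, "B": 3}  # after O7
print(compare(head_a, head_b))  # concurrent
```

For the earlier one-writer case, `compare({"A": 3}, {"A": 3, "B": 2})` returns `"before"`, since replica A was only lagging behind.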

Let's see what happens in each replica once each new operation is created.

Replica A:

Replica A has a head VC of {A:4, B:2} and knows replica B has a VC of {A:3, B:2}, which means it knows B is lagging behind. By doing some vector clock math it knows that replica B is missing one operation that corresponds to the VC {A:4, B:2}. It then fetches that operation from the operation log and sends it to replica B.

fast-multilog-swap-10

Replica B:

When it creates operation O7 (which has a vector clock of {A:3, B:3}) replica B knows that replica A has a VC of {A:3, B:2}, and is lagging behind. By doing some vector clock math it knows that replica A is missing one operation that corresponds to the VC {A:3, B:3}. It then fetches that operation from the operation log and sends it to replica A.

fast-multilog-swap-11


Now, each replica has all the operations in its log:

fast-multilog-swap-13

Convergence

But now neither replica has a single defined HEAD operation. Since there was a divergence, each replica will have to create a merge log entry pointing to both parents. The vector clock for each of the merge entries will be the same: it will be the result of merging both vector clocks, taking the larger counter for each replica:

vc1 := {A:4, B:2}
vc2 := {A:3, B:3}
merged := merge(vc1, vc2) // {A:4, B:3}

The result of the merging operation is {A:4, B:3}, and that will be the vector clock for both merge log entries:

fast-multilog-swap-14
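
A runnable version of the merge step might look like this. The `merge` function is a sketch that takes the larger counter for each replica id; the name matches the pseudocode above but the implementation is an assumption.

```python
def merge(vc1, vc2):
    """Merge two vector clocks by taking the entry-wise maximum."""
    return {k: max(vc1.get(k, 0), vc2.get(k, 0)) for k in set(vc1) | set(vc2)}


vc1 = {"A": 4, "B": 2}
vc2 = {"A": 3, "B": 3}
merged = merge(vc1, vc2)
print(merged == {"A": 4, "B": 3})  # True
```

Because the maximum does not depend on argument order, both replicas compute the same clock for their merge entries regardless of which operation they saw first.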

Now both replicas A and B are definitely in sync.

in sync

@pgte

pgte commented Mar 15, 2018

@diasdavid @vmx This solution is more specific than IPLD graph-swap, since it's tailored for the multi-log data structure, but I think we need to introduce vector clocks to keep track of remote state. What do you think?

@pgte

pgte commented Mar 16, 2018

Just watched @dominictarr's talk about scalable secure scuttlebutt (video here; thanks @olizilla!) and there are some ideas that have parallels here:

@pgte

pgte commented Mar 16, 2018

Eager and lazy connections:

Turning the redundant connections from eager into lazy is a great idea, which I think we can apply here. I was worried about redundant transmission of operations when there are more than 2 replicas. I think we could solve it this way, making intelligent use of vector clocks. (A vector clock advertises the information that I know about. If a replica needs new operations from a different replica, and that specific replica is not in the network, it can explicitly ask another replica for them instead of getting those operations pushed.)

@pgte

pgte commented Mar 16, 2018

Optimizing vector clock transmission

Instead of broadcasting the entire vector clock to every peer, send only the vector clock differences to each peer individually.
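
As a rough sketch of what sending only the differences could look like: the sender remembers the last clock it sent to each peer and transmits only the counters that have grown since. Both helper names are hypothetical.

```python
def clock_delta(current, last_sent):
    """Entries of `current` that have advanced since `last_sent`."""
    return {
        rid: seq
        for rid, seq in current.items()
        if seq > last_sent.get(rid, 0)
    }


def apply_delta(known, delta):
    """Receiver side: fold a delta into the last known clock for a peer."""
    updated = dict(known)
    for rid, seq in delta.items():
        updated[rid] = max(updated.get(rid, 0), seq)
    return updated


last_sent = {"A": 3, "B": 2}
current = {"A": 4, "B": 2}
delta = clock_delta(current, last_sent)  # only A's counter changed
```

Here `delta` is `{"A": 4}`, and applying it to the peer's previously known clock reconstructs the full `{A: 4, B: 2}`.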

@dominictarr

@pgte I've been trying to encourage ipfs peeps to use this protocol for a while! I'm glad you've seen this! Let's collaborate on a spec we can use in both ssb and ipfs! I am beginning to roll out support for this into ssb, but my aim is to make an implementation that is sufficiently flexible for other protocols to use. https://github.com/dominictarr/epidemic-broadcast-trees

I'd expect that ipfs will want to use different encoding, but I figure if we can separate the wire protocol, we can use a shared implementation of the behaviour. (our reference implementation is javascript, but we also have a go implementation underway or coming soon)

@daviddias

Hi @dominictarr o/ , good to see you around! :)

Pinging @vyzo who has been working on epidemic broadcast trees as well

@vyzo

vyzo commented Mar 19, 2018

I should point out that epidemic broadcast trees behave very poorly with multiple/random sources.

@dominictarr

oh, I should probably make clear: in ssb we use ebt in the "protocol instance per source" mode. since you are also talking about replicating logs, I think we are in the same situation.

@pgte

pgte commented Mar 20, 2018

@dominictarr thanks so much for reaching out!
This looks like it's just what is needed here, thank you for abstracting this into its own package!
I'll take a look at the code and see how this can be plugged.

I wouldn't worry too much about the encoding as we're going to use multiplexed p2p switch connections which expose a raw stream, so we can use whatever.

I'll be taking a look at the code and getting more familiar with EBT and trying to integrate them here. Sounds good?

@dominictarr

@pgte am happy to answer any questions - just post issues on the epidemic-broadcast-trees module, I guess.

@vyzo

vyzo commented Mar 21, 2018

You should also check out our progress with gossipsub:
libp2p/go-libp2p-pubsub#67

We want to have a mode for single/fixed source epidemic optimizations in the next iteration.

@dominictarr

@vyzo is there a description of the design that is higher level than the protocol? like what performance properties is it expected to have? GRAFT and PRUNE seem a bit like EBT but it's hard to tell what's intended

@vyzo

vyzo commented Mar 23, 2018

The intention is to build a mesh of a bounded degree; basically mold floodsub to something that doesn't blow up from amplification as you build larger overlays.
Wrt performance, there are some sample simulations in https://github.com/vyzo/gerbil-simsub; I have run a lot more, and the samples are representative.

The protocol could be turned into EBT by sending GRAFT/PRUNE in response to gossip/late messages, although some care must be exercised for stability. It might be more appropriate to use a CHOKE message for suppressing flow from late messages without severing the link.

@dominictarr

@vyzo by degree you mean the number of connections? as in https://en.wikipedia.org/wiki/Degree_(graph_theory)

I still don't understand what GRAFT and PRUNE do. are they messages a peer sends to another peer?
do they have an argument (such as what source they are about?)

@vyzo

vyzo commented Mar 26, 2018

The number of active connections used by the overlay to propagate messages; it's totally the node degree in the graph-theoretic sense.

GRAFT and PRUNE are sent between peers to forge the overlay. They have the topic as "argument"; there is no notion of single source at the protocol level.

@dominictarr

@vyzo is it possible to put gossip sub into a "lazy mode" where it sends a message that you have received an item, but without sending the whole thing - and is that associated with the subscription channel somehow?

@dominictarr

hmm, but I'm guessing that "IHAVE" messages are just hashes (as in the style of the ipfs replication protocol)

@vyzo

vyzo commented Mar 26, 2018

You want to just send IHAVE messages in place of forwarding the actual messages? It is possible, but it's not implemented that way.
What do you have in mind?

@vyzo

vyzo commented Mar 26, 2018

The IHAVE messages contain seen message ids in the last 3 seconds, they are not hashes.

@dominictarr

@vyzo what is a message id? if it's not a hash is it channel_id+sequence or channel_id+timestamp? (in my ebt, channel id + sequence are used for replication, but in all other situations ssb uses hash as id)

@vyzo

vyzo commented Mar 27, 2018

The message id is constructed using the sender ID together with an atomic counter.

@pgte

pgte commented Apr 2, 2018

@vyzo this is a bit obscure to me.. is there a place where this gossip-sub protocol is described in a more abstract / reusable way?

@vyzo

vyzo commented Apr 2, 2018

@pgte there is https://github.com/vyzo/gerbil-simsub; it looks like it's time to make a spec pr in libp2p-specs though.
