delta_crdt appears to lose data when running on more than one node. #65

Open

bokner opened this issue May 31, 2022 · 10 comments

bokner commented May 31, 2022

The following module simulates a scenario in which several concurrent processes write to the CRDT at a constant rate. When the script runs on 2 or more nodes with the interval between writes close to sync_interval, some keys are often lost.

defmodule DeltaCrdt.Test do
  @crdt_test :crdt_test

  def init(sync_interval \\ 200) do
    {:ok, crdt} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, sync_interval: sync_interval, name: @crdt_test)

    :net_adm.world()
    |> Enum.reject(fn node -> node == Node.self() end)
    |> set_neighbours(crdt)
  end

  def run(opts \\ []) do
    write_to_crdt(
      Keyword.get(opts, :processes, 10),
      Keyword.get(opts, :requests_per_process, 1000),
      Keyword.get(opts, :interval, 200)
      )
  end

  def crdt_length() do
    length(Map.keys(DeltaCrdt.to_map(:crdt_test)))
  end

  def stop() do
    Process.exit(:erlang.whereis(:crdt_test), :normal)
  end

  defp set_neighbours(nodes, crdt) do
    DeltaCrdt.set_neighbours(crdt,
      Enum.map(nodes, fn node -> {@crdt_test, node} end)
    )
  end

  ## `process_num` processes each write `requests_per_process` times to the CRDT, with `interval` milliseconds between writes
  defp write_to_crdt(process_num, requests_per_process, interval) do
    Enum.each(1..process_num, fn _process_id ->
      Task.async(fn -> write(requests_per_process, interval) end)
    end)
  end

  defp write(requests_per_process, interval) do
    Enum.each(1..requests_per_process, fn _request_id ->
      DeltaCrdt.put(@crdt_test, :erlang.make_ref(), DateTime.to_unix(DateTime.utc_now(), :millisecond))
      :timer.sleep(interval)
    end)
  end
end

Steps to reproduce the issue (you might need to run several times):

  1. Open 2 IEx sessions in separate terminals:
iex --sname node1 --cookie delta_crdt -S mix
iex --sname node2 --cookie delta_crdt -S mix
  2. In both IEx sessions:
import DeltaCrdt.Test
## This will initialize the CRDT on the node with `sync_interval = 200` and then run 10 concurrent processes,
## each writing 1000 times, with 100 ms between writes.
init(200); run(interval: 100)
  3. Wait a couple of minutes and then check the number of records in the CRDT:
crdt_length

The expected value is 20000 (that is, 10 * 1000 * 2), but you will likely see a smaller number, which indicates that some records were lost.

You will likely get 20000 when running with a much larger interval, e.g.

run(interval: 500)

but it gets worse with 3 or more nodes; even a much larger interval does not prevent an occasional loss of records.

Note: If you want to run multiple tests within the same IEx session, you can kill the current CRDT process by running:

stop
derekkraan (Owner) commented:

Hi @bokner, I tried running this with 3 nodes a few times and got 30k keys in the CRDT. Also tried with 2 nodes and got 20k. Am I missing something?


bokner commented Jun 22, 2022

Thanks @derekkraan, I'm still able to consistently reproduce it.

I changed the code and steps to reproduce a bit to make it easier to run:

defmodule DeltaCrdt.Test do
  @crdt_test :crdt_test

  require Logger
  def init(sync_interval \\ 200) do
    {:ok, crdt} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, sync_interval: sync_interval, name: @crdt_test)

    :net_adm.world()
    |> Enum.reject(fn node -> node == Node.self() end)
    |> set_neighbours(crdt)
  end

  def run(opts \\ []) do
    init(Keyword.get(opts, :sync_interval, 200))
    write_to_crdt(
      Keyword.get(opts, :processes, 10),
      Keyword.get(opts, :requests_per_process, 1000),
      Keyword.get(opts, :write_interval, 200)
      )
    Logger.info("Records: #{crdt_length()}")
  end

  def crdt_length() do
    length(Map.keys(DeltaCrdt.to_map(:crdt_test)))
  end

  def stop() do
    Process.exit(:erlang.whereis(:crdt_test), :normal)
  end

  defp set_neighbours(nodes, crdt) do
    DeltaCrdt.set_neighbours(crdt,
      Enum.map(nodes, fn node -> {@crdt_test, node} end)
    )
  end

  ## `process_num` processes each write `requests_per_process` times to the CRDT, with `write_interval` milliseconds between writes
  defp write_to_crdt(process_num, requests_per_process, write_interval) do
    1..process_num
    |> Task.async_stream(fn _i -> write(requests_per_process, write_interval) end,
      max_concurrency: process_num,
      timeout: :infinity
    )
    |> Enum.to_list()
  end

  defp write(requests_per_process, write_interval) do
    Enum.each(1..requests_per_process, fn _request_id ->
      DeltaCrdt.put(@crdt_test, :erlang.make_ref(), DateTime.to_unix(DateTime.utc_now(), :millisecond))
      :timer.sleep(write_interval)
    end)
  end
end

Steps to reproduce the issue (you might need to run several times):

  1. Open 2 IEx sessions in separate terminals:
iex --sname node1 --cookie delta_crdt -S mix
iex --sname node2 --cookie delta_crdt -S mix
  2. In both IEx sessions, within a small interval between the calls:
## This will initialize the CRDT on the node with `sync_interval = 200` and then run 10 concurrent processes,
## each writing 1000 times, with 100 ms between writes.
DeltaCrdt.Test.run(write_interval: 100, sync_interval: 200)

Note: this is now a blocking call, so it will take some time depending on the value of write_interval and how many requests each process makes (the defaults are 10 processes and 1000 requests per process).

  3. After the simulation is done on each node, it will output the number of records in the CRDT.
    Wait until all nodes finish, then call:
DeltaCrdt.Test.crdt_length

on each node.


bokner commented Jun 22, 2022

I was wondering if the Erlang/Elixir version and/or VM args could make a difference.
Mine are:

Erlang/OTP 24 [erts-12.0.3] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [jit] [dtrace]
Interactive Elixir (1.13.1) - press Ctrl+C to exit (type h() ENTER for help)

derekkraan (Owner) commented:

Hi, thanks again. I was able to reproduce it this time. I think I know what is happening as well.

In the CRDT, we maintain a list of "dots", basically tracking who has seen what. This is simplified to just track the last seen dot. However, we are also sending partial diffs. So this means that sometimes we might send an update saying "I know of all updates up until X, and that doesn't include key Y". The receiving process thinks "OK, he knows I added Y, he has also seen all my updates including the addition of Y, and he is saying that it's not there anymore, so it must have been removed", and deletes Y.

Not sure yet about the solution. I will probably have to stew on this one.
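
To make that failure mode concrete, here is a toy model of the bookkeeping described above. The DotModel module and its field names are purely illustrative assumptions, not delta_crdt's actual internals; they only show how a truncated diff combined with a full causal context can look like a removal.

# Toy model of the scenario above -- NOT the real delta_crdt implementation.
# State is %{entries: %{key => dot}, seen: highest_dot_seen}, where a "dot"
# is the per-node counter of the operation that created the entry.
defmodule DotModel do
  # Anything the remote claims to have seen (dot <= remote.seen) but does not
  # list among its entries is treated as removed locally.
  def merge(local, remote) do
    surviving =
      local.entries
      |> Enum.reject(fn {key, dot} ->
        dot <= remote.seen and not Map.has_key?(remote.entries, key)
      end)
      |> Map.new()

    %{entries: Map.merge(surviving, remote.entries), seen: max(local.seen, remote.seen)}
  end
end

# Node A added :y as its 3rd operation.
node_a = %{entries: %{y: 3}, seen: 3}

# Node B's diff was truncated, so :y never arrived, but the diff still claims
# "I have seen everything up to dot 3".
partial_diff_from_b = %{entries: %{}, seen: 3}

DotModel.merge(node_a, partial_diff_from_b)
#=> %{entries: %{}, seen: 3} -- :y is deleted, even though B never received it.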


bokner commented Jun 23, 2022

Thanks @derekkraan !
You can count on me to test all the changes that might fix this issue 😄

hubertlepicki commented:

@derekkraan could this affect Horde.Registry in a way that multiple nodes could register processes under the same key and the conflict would occasionally go undetected?

derekkraan (Owner) commented:

@hubertlepicki I don't believe so. The CRDT still converges, but some data might be missing. If you think it is happening, though, it might be worth investigating. I don't have a solution to this issue in mind yet.


bokner commented Aug 21, 2022

I've done a lot of testing with a sufficiently large max_sync_size and/or max_sync_size set to :infinite.
Not once have I lost any data. I have a hunch that the data loss is due to the truncation of diffs in the merkle_map code.
If I read the code correctly, the truncation happens when the size of the diff exceeds max_sync_size.
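
For anyone who wants to try the same thing with the test module above, a minimal sketch of starting the CRDT with truncation effectively disabled (this assumes max_sync_size is accepted as a DeltaCrdt.start_link option, as used in the testing described here):

{:ok, crdt} =
  DeltaCrdt.start_link(DeltaCrdt.AWLWWMap,
    name: :crdt_test,
    sync_interval: 200,
    # assumption: max_sync_size controls the diff truncation discussed above
    max_sync_size: :infinite
  )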

derekkraan (Owner) commented:

@bokner this is also the direction I was thinking in. I am still not sure how to mitigate it; perhaps "just send everything every time" is an option. Another possibility is to send only the "dots" that relate to the keys being sent in an update. I am not sure how to calculate that, though, and the fix for this issue has unfortunately been moved to the back burner.

derekkraan (Owner) commented:

The issue is: we send a partial update, but we send all "dots" (which are like a record of what we have seen) with that update. Then we get the dots back from the remote node, but without any keys, leading the first node to conclude that the second node has removed the keys, when in fact the keys were never transmitted. So the solution is likely to make sure that we send only the dots that are relevant to the update being sent. I am not sure how computationally intensive that would be; I haven't given it a shot yet.
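
A rough sketch of that idea, with hypothetical function and field names (this is not delta_crdt's actual internal structure): when building a partial sync message, ship only the dots referenced by the keys included in that message, rather than the node's full causal context.

defmodule PartialSync do
  # entries_to_send: %{key => MapSet of dots supporting that key}
  # full_context:    MapSet of all dots this node has seen
  def relevant_dots(entries_to_send, full_context) do
    referenced =
      entries_to_send
      |> Map.values()
      |> Enum.reduce(MapSet.new(), &MapSet.union(&2, &1))

    MapSet.intersection(full_context, referenced)
  end
end

# Only the dots backing :x and :y travel with a diff carrying those two keys;
# dot {:a, 3}, which supports some key not in the diff, stays out of the message.
PartialSync.relevant_dots(
  %{x: MapSet.new([{:a, 1}]), y: MapSet.new([{:a, 2}])},
  MapSet.new([{:a, 1}, {:a, 2}, {:a, 3}])
)
#=> MapSet.new([{:a, 1}, {:a, 2}])

How expensive this is would depend on how the dots are indexed per key, which is the open question mentioned above.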
