diff --git a/lib/delta_crdt/causal_crdt.ex b/lib/delta_crdt/causal_crdt.ex index ca5cd07..2c0ed4d 100644 --- a/lib/delta_crdt/causal_crdt.ex +++ b/lib/delta_crdt/causal_crdt.ex @@ -5,6 +5,8 @@ defmodule DeltaCrdt.CausalCrdt do require BenchmarkHelper + alias MerkleMap.MerkleTree + BenchmarkHelper.inject_in_dev() @type delta :: {k :: integer(), delta :: any()} @@ -12,13 +14,15 @@ defmodule DeltaCrdt.CausalCrdt do @moduledoc false + @attempts 3 + defstruct node_id: nil, name: nil, on_diffs: nil, storage_module: nil, crdt_module: nil, crdt_state: nil, - merkle_map: MerkleMap.new(), + merkle_tree: MerkleTree.new(), sequence_number: 0, neighbours: MapSet.new(), neighbour_monitors: %{}, @@ -88,40 +92,83 @@ defmodule DeltaCrdt.CausalCrdt do {:noreply, new_state} end - def handle_info({:diff, diff}, state) do + def handle_info({:retry_diff, n, diff}, state) do diff = reverse_diff(diff) - new_merkle_map = MerkleMap.update_hashes(state.merkle_map) - - case MerkleMap.continue_partial_diff(diff.continuation, new_merkle_map, 8) do - {:continue, continuation} -> - %Diff{diff | continuation: truncate(continuation, state.max_sync_size)} - |> send_diff_continue() + new_merkle_tree = MerkleTree.update_hashes(state.merkle_tree) + + sent? = + case MerkleTree.continue_partial_diff(new_merkle_tree, diff.continuation, 8) do + {:continue, continuation} -> + %Diff{diff | continuation: truncate(continuation, state.max_sync_size)} + |> send_diff_continue() + + {:ok, []} -> + # If remote is busy sending ack, do nothing. It's not a big deal as + # the remote outstanding_syncs will keep a dangling entry which will + # be overridden at next sync interval. + ack_diff(diff) + true + + {:ok, keys} -> + case send_diff(diff, truncate(keys, state.max_sync_size), state) do + true -> + # Same as above. + ack_diff(diff) + true + + _ -> + false + end + end - {:ok, []} -> - ack_diff(diff) + case sent? do + true -> + {:noreply, Map.put(state, :merkle_tree, new_merkle_tree)} + + false -> + if n + 1 <= @attempts do + Process.send_after( + self(), + {:retry_diff, n + 1, diff}, + div(state.sync_interval, @attempts) + ) + end - {:ok, keys} -> - send_diff(diff, truncate(keys, state.max_sync_size), state) - ack_diff(diff) + {:noreply, state} end - - {:noreply, Map.put(state, :merkle_map, new_merkle_map)} end - def handle_info({:get_diff, diff, keys}, state) do + def handle_info({:diff, diff}, state), do: handle_info({:retry_diff, 0, diff}, state) + + def handle_info({:retry_get_diff, n, diff, keys}, state) do diff = reverse_diff(diff) - send( - diff.to, - {:diff, - %{state.crdt_state | dots: diff.dots, value: Map.take(state.crdt_state.value, keys)}, keys} - ) + case send_nosuspend( + diff.to, + {:diff, + %{state.crdt_state | dots: diff.dots, value: Map.take(state.crdt_state.value, keys)}, + keys} + ) do + true -> + ack_diff(diff) + + false -> + if n + 1 <= @attempts do + Process.send_after( + self(), + {:retry_get_diff, n + 1, diff, keys}, + div(state.sync_interval, @attempts) + ) + end + end - ack_diff(diff) {:noreply, state} end + def handle_info({:get_diff, diff, keys}, state), + do: handle_info({:retry_get_diff, 0, diff, keys}, state) + def handle_info({:EXIT, _pid, :normal}, state), do: {:noreply, state} def handle_info({:DOWN, ref, :process, _object, _reason}, state) do @@ -156,7 +203,7 @@ defmodule DeltaCrdt.CausalCrdt do end new_outstanding_syncs = - Enum.filter(state.outstanding_syncs, fn {neighbour, 1} -> + Enum.filter(state.outstanding_syncs, fn {neighbour, _} -> MapSet.member?(new_neighbours, neighbour) end) |> Map.new() @@ -210,7 +257,7 @@ defmodule DeltaCrdt.CausalCrdt do end defp truncate(diff, size) when is_integer(size) do - MerkleMap.truncate_diff(diff, size) + MerkleTree.truncate_diff(diff, size) end defp read_from_storage(%{storage_module: nil} = state) do @@ -222,10 +269,10 @@ defmodule DeltaCrdt.CausalCrdt do nil -> state - {node_id, sequence_number, crdt_state, merkle_map} -> + {node_id, sequence_number, crdt_state, merkle_tree} -> Map.put(state, :sequence_number, sequence_number) |> Map.put(:crdt_state, crdt_state) - |> Map.put(:merkle_map, merkle_map) + |> Map.put(:merkle_tree, merkle_tree) |> Map.put(:node_id, node_id) |> remove_crdt_state_keys() end @@ -243,7 +290,7 @@ defmodule DeltaCrdt.CausalCrdt do :ok = state.storage_module.write( state.name, - {state.node_id, state.sequence_number, state.crdt_state, state.merkle_map} + {state.node_id, state.sequence_number, state.crdt_state, state.merkle_tree} ) state @@ -251,8 +298,8 @@ defmodule DeltaCrdt.CausalCrdt do defp sync_interval_or_state_to_all(state) do state = monitor_neighbours(state) - new_merkle_map = MerkleMap.update_hashes(state.merkle_map) - {:continue, continuation} = MerkleMap.prepare_partial_diff(new_merkle_map, 8) + new_merkle_tree = MerkleTree.update_hashes(state.merkle_tree) + {:continue, continuation} = MerkleTree.prepare_partial_diff(new_merkle_tree, 8) diff = %Diff{ continuation: continuation, @@ -267,8 +314,13 @@ defmodule DeltaCrdt.CausalCrdt do |> Enum.reduce(state.outstanding_syncs, fn neighbour, outstanding_syncs -> Map.put_new_lazy(outstanding_syncs, neighbour, fn -> try do - send(neighbour, {:diff, %Diff{diff | to: neighbour}}) - 1 + if send_nosuspend(neighbour, {:diff, %Diff{diff | to: neighbour}}) do + 1 + else + # This happens when we attempt to sync with a neighbour that is slow. + Logger.debug("tried to sync with a slow neighbour: #{inspect(neighbour)}, move on") + 0 + end rescue _ in ArgumentError -> # This happens when we attempt to sync with a neighbour that is dead. @@ -285,7 +337,7 @@ defmodule DeltaCrdt.CausalCrdt do |> Map.new() Map.put(state, :outstanding_syncs, new_outstanding_syncs) - |> Map.put(:merkle_map, new_merkle_map) + |> Map.put(:merkle_tree, new_merkle_tree) end defp monitor_neighbours(state) do @@ -318,14 +370,14 @@ defmodule DeltaCrdt.CausalCrdt do end defp send_diff_continue(diff) do - send(diff.to, {:diff, diff}) + send_nosuspend(diff.to, {:diff, diff}) end defp send_diff(diff, keys, state) do if diff.originator == diff.to do - send(diff.to, {:get_diff, diff, keys}) + send_nosuspend(diff.to, {:get_diff, diff, keys}) else - send( + send_nosuspend( diff.to, {:diff, %{state.crdt_state | dots: diff.dots, value: Map.take(state.crdt_state.value, keys)}, @@ -387,10 +439,10 @@ defmodule DeltaCrdt.CausalCrdt do diffs = diff(state, new_state, keys) - {new_merkle_map, count} = - Enum.reduce(diffs, {state.merkle_map, 0}, fn - {:add, key, value}, {mm, count} -> {MerkleMap.put(mm, key, value), count + 1} - {:remove, key}, {mm, count} -> {MerkleMap.delete(mm, key), count + 1} + {new_merkle_tree, count} = + Enum.reduce(diffs, {state.merkle_tree, 0}, fn + {:add, key, value}, {tree, count} -> {MerkleTree.put(tree, key, value), count + 1} + {:remove, key}, {tree, count} -> {MerkleTree.delete(tree, key), count + 1} end) :telemetry.execute([:delta_crdt, :sync, :done], %{keys_updated_count: count}, %{ @@ -399,15 +451,22 @@ defmodule DeltaCrdt.CausalCrdt do diffs_to_callback(state, new_state, diffs_keys(diffs)) - Map.put(new_state, :merkle_map, new_merkle_map) + Map.put(new_state, :merkle_tree, new_merkle_tree) |> write_to_storage() end defp ack_diff(%{originator: originator, from: originator, to: to}) do - send(originator, {:ack_diff, to}) + send_nosuspend(originator, {:ack_diff, to}) end defp ack_diff(%{originator: originator, from: from, to: originator}) do - send(originator, {:ack_diff, from}) + send_nosuspend(originator, {:ack_diff, from}) + end + + defp send_nosuspend(dest, msg) do + case Process.send(dest, msg, [:nosuspend]) do + :ok -> true + :nosuspend -> false + end end end diff --git a/mix.exs b/mix.exs index 0bceb69..81f8579 100644 --- a/mix.exs +++ b/mix.exs @@ -28,7 +28,7 @@ defmodule DeltaCrdt.MixProject do {:benchee_html, ">= 0.0.0", only: :dev, runtime: false}, {:exprof, "~> 0.2.0", only: :dev, runtime: false}, {:ex_doc, ">= 0.0.0", only: :dev, runtime: false}, - {:merkle_map, "~> 0.2.0"}, + {:merkle_map, "~> 0.2.1"}, {:stream_data, "~> 0.4", only: :test} ] end diff --git a/mix.lock b/mix.lock index 99deff3..fcf3bcc 100644 --- a/mix.lock +++ b/mix.lock @@ -1,19 +1,19 @@ %{ - "benchee": {:hex, :benchee, "1.0.1", "66b211f9bfd84bd97e6d1beaddf8fc2312aaabe192f776e8931cb0c16f53a521", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}], "hexpm"}, - "benchee_html": {:hex, :benchee_html, "1.0.0", "5b4d24effebd060f466fb460ec06576e7b34a00fc26b234fe4f12c4f05c95947", [:mix], [{:benchee, ">= 0.99.0 and < 2.0.0", [hex: :benchee, repo: "hexpm", optional: false]}, {:benchee_json, "~> 1.0", [hex: :benchee_json, repo: "hexpm", optional: false]}], "hexpm"}, - "benchee_json": {:hex, :benchee_json, "1.0.0", "cc661f4454d5995c08fe10dd1f2f72f229c8f0fb1c96f6b327a8c8fc96a91fe5", [:mix], [{:benchee, ">= 0.99.0 and < 2.0.0", [hex: :benchee, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"}, - "deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm"}, - "earmark": {:hex, :earmark, "1.2.6", "b6da42b3831458d3ecc57314dff3051b080b9b2be88c2e5aa41cd642a5b044ed", [:mix], [], "hexpm"}, - "ex_doc": {:hex, :ex_doc, "0.19.1", "519bb9c19526ca51d326c060cb1778d4a9056b190086a8c6c115828eaccea6cf", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.7", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm"}, - "exprintf": {:hex, :exprintf, "0.2.1", "b7e895dfb00520cfb7fc1671303b63b37dc3897c59be7cbf1ae62f766a8a0314", [:mix], [], "hexpm"}, - "exprof": {:hex, :exprof, "0.2.3", "8d4d657d73fc0c9ef1e30b2f9207b26ccbd2aec2baf1ca43f0b6d244c841c9f8", [:mix], [{:exprintf, "~> 0.2", [hex: :exprintf, repo: "hexpm", optional: false]}], "hexpm"}, - "jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"}, - "makeup": {:hex, :makeup, "0.5.5", "9e08dfc45280c5684d771ad58159f718a7b5788596099bdfb0284597d368a882", [:mix], [{:nimble_parsec, "~> 0.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"}, - "makeup_elixir": {:hex, :makeup_elixir, "0.10.0", "0f09c2ddf352887a956d84f8f7e702111122ca32fbbc84c2f0569b8b65cbf7fa", [:mix], [{:makeup, "~> 0.5.5", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"}, - "merkle_map": {:hex, :merkle_map, "0.2.0", "5391ac61e016ce4aeb66ce39f05206a382fd4b66ee4b63c08a261d5633eadd76", [:mix], [], "hexpm"}, - "nimble_parsec": {:hex, :nimble_parsec, "0.4.0", "ee261bb53214943679422be70f1658fff573c5d0b0a1ecd0f18738944f818efe", [:mix], [], "hexpm"}, - "stream_data": {:hex, :stream_data, "0.4.2", "fa86b78c88ec4eaa482c0891350fcc23f19a79059a687760ddcf8680aac2799b", [:mix], [], "hexpm"}, - "telemetry": {:hex, :telemetry, "0.4.0", "8339bee3fa8b91cb84d14c2935f8ecf399ccd87301ad6da6b71c09553834b2ab", [:rebar3], [], "hexpm"}, + "benchee": {:hex, :benchee, "1.0.1", "66b211f9bfd84bd97e6d1beaddf8fc2312aaabe192f776e8931cb0c16f53a521", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}], "hexpm", "3ad58ae787e9c7c94dd7ceda3b587ec2c64604563e049b2a0e8baafae832addb"}, + "benchee_html": {:hex, :benchee_html, "1.0.0", "5b4d24effebd060f466fb460ec06576e7b34a00fc26b234fe4f12c4f05c95947", [:mix], [{:benchee, ">= 0.99.0 and < 2.0.0", [hex: :benchee, repo: "hexpm", optional: false]}, {:benchee_json, "~> 1.0", [hex: :benchee_json, repo: "hexpm", optional: false]}], "hexpm", "5280af9aac432ff5ca4216d03e8a93f32209510e925b60e7f27c33796f69e699"}, + "benchee_json": {:hex, :benchee_json, "1.0.0", "cc661f4454d5995c08fe10dd1f2f72f229c8f0fb1c96f6b327a8c8fc96a91fe5", [:mix], [{:benchee, ">= 0.99.0 and < 2.0.0", [hex: :benchee, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "da05d813f9123505f870344d68fb7c86a4f0f9074df7d7b7e2bb011a63ec231c"}, + "deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"}, + "earmark": {:hex, :earmark, "1.2.6", "b6da42b3831458d3ecc57314dff3051b080b9b2be88c2e5aa41cd642a5b044ed", [:mix], [], "hexpm", "b42a23e9bd92d65d16db2f75553982e58519054095356a418bb8320bbacb58b1"}, + "ex_doc": {:hex, :ex_doc, "0.19.1", "519bb9c19526ca51d326c060cb1778d4a9056b190086a8c6c115828eaccea6cf", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.7", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "dc87f778d8260da0189a622f62790f6202af72f2f3dee6e78d91a18dd2fcd137"}, + "exprintf": {:hex, :exprintf, "0.2.1", "b7e895dfb00520cfb7fc1671303b63b37dc3897c59be7cbf1ae62f766a8a0314", [:mix], [], "hexpm", "20a0e8c880be90e56a77fcc82533c5d60c643915c7ce0cc8aa1e06ed6001da28"}, + "exprof": {:hex, :exprof, "0.2.3", "8d4d657d73fc0c9ef1e30b2f9207b26ccbd2aec2baf1ca43f0b6d244c841c9f8", [:mix], [{:exprintf, "~> 0.2", [hex: :exprintf, repo: "hexpm", optional: false]}], "hexpm", "040410c672e2403908f578fb1e203a178692bbcd61b11d3745912662532b1dad"}, + "jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fdf843bca858203ae1de16da2ee206f53416bbda5dc8c9e78f43243de4bc3afe"}, + "makeup": {:hex, :makeup, "0.5.5", "9e08dfc45280c5684d771ad58159f718a7b5788596099bdfb0284597d368a882", [:mix], [{:nimble_parsec, "~> 0.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "d7152ff93f2eac07905f510dfa03397134345ba4673a00fbf7119bab98632940"}, + "makeup_elixir": {:hex, :makeup_elixir, "0.10.0", "0f09c2ddf352887a956d84f8f7e702111122ca32fbbc84c2f0569b8b65cbf7fa", [:mix], [{:makeup, "~> 0.5.5", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "4a36dd2d0d5c5f98d95b3f410d7071cd661d5af310472229dd0e92161f168a44"}, + "merkle_map": {:hex, :merkle_map, "0.2.1", "01a88c87a6b9fb594c67c17ebaf047ee55ffa34e74297aa583ed87148006c4c8", [:mix], [], "hexpm", "fed4d143a5c8166eee4fa2b49564f3c4eace9cb252f0a82c1613bba905b2d04d"}, + "nimble_parsec": {:hex, :nimble_parsec, "0.4.0", "ee261bb53214943679422be70f1658fff573c5d0b0a1ecd0f18738944f818efe", [:mix], [], "hexpm", "ebb595e19456a72786db6dcd370d320350cb624f0b6203fcc7e23161d49b0ffb"}, + "stream_data": {:hex, :stream_data, "0.4.2", "fa86b78c88ec4eaa482c0891350fcc23f19a79059a687760ddcf8680aac2799b", [:mix], [], "hexpm", "54d6bf6f1e5e27fbf4a7784a2bffbb993446d0efd079debca0f27bf859c0d1cf"}, + "telemetry": {:hex, :telemetry, "0.4.0", "8339bee3fa8b91cb84d14c2935f8ecf399ccd87301ad6da6b71c09553834b2ab", [:rebar3], [], "hexpm", "e9e3cacfd37c1531c0ca70ca7c0c30ce2dbb02998a4f7719de180fe63f8d41e4"}, "uuid": {:hex, :uuid, "1.1.8", "e22fc04499de0de3ed1116b770c7737779f226ceefa0badb3592e64d5cfb4eb9", [:mix], [], "hexpm"}, "xxhash": {:hex, :xxhash, "0.2.1", "ab0893a8124f3c11116c57e500485dc5f67817d1d4c44f0fff41f3fd3c590607", [:mix], [], "hexpm"}, }