Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eliminated the Map from MerkleMap #47

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 101 additions & 42 deletions lib/delta_crdt/causal_crdt.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,24 @@ defmodule DeltaCrdt.CausalCrdt do

require BenchmarkHelper

alias MerkleMap.MerkleTree

BenchmarkHelper.inject_in_dev()

@type delta :: {k :: integer(), delta :: any()}
@type delta_interval :: {a :: integer(), b :: integer(), delta :: delta()}

@moduledoc false

@attempts 3

defstruct node_id: nil,
name: nil,
on_diffs: nil,
storage_module: nil,
crdt_module: nil,
crdt_state: nil,
merkle_map: MerkleMap.new(),
merkle_tree: MerkleTree.new(),
sequence_number: 0,
neighbours: MapSet.new(),
neighbour_monitors: %{},
Expand Down Expand Up @@ -88,40 +92,83 @@ defmodule DeltaCrdt.CausalCrdt do
{:noreply, new_state}
end

def handle_info({:diff, diff}, state) do
def handle_info({:retry_diff, n, diff}, state) do
diff = reverse_diff(diff)

new_merkle_map = MerkleMap.update_hashes(state.merkle_map)

case MerkleMap.continue_partial_diff(diff.continuation, new_merkle_map, 8) do
{:continue, continuation} ->
%Diff{diff | continuation: truncate(continuation, state.max_sync_size)}
|> send_diff_continue()
new_merkle_tree = MerkleTree.update_hashes(state.merkle_tree)

sent? =
case MerkleTree.continue_partial_diff(new_merkle_tree, diff.continuation, 8) do
{:continue, continuation} ->
%Diff{diff | continuation: truncate(continuation, state.max_sync_size)}
|> send_diff_continue()

{:ok, []} ->
# If remote is busy sending ack, do nothing. It's not a big deal as
# the remote outstanding_syncs will keep a dangling entry which will
# be overridden at next sync interval.
ack_diff(diff)
true

{:ok, keys} ->
case send_diff(diff, truncate(keys, state.max_sync_size), state) do
true ->
# Same as above.
ack_diff(diff)
true

_ ->
false
end
end

{:ok, []} ->
ack_diff(diff)
case sent? do
true ->
{:noreply, Map.put(state, :merkle_tree, new_merkle_tree)}

false ->
if n + 1 <= @attempts do
Process.send_after(
self(),
{:retry_diff, n + 1, diff},
div(state.sync_interval, @attempts)
)
end

{:ok, keys} ->
send_diff(diff, truncate(keys, state.max_sync_size), state)
ack_diff(diff)
{:noreply, state}
end

{:noreply, Map.put(state, :merkle_map, new_merkle_map)}
end

def handle_info({:get_diff, diff, keys}, state) do
def handle_info({:diff, diff}, state), do: handle_info({:retry_diff, 0, diff}, state)

def handle_info({:retry_get_diff, n, diff, keys}, state) do
diff = reverse_diff(diff)

send(
diff.to,
{:diff,
%{state.crdt_state | dots: diff.dots, value: Map.take(state.crdt_state.value, keys)}, keys}
)
case send_nosuspend(
diff.to,
{:diff,
%{state.crdt_state | dots: diff.dots, value: Map.take(state.crdt_state.value, keys)},
keys}
) do
true ->
ack_diff(diff)

false ->
if n + 1 <= @attempts do
Process.send_after(
self(),
{:retry_get_diff, n + 1, diff, keys},
div(state.sync_interval, @attempts)
)
end
end

ack_diff(diff)
{:noreply, state}
end

def handle_info({:get_diff, diff, keys}, state),
do: handle_info({:retry_get_diff, 0, diff, keys}, state)

def handle_info({:EXIT, _pid, :normal}, state), do: {:noreply, state}

def handle_info({:DOWN, ref, :process, _object, _reason}, state) do
Expand Down Expand Up @@ -156,7 +203,7 @@ defmodule DeltaCrdt.CausalCrdt do
end

new_outstanding_syncs =
Enum.filter(state.outstanding_syncs, fn {neighbour, 1} ->
Enum.filter(state.outstanding_syncs, fn {neighbour, _} ->
MapSet.member?(new_neighbours, neighbour)
end)
|> Map.new()
Expand Down Expand Up @@ -210,7 +257,7 @@ defmodule DeltaCrdt.CausalCrdt do
end

defp truncate(diff, size) when is_integer(size) do
MerkleMap.truncate_diff(diff, size)
MerkleTree.truncate_diff(diff, size)
end

defp read_from_storage(%{storage_module: nil} = state) do
Expand All @@ -222,10 +269,10 @@ defmodule DeltaCrdt.CausalCrdt do
nil ->
state

{node_id, sequence_number, crdt_state, merkle_map} ->
{node_id, sequence_number, crdt_state, merkle_tree} ->
Map.put(state, :sequence_number, sequence_number)
|> Map.put(:crdt_state, crdt_state)
|> Map.put(:merkle_map, merkle_map)
|> Map.put(:merkle_tree, merkle_tree)
|> Map.put(:node_id, node_id)
|> remove_crdt_state_keys()
end
Expand All @@ -243,16 +290,16 @@ defmodule DeltaCrdt.CausalCrdt do
:ok =
state.storage_module.write(
state.name,
{state.node_id, state.sequence_number, state.crdt_state, state.merkle_map}
{state.node_id, state.sequence_number, state.crdt_state, state.merkle_tree}
)

state
end

defp sync_interval_or_state_to_all(state) do
state = monitor_neighbours(state)
new_merkle_map = MerkleMap.update_hashes(state.merkle_map)
{:continue, continuation} = MerkleMap.prepare_partial_diff(new_merkle_map, 8)
new_merkle_tree = MerkleTree.update_hashes(state.merkle_tree)
{:continue, continuation} = MerkleTree.prepare_partial_diff(new_merkle_tree, 8)

diff = %Diff{
continuation: continuation,
Expand All @@ -267,8 +314,13 @@ defmodule DeltaCrdt.CausalCrdt do
|> Enum.reduce(state.outstanding_syncs, fn neighbour, outstanding_syncs ->
Map.put_new_lazy(outstanding_syncs, neighbour, fn ->
try do
send(neighbour, {:diff, %Diff{diff | to: neighbour}})
1
if send_nosuspend(neighbour, {:diff, %Diff{diff | to: neighbour}}) do
1
else
# This happens when we attempt to sync with a neighbour that is slow.
Logger.debug("tried to sync with a slow neighbour: #{inspect(neighbour)}, move on")
0
end
rescue
_ in ArgumentError ->
# This happens when we attempt to sync with a neighbour that is dead.
Expand All @@ -285,7 +337,7 @@ defmodule DeltaCrdt.CausalCrdt do
|> Map.new()

Map.put(state, :outstanding_syncs, new_outstanding_syncs)
|> Map.put(:merkle_map, new_merkle_map)
|> Map.put(:merkle_tree, new_merkle_tree)
end

defp monitor_neighbours(state) do
Expand Down Expand Up @@ -318,14 +370,14 @@ defmodule DeltaCrdt.CausalCrdt do
end

defp send_diff_continue(diff) do
send(diff.to, {:diff, diff})
send_nosuspend(diff.to, {:diff, diff})
end

defp send_diff(diff, keys, state) do
if diff.originator == diff.to do
send(diff.to, {:get_diff, diff, keys})
send_nosuspend(diff.to, {:get_diff, diff, keys})
else
send(
send_nosuspend(
diff.to,
{:diff,
%{state.crdt_state | dots: diff.dots, value: Map.take(state.crdt_state.value, keys)},
Expand Down Expand Up @@ -387,10 +439,10 @@ defmodule DeltaCrdt.CausalCrdt do

diffs = diff(state, new_state, keys)

{new_merkle_map, count} =
Enum.reduce(diffs, {state.merkle_map, 0}, fn
{:add, key, value}, {mm, count} -> {MerkleMap.put(mm, key, value), count + 1}
{:remove, key}, {mm, count} -> {MerkleMap.delete(mm, key), count + 1}
{new_merkle_tree, count} =
Enum.reduce(diffs, {state.merkle_tree, 0}, fn
{:add, key, value}, {tree, count} -> {MerkleTree.put(tree, key, value), count + 1}
{:remove, key}, {tree, count} -> {MerkleTree.delete(tree, key), count + 1}
end)

:telemetry.execute([:delta_crdt, :sync, :done], %{keys_updated_count: count}, %{
Expand All @@ -399,15 +451,22 @@ defmodule DeltaCrdt.CausalCrdt do

diffs_to_callback(state, new_state, diffs_keys(diffs))

Map.put(new_state, :merkle_map, new_merkle_map)
Map.put(new_state, :merkle_tree, new_merkle_tree)
|> write_to_storage()
end

defp ack_diff(%{originator: originator, from: originator, to: to}) do
send(originator, {:ack_diff, to})
send_nosuspend(originator, {:ack_diff, to})
end

defp ack_diff(%{originator: originator, from: from, to: originator}) do
send(originator, {:ack_diff, from})
send_nosuspend(originator, {:ack_diff, from})
end

defp send_nosuspend(dest, msg) do
case Process.send(dest, msg, [:nosuspend]) do
:ok -> true
:nosuspend -> false
end
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ defmodule DeltaCrdt.MixProject do
{:benchee_html, ">= 0.0.0", only: :dev, runtime: false},
{:exprof, "~> 0.2.0", only: :dev, runtime: false},
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false},
{:merkle_map, "~> 0.2.0"},
{:merkle_map, "~> 0.2.1"},
{:stream_data, "~> 0.4", only: :test}
]
end
Expand Down
30 changes: 15 additions & 15 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
%{
"benchee": {:hex, :benchee, "1.0.1", "66b211f9bfd84bd97e6d1beaddf8fc2312aaabe192f776e8931cb0c16f53a521", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}], "hexpm"},
"benchee_html": {:hex, :benchee_html, "1.0.0", "5b4d24effebd060f466fb460ec06576e7b34a00fc26b234fe4f12c4f05c95947", [:mix], [{:benchee, ">= 0.99.0 and < 2.0.0", [hex: :benchee, repo: "hexpm", optional: false]}, {:benchee_json, "~> 1.0", [hex: :benchee_json, repo: "hexpm", optional: false]}], "hexpm"},
"benchee_json": {:hex, :benchee_json, "1.0.0", "cc661f4454d5995c08fe10dd1f2f72f229c8f0fb1c96f6b327a8c8fc96a91fe5", [:mix], [{:benchee, ">= 0.99.0 and < 2.0.0", [hex: :benchee, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"},
"deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm"},
"earmark": {:hex, :earmark, "1.2.6", "b6da42b3831458d3ecc57314dff3051b080b9b2be88c2e5aa41cd642a5b044ed", [:mix], [], "hexpm"},
"ex_doc": {:hex, :ex_doc, "0.19.1", "519bb9c19526ca51d326c060cb1778d4a9056b190086a8c6c115828eaccea6cf", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.7", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm"},
"exprintf": {:hex, :exprintf, "0.2.1", "b7e895dfb00520cfb7fc1671303b63b37dc3897c59be7cbf1ae62f766a8a0314", [:mix], [], "hexpm"},
"exprof": {:hex, :exprof, "0.2.3", "8d4d657d73fc0c9ef1e30b2f9207b26ccbd2aec2baf1ca43f0b6d244c841c9f8", [:mix], [{:exprintf, "~> 0.2", [hex: :exprintf, repo: "hexpm", optional: false]}], "hexpm"},
"jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"},
"makeup": {:hex, :makeup, "0.5.5", "9e08dfc45280c5684d771ad58159f718a7b5788596099bdfb0284597d368a882", [:mix], [{:nimble_parsec, "~> 0.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"},
"makeup_elixir": {:hex, :makeup_elixir, "0.10.0", "0f09c2ddf352887a956d84f8f7e702111122ca32fbbc84c2f0569b8b65cbf7fa", [:mix], [{:makeup, "~> 0.5.5", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"},
"merkle_map": {:hex, :merkle_map, "0.2.0", "5391ac61e016ce4aeb66ce39f05206a382fd4b66ee4b63c08a261d5633eadd76", [:mix], [], "hexpm"},
"nimble_parsec": {:hex, :nimble_parsec, "0.4.0", "ee261bb53214943679422be70f1658fff573c5d0b0a1ecd0f18738944f818efe", [:mix], [], "hexpm"},
"stream_data": {:hex, :stream_data, "0.4.2", "fa86b78c88ec4eaa482c0891350fcc23f19a79059a687760ddcf8680aac2799b", [:mix], [], "hexpm"},
"telemetry": {:hex, :telemetry, "0.4.0", "8339bee3fa8b91cb84d14c2935f8ecf399ccd87301ad6da6b71c09553834b2ab", [:rebar3], [], "hexpm"},
"benchee": {:hex, :benchee, "1.0.1", "66b211f9bfd84bd97e6d1beaddf8fc2312aaabe192f776e8931cb0c16f53a521", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}], "hexpm", "3ad58ae787e9c7c94dd7ceda3b587ec2c64604563e049b2a0e8baafae832addb"},
"benchee_html": {:hex, :benchee_html, "1.0.0", "5b4d24effebd060f466fb460ec06576e7b34a00fc26b234fe4f12c4f05c95947", [:mix], [{:benchee, ">= 0.99.0 and < 2.0.0", [hex: :benchee, repo: "hexpm", optional: false]}, {:benchee_json, "~> 1.0", [hex: :benchee_json, repo: "hexpm", optional: false]}], "hexpm", "5280af9aac432ff5ca4216d03e8a93f32209510e925b60e7f27c33796f69e699"},
"benchee_json": {:hex, :benchee_json, "1.0.0", "cc661f4454d5995c08fe10dd1f2f72f229c8f0fb1c96f6b327a8c8fc96a91fe5", [:mix], [{:benchee, ">= 0.99.0 and < 2.0.0", [hex: :benchee, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "da05d813f9123505f870344d68fb7c86a4f0f9074df7d7b7e2bb011a63ec231c"},
"deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"},
"earmark": {:hex, :earmark, "1.2.6", "b6da42b3831458d3ecc57314dff3051b080b9b2be88c2e5aa41cd642a5b044ed", [:mix], [], "hexpm", "b42a23e9bd92d65d16db2f75553982e58519054095356a418bb8320bbacb58b1"},
"ex_doc": {:hex, :ex_doc, "0.19.1", "519bb9c19526ca51d326c060cb1778d4a9056b190086a8c6c115828eaccea6cf", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.7", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "dc87f778d8260da0189a622f62790f6202af72f2f3dee6e78d91a18dd2fcd137"},
"exprintf": {:hex, :exprintf, "0.2.1", "b7e895dfb00520cfb7fc1671303b63b37dc3897c59be7cbf1ae62f766a8a0314", [:mix], [], "hexpm", "20a0e8c880be90e56a77fcc82533c5d60c643915c7ce0cc8aa1e06ed6001da28"},
"exprof": {:hex, :exprof, "0.2.3", "8d4d657d73fc0c9ef1e30b2f9207b26ccbd2aec2baf1ca43f0b6d244c841c9f8", [:mix], [{:exprintf, "~> 0.2", [hex: :exprintf, repo: "hexpm", optional: false]}], "hexpm", "040410c672e2403908f578fb1e203a178692bbcd61b11d3745912662532b1dad"},
"jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fdf843bca858203ae1de16da2ee206f53416bbda5dc8c9e78f43243de4bc3afe"},
"makeup": {:hex, :makeup, "0.5.5", "9e08dfc45280c5684d771ad58159f718a7b5788596099bdfb0284597d368a882", [:mix], [{:nimble_parsec, "~> 0.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "d7152ff93f2eac07905f510dfa03397134345ba4673a00fbf7119bab98632940"},
"makeup_elixir": {:hex, :makeup_elixir, "0.10.0", "0f09c2ddf352887a956d84f8f7e702111122ca32fbbc84c2f0569b8b65cbf7fa", [:mix], [{:makeup, "~> 0.5.5", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "4a36dd2d0d5c5f98d95b3f410d7071cd661d5af310472229dd0e92161f168a44"},
"merkle_map": {:hex, :merkle_map, "0.2.1", "01a88c87a6b9fb594c67c17ebaf047ee55ffa34e74297aa583ed87148006c4c8", [:mix], [], "hexpm", "fed4d143a5c8166eee4fa2b49564f3c4eace9cb252f0a82c1613bba905b2d04d"},
"nimble_parsec": {:hex, :nimble_parsec, "0.4.0", "ee261bb53214943679422be70f1658fff573c5d0b0a1ecd0f18738944f818efe", [:mix], [], "hexpm", "ebb595e19456a72786db6dcd370d320350cb624f0b6203fcc7e23161d49b0ffb"},
"stream_data": {:hex, :stream_data, "0.4.2", "fa86b78c88ec4eaa482c0891350fcc23f19a79059a687760ddcf8680aac2799b", [:mix], [], "hexpm", "54d6bf6f1e5e27fbf4a7784a2bffbb993446d0efd079debca0f27bf859c0d1cf"},
"telemetry": {:hex, :telemetry, "0.4.0", "8339bee3fa8b91cb84d14c2935f8ecf399ccd87301ad6da6b71c09553834b2ab", [:rebar3], [], "hexpm", "e9e3cacfd37c1531c0ca70ca7c0c30ce2dbb02998a4f7719de180fe63f8d41e4"},
"uuid": {:hex, :uuid, "1.1.8", "e22fc04499de0de3ed1116b770c7737779f226ceefa0badb3592e64d5cfb4eb9", [:mix], [], "hexpm"},
"xxhash": {:hex, :xxhash, "0.2.1", "ab0893a8124f3c11116c57e500485dc5f67817d1d4c44f0fff41f3fd3c590607", [:mix], [], "hexpm"},
}