Handle all_dbs_active in fabric_doc_update
Previously, the fabric_doc_update message handler crashed with a function_clause
error the first time it encountered an all_dbs_active error.

Opt to handle it like a `rexi_EXIT`, in the hope that some workers will still
return a valid result.
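
For context, here is a minimal, self-contained Erlang sketch (a hypothetical clause_demo module, not CouchDB code) of why a message shape with no matching clause raises function_clause, which is the crash this commit fixes:

%% Minimal sketch, assuming nothing about fabric internals: an Erlang call only
%% succeeds if some clause head matches the arguments; otherwise the runtime
%% exits the caller with a function_clause error.
-module(clause_demo).
-export([handle_message/1]).

%% Clauses exist for rexi_EXIT and ok results...
handle_message({rexi_EXIT, _Reason}) ->
    skipped;
handle_message({ok, Result}) ->
    {ok, Result}.
%% ...but there is no clause for {error, all_dbs_active}, so
%% clause_demo:handle_message({error, all_dbs_active}) crashes with
%% ** exception error: no function clause matching
%%    clause_demo:handle_message({error,all_dbs_active})

Adding a clause that matches the new shape, as the diff below does, routes the call into the same skip_message path as rexi_EXIT instead of crashing.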
nickva committed Jun 6, 2022
1 parent 286b8cb commit 369ecc9
Showing 1 changed file with 66 additions and 0 deletions.
src/fabric/src/fabric_doc_update.erl: 66 additions & 0 deletions
@@ -66,6 +66,11 @@ handle_message({rexi_EXIT, _}, Worker, Acc0) ->
    {WC, LenDocs, W, GrpDocs, DocReplyDict} = Acc0,
    NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs),
    skip_message({WC - 1, LenDocs, W, NewGrpDocs, DocReplyDict});
handle_message({error, all_dbs_active}, Worker, Acc0) ->
    % treat it like a rexi_EXIT; the hope is that at least one copy will return successfully
    {WC, LenDocs, W, GrpDocs, DocReplyDict} = Acc0,
    NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs),
    skip_message({WC - 1, LenDocs, W, NewGrpDocs, DocReplyDict});
handle_message(internal_server_error, Worker, Acc0) ->
    % happens when we fail to load validation functions in an RPC worker
    {WC, LenDocs, W, GrpDocs, DocReplyDict} = Acc0,
@@ -329,6 +334,8 @@ doc_update_test_() ->
            fun doc_update1/0,
            fun doc_update2/0,
            fun doc_update3/0,
            fun handle_all_dbs_active/0,
            fun handle_two_all_dbs_actives/0,
            fun one_forbid/0,
            fun two_forbid/0,
            fun extend_tree_forbid/0,
@@ -467,6 +474,65 @@ doc_update3() ->

    ?assertEqual({ok, [{Doc1, {ok, Doc1}}, {Doc2, {ok, Doc2}}]}, Reply).

handle_all_dbs_active() ->
    Doc1 = #doc{revs = {1, [<<"foo">>]}},
    Doc2 = #doc{revs = {1, [<<"bar">>]}},
    Docs = [Doc1, Doc2],
    Shards =
        mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]),
    GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs),
    Acc0 = {
        length(Shards),
        length(Docs),
        list_to_integer("2"),
        GroupedDocs,
        dict:from_list([{Doc, []} || Doc <- Docs])
    },

    {ok, {WaitingCount1, _, _, _, _} = Acc1} =
        handle_message({ok, [{ok, Doc1}, {ok, Doc2}]}, hd(Shards), Acc0),
    ?assertEqual(WaitingCount1, 2),

    {ok, {WaitingCount2, _, _, _, _} = Acc2} =
        handle_message({error, all_dbs_active}, lists:nth(2, Shards), Acc1),
    ?assertEqual(WaitingCount2, 1),

    {stop, Reply} =
        handle_message({ok, [{ok, Doc1}, {ok, Doc2}]}, lists:nth(3, Shards), Acc2),

    ?assertEqual({ok, [{Doc1, {ok, Doc1}}, {Doc2, {ok, Doc2}}]}, Reply).

handle_two_all_dbs_actives() ->
    Doc1 = #doc{revs = {1, [<<"foo">>]}},
    Doc2 = #doc{revs = {1, [<<"bar">>]}},
    Docs = [Doc1, Doc2],
    Shards =
        mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]),
    GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs),
    Acc0 = {
        length(Shards),
        length(Docs),
        list_to_integer("2"),
        GroupedDocs,
        dict:from_list([{Doc, []} || Doc <- Docs])
    },

    {ok, {WaitingCount1, _, _, _, _} = Acc1} =
        handle_message({ok, [{ok, Doc1}, {ok, Doc2}]}, hd(Shards), Acc0),
    ?assertEqual(WaitingCount1, 2),

    {ok, {WaitingCount2, _, _, _, _} = Acc2} =
        handle_message({error, all_dbs_active}, lists:nth(2, Shards), Acc1),
    ?assertEqual(WaitingCount2, 1),

    {stop, Reply} =
        handle_message({error, all_dbs_active}, lists:nth(3, Shards), Acc2),

    ?assertEqual(
        {accepted, [{Doc1, {accepted, Doc1}}, {Doc2, {accepted, Doc2}}]},
        Reply
    ).

one_forbid() ->
    Doc1 = #doc{revs = {1, [<<"foo">>]}},
    Doc2 = #doc{revs = {1, [<<"bar">>]}},
