-
Notifications
You must be signed in to change notification settings - Fork 0
/
manager_2pc.ex
96 lines (81 loc) · 2.52 KB
/
manager_2pc.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
defmodule Acai.Services.Manager2Pc do
  @moduledoc """
  Coordinator for a two-phase commit (2PC) across the notification services.

  Each participant exposes an `init_2pc` function (the *prepare* step) that returns
  `{:ok, commit_fn, rollback_fn}` on success or any other term on failure. When every
  participant prepares successfully all `commit_fn`s are run; otherwise the
  `rollback_fn` of each participant that did prepare is run.

  Participants run concurrently in tasks linked to the caller and are awaited with the
  default `Task.await_many/2` timeout. A participant that crashes or times out therefore
  exits the calling process and no rollback is attempted.
  """

  alias Acai.Services

  require Logger

  @typedoc """
  Outcome of the second phase. The atom tells which action was taken, the boolean
  whether every executed action returned `{:ok, _}`.
  """
  @type result :: {:commit, boolean()} | {:rollback, boolean()}

  @doc """
  Creates a notification through a two-phase commit.

  Phase one calls `Services.Persist.init_2pc(socket, notification)` and
  `Services.Counter.init_2pc(socket)` concurrently. Phase two commits when both
  prepared successfully and rolls back the prepared participants otherwise.

  `socket` and `notification` are handed to the participants untouched.

  Returns `{:commit, success?}` or `{:rollback, success?}`, where `success?` is `true`
  only when every commit/rollback function returned `{:ok, _}`. A rollback with no
  prepared participant yields `{:rollback, true}`.
  """
  @spec create_notification_2pc(term(), term()) :: result()
  def create_notification_2pc(socket, notification) do
    # Phase 1 - prepare
    prepare_responses =
      Task.await_many([
        Task.async(fn -> Services.Persist.init_2pc(socket, notification) end),
        Task.async(fn -> Services.Counter.init_2pc(socket) end)
      ])

    # Phase 2 - commit/rollback
    second_phase(prepare_responses)
  end

  ## Privates

  # Generic second phase of the two-phase commit.
  #
  # Takes the prepare responses (`{:ok, commit_fn, rollback_fn}` or an error term),
  # decides between commit and rollback, runs the matching functions concurrently and
  # returns `{:commit, success?}` or `{:rollback, success?}`.
  defp second_phase(prepare_responses) do
    can_commit = Enum.all?(prepare_responses, &prepared?/1)

    Logger.debug("prepare_responses: #{inspect(prepare_responses)}")
    Logger.debug("can_commit: #{inspect(can_commit)}")

    decision = if can_commit, do: :commit, else: :rollback

    success =
      prepare_responses
      |> Enum.flat_map(&actions_for(decision, &1))
      |> run_actions(decision)

    {decision, success}
  end

  # A participant is prepared when it handed back its commit and rollback functions.
  defp prepared?({:ok, _commit_fn, _rollback_fn}), do: true
  defp prepared?(_other), do: false

  # Picks the function to run for one prepare response. Participants that failed to
  # prepare have nothing to roll back and contribute no action.
  defp actions_for(:commit, {:ok, commit_fn, _rollback_fn}), do: [commit_fn]
  defp actions_for(:rollback, {:ok, _commit_fn, rollback_fn}), do: [rollback_fn]
  defp actions_for(:rollback, _failed), do: []

  # Runs every action in its own task and reports whether all returned `{:ok, _}`.
  defp run_actions(actions, decision) do
    responses =
      actions
      |> Enum.map(&Task.async/1)
      |> Task.await_many()

    success = Enum.all?(responses, &match?({:ok, _data}, &1))

    Logger.debug("#{decision}_responses: #{inspect(responses)}, success: #{inspect(success)}")

    success
  end
end