Skip to content

Commit

Permalink
Add option to run publish_mutation async or not
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanjos committed Aug 15, 2024
1 parent 92d09a5 commit 936c3c6
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 17 deletions.
28 changes: 18 additions & 10 deletions lib/absinthe/subscription/proxy.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ defmodule Absinthe.Subscription.Proxy do
defstruct [
:pubsub,
:node,
:task_super
:task_super,
:async
]

def child_spec([_, _, shard] = args) do
def child_spec([_task_super, _pubsub, shard, _async] = args) do
%{
id: {__MODULE__, shard},
start: {__MODULE__, :start_link, [args]}
Expand All @@ -26,11 +27,11 @@ defmodule Absinthe.Subscription.Proxy do

def topic(shard), do: "__absinthe__:proxy:#{shard}"

def init([task_super, pubsub, shard]) do
def init([task_super, pubsub, shard, async]) do
node_name = pubsub.node_name()
:ok = pubsub.subscribe(topic(shard))
Process.send_after(self(), :gc, @gc_interval)
{:ok, %__MODULE__{pubsub: pubsub, node: node_name, task_super: task_super}}
{:ok, %__MODULE__{pubsub: pubsub, node: node_name, task_super: task_super, async: async}}
end

def handle_info(:gc, state) do
Expand All @@ -42,13 +43,20 @@ defmodule Absinthe.Subscription.Proxy do
def handle_info(payload, state) do
# There's no meaningful form of backpressure to have here, and we can't
# bottleneck execution inside each proxy process

unless payload.node == state.pubsub.node_name() do
Subscription.Local.publish_mutation(
state.pubsub,
payload.mutation_result,
payload.subscribed_fields
)
if state.async do
Task.Supervisor.start_child(state.task_super, Subscription.Local, :publish_mutation, [
state.pubsub,
payload.mutation_result,
payload.subscribed_fields
])
else
Subscription.Local.publish_mutation(
state.pubsub,
payload.mutation_result,
payload.subscribed_fields
)
end
end

{:noreply, state}
Expand Down
8 changes: 4 additions & 4 deletions lib/absinthe/subscription/proxy_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ defmodule Absinthe.Subscription.ProxySupervisor do

use Supervisor

def start_link([pubsub, registry, pool_size]) do
Supervisor.start_link(__MODULE__, {pubsub, registry, pool_size})
def start_link([pubsub, registry, pool_size, async]) do
Supervisor.start_link(__MODULE__, {pubsub, registry, pool_size, async})
end

def init({pubsub, registry, pool_size}) do
def init({pubsub, registry, pool_size, async}) do
task_super_name = Module.concat(registry, TaskSuper)
task_super = {Task.Supervisor, name: task_super_name}

# Shard numbers are generated by phash2 which is 0-based:
proxies =
for shard <- 0..(pool_size - 1) do
{Absinthe.Subscription.Proxy, [task_super_name, pubsub, shard]}
{Absinthe.Subscription.Proxy, [task_super_name, pubsub, shard, async]}
end

Supervisor.init([task_super | proxies], strategy: :one_for_one)
Expand Down
13 changes: 10 additions & 3 deletions lib/absinthe/subscription/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,17 @@ defmodule Absinthe.Subscription.Supervisor do
pool_size = Keyword.get(opts, :pool_size, System.schedulers_online() * 2)
compress_registry? = Keyword.get(opts, :compress_registry?, true)

Supervisor.start_link(__MODULE__, {pubsub, pool_size, compress_registry?})
# Absinthe.Subscription.Proxy listens for subscription messages
# from other nodes and then runs Subscription.Local.publish_mutation to process
# the mutation on the local node. By default it runs in a task superivsor so that
# requests are handled concurrently. However, this may not work for some
# systems. Setting `async` to false makes it so that the requests are processed one at a time.
async? = Keyword.get(opts, :async, true)

Supervisor.start_link(__MODULE__, {pubsub, pool_size, compress_registry?, async?})
end

def init({pubsub, pool_size, compress_registry?}) do
def init({pubsub, pool_size, compress_registry?, async?}) do
registry_name = Absinthe.Subscription.registry_name(pubsub)
meta = [pool_size: pool_size]

Expand All @@ -40,7 +47,7 @@ defmodule Absinthe.Subscription.Supervisor do
meta: meta,
compressed: compress_registry?
]},
{Absinthe.Subscription.ProxySupervisor, [pubsub, registry_name, pool_size]}
{Absinthe.Subscription.ProxySupervisor, [pubsub, registry_name, pool_size, async?]}
]

Supervisor.init(children, strategy: :one_for_one)
Expand Down

0 comments on commit 936c3c6

Please sign in to comment.