Skip to content

Commit

Permalink
Fix :bulk_metric_registry_change event emitting
Browse files Browse the repository at this point in the history
  • Loading branch information
IvanIvanoff committed Dec 20, 2024
1 parent 431a4e3 commit d2eaafb
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 16 deletions.
3 changes: 2 additions & 1 deletion lib/sanbase/event_bus/metric_registry_subscriber.ex
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ defmodule Sanbase.EventBus.MetricRegistrySubscriber do
state
end

defp handle_event(_event, event_shadow, state) do
defp handle_event(event, event_shadow, state) do
Logger.warning("Unrecognized event #{inspect(event)} received in #{__MODULE__}")
EventBus.mark_as_completed({__MODULE__, event_shadow})
state
end
Expand Down
7 changes: 3 additions & 4 deletions lib/sanbase/metric/registry/event_emitter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ defmodule Sanbase.Metric.Registry.EventEmitter do
def topic(), do: @topic

def handle_event(_, event_type, _args) when event_type in [:metrics_failed_to_load] do
%{
event_type: event_type
}
%{event_type: event_type}
|> notify()
end

def handle_event({:ok, map}, event_type = :bulk_metric_registry_change, args) do
def handle_event({:ok, map}, event_type, args)
when event_type in [:bulk_metric_registry_change] do
%{event_type: event_type}
|> Map.merge(map)
|> Map.merge(args)
Expand Down
27 changes: 18 additions & 9 deletions lib/sanbase/metric/registry/populate.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,18 @@ defmodule Sanbase.Metric.Registry.Populate do
"""
def run() do
Sanbase.Repo.transaction(fn ->
populate()
populate(emit_events: false)
end)
|> case do
{:ok, {:ok, list, summary}} -> {:ok, list, summary}
data -> data
{:ok, {:ok, list, summary}} ->
# In case of populate/0 running inside a transaction do not emit the event
# from within the transaction. If the event finishes before the transaction
# is commited, the event won't see the new data
emit_events(list, summary)
{:ok, list, summary}

data ->
data
end
end

Expand Down Expand Up @@ -50,11 +57,11 @@ defmodule Sanbase.Metric.Registry.Populate do
|> Sanbase.Metric.Registry.changeset(params)
end

def populate() do
def populate(opts \\ []) do
case process_metrics() do
list when is_list(list) ->
{:ok, list, summary} = summarize_results(list)
emit_events(list, summary)
if Keyword.get(opts, :emit_events, true), do: emit_events(list, summary)
{:ok, list, summary}

{:error, %Ecto.Changeset{} = error} ->
Expand Down Expand Up @@ -148,10 +155,10 @@ defmodule Sanbase.Metric.Registry.Populate do
end

defp emit_events(list, summary) do
inserts = Map.get(summary, :insert, [])
updates = Map.get(summary, :update, [])
inserts = Map.get(summary, :insert, 0)
updates = Map.get(summary, :update, 0)

if inserts != [] or updates != [] do
if inserts > 0 or updates > 0 do
map = %{inserts_count: inserts, updates_count: updates}

{inserted_metrics, updated_metrics} =
Expand All @@ -178,10 +185,12 @@ defmodule Sanbase.Metric.Registry.Populate do
# in persistent term
Node.list()
|> Enum.each(fn node ->
IO.puts("Emitting event :bulk_metric_registry_change to #{node}")

Node.spawn(node, fn ->
Sanbase.Metric.Registry.EventEmitter.emit_event(
{:ok, map},
:bulk_update_metric_registry,
:bulk_metric_registry_change,
%{__only_process_by__: [Sanbase.EventBus.MetricRegistrySubscriber]}
)
end)
Expand Down
6 changes: 4 additions & 2 deletions test/test_seeds.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ IO.puts("Running test seeds")

IO.puts("Populating the Metric Registry...")

{:ok, metrics, _summary} = Sanbase.Metric.Registry.Populate.run()
{:ok, metrics, summary} = Sanbase.Metric.Registry.Populate.run()

IO.puts("Finished populating the Metric Registry. Inserted #{length(metrics)} metrics")
IO.puts(
"Finished populating the Metric Registry. Inserted #{length(metrics)} metrics. Summary: #{inspect(summary)}"
)

0 comments on commit d2eaafb

Please sign in to comment.