From d2eaafb5342106fb6bd5ef570b2f3320fb69198e Mon Sep 17 00:00:00 2001 From: Ivan Ivanov Date: Fri, 20 Dec 2024 13:01:19 +0200 Subject: [PATCH] Fix :bulk_metric_registry_change event emitting --- .../event_bus/metric_registry_subscriber.ex | 3 ++- lib/sanbase/metric/registry/event_emitter.ex | 7 +++-- lib/sanbase/metric/registry/populate.ex | 27 ++++++++++++------- test/test_seeds.exs | 6 +++-- 4 files changed, 27 insertions(+), 16 deletions(-) diff --git a/lib/sanbase/event_bus/metric_registry_subscriber.ex b/lib/sanbase/event_bus/metric_registry_subscriber.ex index c4e8a324b..fab44172f 100644 --- a/lib/sanbase/event_bus/metric_registry_subscriber.ex +++ b/lib/sanbase/event_bus/metric_registry_subscriber.ex @@ -126,7 +126,8 @@ defmodule Sanbase.EventBus.MetricRegistrySubscriber do state end - defp handle_event(_event, event_shadow, state) do + defp handle_event(event, event_shadow, state) do + Logger.warning("Unrecognized event #{inspect(event)} received in #{__MODULE__}") EventBus.mark_as_completed({__MODULE__, event_shadow}) state end diff --git a/lib/sanbase/metric/registry/event_emitter.ex b/lib/sanbase/metric/registry/event_emitter.ex index 84a44deaa..7654e05d7 100644 --- a/lib/sanbase/metric/registry/event_emitter.ex +++ b/lib/sanbase/metric/registry/event_emitter.ex @@ -5,13 +5,12 @@ defmodule Sanbase.Metric.Registry.EventEmitter do def topic(), do: @topic def handle_event(_, event_type, _args) when event_type in [:metrics_failed_to_load] do - %{ - event_type: event_type - } + %{event_type: event_type} |> notify() end - def handle_event({:ok, map}, event_type = :bulk_metric_registry_change, args) do + def handle_event({:ok, map}, event_type, args) + when event_type in [:bulk_metric_registry_change] do %{event_type: event_type} |> Map.merge(map) |> Map.merge(args) diff --git a/lib/sanbase/metric/registry/populate.ex b/lib/sanbase/metric/registry/populate.ex index 2997adca4..e37494cfd 100644 --- a/lib/sanbase/metric/registry/populate.ex +++ b/lib/sanbase/metric/registry/populate.ex @@ -4,11 +4,18 @@ defmodule Sanbase.Metric.Registry.Populate do """ def run() do Sanbase.Repo.transaction(fn -> - populate() + populate(emit_events: false) end) |> case do - {:ok, {:ok, list, summary}} -> {:ok, list, summary} - data -> data + {:ok, {:ok, list, summary}} -> + # In case of populate/0 running inside a transaction do not emit the event + # from within the transaction. If the event finishes before the transaction + # is commited, the event won't see the new data + emit_events(list, summary) + {:ok, list, summary} + + data -> + data end end @@ -50,11 +57,11 @@ defmodule Sanbase.Metric.Registry.Populate do |> Sanbase.Metric.Registry.changeset(params) end - def populate() do + def populate(opts \\ []) do case process_metrics() do list when is_list(list) -> {:ok, list, summary} = summarize_results(list) - emit_events(list, summary) + if Keyword.get(opts, :emit_events, true), do: emit_events(list, summary) {:ok, list, summary} {:error, %Ecto.Changeset{} = error} -> @@ -148,10 +155,10 @@ defmodule Sanbase.Metric.Registry.Populate do end defp emit_events(list, summary) do - inserts = Map.get(summary, :insert, []) - updates = Map.get(summary, :update, []) + inserts = Map.get(summary, :insert, 0) + updates = Map.get(summary, :update, 0) - if inserts != [] or updates != [] do + if inserts > 0 or updates > 0 do map = %{inserts_count: inserts, updates_count: updates} {inserted_metrics, updated_metrics} = @@ -178,10 +185,12 @@ defmodule Sanbase.Metric.Registry.Populate do # in persistent term Node.list() |> Enum.each(fn node -> + IO.puts("Emitting event :bulk_metric_registry_change to #{node}") + Node.spawn(node, fn -> Sanbase.Metric.Registry.EventEmitter.emit_event( {:ok, map}, - :bulk_update_metric_registry, + :bulk_metric_registry_change, %{__only_process_by__: [Sanbase.EventBus.MetricRegistrySubscriber]} ) end) diff --git a/test/test_seeds.exs b/test/test_seeds.exs index 4d345e87e..ff8faa9d3 100644 --- a/test/test_seeds.exs +++ b/test/test_seeds.exs @@ -2,6 +2,8 @@ IO.puts("Running test seeds") IO.puts("Populating the Metric Registry...") -{:ok, metrics, _summary} = Sanbase.Metric.Registry.Populate.run() +{:ok, metrics, summary} = Sanbase.Metric.Registry.Populate.run() -IO.puts("Finished populating the Metric Registry. Inserted #{length(metrics)} metrics") +IO.puts( + "Finished populating the Metric Registry. Inserted #{length(metrics)} metrics. Summary: #{inspect(summary)}" +)