Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add metric registry change suggestions #4460

Merged
merged 8 commits into from
Nov 25, 2024
1 change: 1 addition & 0 deletions config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import Config
# The watchers configuration can be used to run external
# watchers to your application. For example, we use it
# with brunch.io to recompile .js and .css sources.
config :phoenix_live_view, debug_heex_annotations: true
config :sanbase, Sanbase, url: {:system, "SANBASE_URL", "https://app-stage.santiment.net"}

config :sanbase, SanbaseWeb.Endpoint,
Expand Down
4 changes: 4 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ config :sanbase, Sanbase.EventBus.KafkaExporterSubscriber,
can_send_after_interval: 0,
kafka_flush_timeout: 0

config :sanbase, Sanbase.EventBus.MetricRegistrySubscriber,
metric_registry_change_handler:
{Sanbase.EventBus.MetricRegistrySubscriber, :on_metric_registry_change_test_env}

config :sanbase, Sanbase.ExternalServices.RateLimiting.Server,
implementation_module: Sanbase.ExternalServices.RateLimiting.TestServer

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,18 @@ defmodule Sanbase.Billing.Plan.StandardAccessChecker do
def refresh_stored_terms() do
Logger.info("Refreshing stored terms in the #{__MODULE__}")

for {fun, args} <- @functions do
data = compute(fun, args)
result =
for {fun, args} <- @functions do
data = compute(fun, args)

if :not_implemented == data,
do: raise("Function #{fun} is not implemented in module #{__MODULE__}")
if :not_implemented == data,
do: raise("Function #{fun} is not implemented in module #{__MODULE__}")

:ok = :persistent_term.put(key(fun, args), data)
{{fun, args}, :ok}
end
result = :persistent_term.put(key(fun, args), data)
{{fun, args}, result}
end

Enum.all?(result, &match?({_, :ok}, &1))
end

# Private functions
Expand Down
17 changes: 10 additions & 7 deletions lib/sanbase/clickhouse/metric/registry/registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,18 @@ defmodule Sanbase.Clickhouse.MetricAdapter.Registry do
Sanbase.Cache.clear(registry_cache_key(remove_hard_deprecated: true))
Sanbase.Cache.clear(registry_cache_key(remove_hard_deprecated: false))

for {fun, args} <- @functions do
data = compute(fun, args)
result =
for {fun, args} <- @functions do
data = compute(fun, args)

if :not_implemented == data,
do: raise("Function #{fun} is not implemented in module #{__MODULE__}")
if :not_implemented == data,
do: raise("Function #{fun} is not implemented in module #{__MODULE__}")

:ok = :persistent_term.put(key(fun, args), data)
{{fun, args}, :ok}
end
result = :persistent_term.put(key(fun, args), data)
{{fun, args}, result}
end

Enum.all?(result, &match?({_, :ok}, &1))
end

# Private functions
Expand Down
33 changes: 28 additions & 5 deletions lib/sanbase/event_bus/metric_registry_subscriber.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ defmodule Sanbase.EventBus.MetricRegistrySubscriber do
"""
use GenServer

alias Sanbase.Utils.Config

require Logger

def topics(), do: ["metric_registry_events"]
Expand Down Expand Up @@ -41,8 +43,24 @@ defmodule Sanbase.EventBus.MetricRegistrySubscriber do
{:noreply, state}
end

def on_metric_registry_change(_event_type, _metric) do
# Do not change the order here
true = Sanbase.Clickhouse.MetricAdapter.Registry.refresh_stored_terms()
true = Sanbase.Metric.Helper.refresh_stored_terms()
true = Sanbase.Billing.Plan.StandardAccessChecker.refresh_stored_terms()

:ok
end

def on_metric_registry_change_test_env(event_type, metric) do
# In test env this is the handler in order to avoid Ecto DBConnection
# ownership errors
Logger.warning("Metric Registry Change - Event Type: #{event_type}, Metric: #{metric}")
:ok
end

defp handle_event(
%{data: %{event_type: event_type}} = event,
%{data: %{event_type: event_type, metric: metric}} = event,
event_shadow,
state
)
Expand All @@ -52,10 +70,15 @@ defmodule Sanbase.EventBus.MetricRegistrySubscriber do
:delete_metric_registry
] do
Logger.info("Start refreshing stored terms from #{__MODULE__}")
# Do not change the order here
Sanbase.Clickhouse.MetricAdapter.Registry.refresh_stored_terms()
Sanbase.Metric.Helper.refresh_stored_terms()
Sanbase.Billing.Plan.StandardAccessChecker.refresh_stored_terms()

{mod, fun} =
Config.module_get(
__MODULE__,
:metric_registry_change_handler,
{__MODULE__, :on_metric_registry_change}
)

:ok = apply(mod, fun, [event_type, metric])

Task.Supervisor.async_nolink(Sanbase.TaskSupervisor, fn ->
Sanbase.Notifications.Handler.handle_metric_registry_event(event)
Expand Down
10 changes: 6 additions & 4 deletions lib/sanbase/event_bus/user_events_subscriber.ex
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ defmodule Sanbase.EventBus.UserEventsSubscriber do
) do
email = Sanbase.Accounts.get_user!(user_id).email
if email, do: Sanbase.Email.MailjetApi.client().subscribe(:monthly_newsletter, email)

EventBus.mark_as_completed({__MODULE__, event_shadow})
state
end
Expand All @@ -64,7 +65,8 @@ defmodule Sanbase.EventBus.UserEventsSubscriber do
state
) do
email = Sanbase.Accounts.get_user!(user_id).email
Sanbase.Email.MailjetApi.client().unsubscribe(:monthly_newsletter, email)
if email, do: Sanbase.Email.MailjetApi.client().unsubscribe(:monthly_newsletter, email)

EventBus.mark_as_completed({__MODULE__, event_shadow})
state
end
Expand Down Expand Up @@ -95,8 +97,7 @@ defmodule Sanbase.EventBus.UserEventsSubscriber do
state
) do
email = Sanbase.Accounts.get_user!(user_id).email

Sanbase.Email.MailjetApi.client().subscribe(:metric_updates, email)
if email, do: Sanbase.Email.MailjetApi.client().subscribe(:metric_updates, email)
EventBus.mark_as_completed({__MODULE__, event_shadow})
state
end
Expand All @@ -107,7 +108,8 @@ defmodule Sanbase.EventBus.UserEventsSubscriber do
state
) do
email = Sanbase.Accounts.get_user!(user_id).email
Sanbase.Email.MailjetApi.client().unsubscribe(:metric_updates, email)
if email, do: Sanbase.Email.MailjetApi.client().unsubscribe(:metric_updates, email)

EventBus.mark_as_completed({__MODULE__, event_shadow})
state
end
Expand Down
35 changes: 35 additions & 0 deletions lib/sanbase/ex_audit/patch.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
defmodule Sanbase.ExAudit.Patch do
def format_patch(%{patch: patch}) when is_map(patch) do
changes =
patch
|> Enum.map(fn {field, change} ->
safe_field = Phoenix.HTML.html_escape(to_string(field))
safe_change = Phoenix.HTML.html_escape(format_change_value(change))

content = PhoenixHTMLHelpers.Tag.content_tag(:strong, safe_field)
PhoenixHTMLHelpers.Tag.content_tag(:li, [content, ": ", safe_change])
end)

PhoenixHTMLHelpers.Tag.content_tag(:ul, changes, class: "list-disc list-inside")
end

def format_patch(_), do: Phoenix.HTML.raw("")

defp format_change_value({:changed, {:primitive_change, old_val, new_val}}) do
old = inspect(old_val)
new = inspect(new_val)
"#{old} → #{new}"
end

defp format_change_value({:changed, nested}) when is_map(nested) do
nested_changes =
nested
|> Enum.map_join(", ", fn {k, v} ->
"#{to_string(k)}: #{format_change_value(v)}"
end)

"{#{nested_changes}}"
end

defp format_change_value(other), do: inspect(other)
end
17 changes: 10 additions & 7 deletions lib/sanbase/metric/helper.ex
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,18 @@ defmodule Sanbase.Metric.Helper do
def refresh_stored_terms() do
Logger.info("Refreshing stored terms in the #{__MODULE__}")

for {fun, args} <- @functions do
data = compute(fun, args)
result =
for {fun, args} <- @functions do
data = compute(fun, args)

if :not_implemented == data,
do: raise("Function #{fun} is not implemented in module #{__MODULE__}")
if :not_implemented == data,
do: raise("Function #{fun} is not implemented in module #{__MODULE__}")

:ok = :persistent_term.put(key(fun, args), data)
{{fun, args}, :ok}
end
result = :persistent_term.put(key(fun, args), data)
{{fun, args}, result}
end

Enum.all?(result, &match?({_, :ok}, &1))
end

# Private functions
Expand Down
Loading