Skip to content

Commit

Permalink
Merge branch 'main' into hp/simple-drawer
Browse files Browse the repository at this point in the history
  • Loading branch information
hannahpurcell committed Oct 9, 2024
2 parents 1aba57f + f683d85 commit 5549ef4
Show file tree
Hide file tree
Showing 12 changed files with 101 additions and 20 deletions.
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ COPY --from=app-builder --chown=skate:skate /root/_build/prod/rel/skate .

COPY --from=app-builder --chown=skate:skate /root/aws-cert-bundle.pem ./priv/aws-cert-bundle.pem

EXPOSE 4000
# Expose HTTP, EPMD, and Erlang RPC
EXPOSE 4000 4369 57195

ENTRYPOINT ["/usr/bin/dumb-init", "--"]

Expand Down
4 changes: 4 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ config :skate, Oban,
}
]

config :skate, DNSCluster,
query: :ignore,
log: :info

# Include 2 logger backends
config :logger,
backends: [:console, Sentry.LoggerBackend]
Expand Down
2 changes: 1 addition & 1 deletion config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ config :skate, Skate.Detours.TripModificationPublisher, start: false
config :skate, SkateWeb.Endpoint,
https: [
ip: {0, 0, 0, 0},
port: 4000,
port: System.get_env("PORT") || 4000,
cipher_suite: :strong,
keyfile: "priv/cert/selfsigned_key.pem",
certfile: "priv/cert/selfsigned.pem",
Expand Down
2 changes: 2 additions & 0 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,6 @@ if config_env() == :prod do
# Configure TripModifications to publish if the env var is present
config :skate, Skate.Detours.TripModificationPublisher, start: true
end

config :skate, DNSCluster, query: System.get_env("DNS_CLUSTER_QUERY") || :ignore
end
5 changes: 4 additions & 1 deletion lib/notifications/notification.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ defmodule Notifications.Notification do
id: id(),
created_at: Util.Time.timestamp(),
state: NotificationState.t(),
content: Notifications.Db.BlockWaiver.t() | Notifications.Db.BridgeMovement.t()
content:
Notifications.Db.BlockWaiver.t()
| Notifications.Db.BridgeMovement.t()
| Notifications.Db.Detour.t()
}

@derive Jason.Encoder
Expand Down
72 changes: 65 additions & 7 deletions lib/notifications/notification_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ defmodule Notifications.NotificationServer do

use GenServer

require Logger

alias Notifications.Bridge
alias Notifications.Notification
alias Notifications.NotificationReason
Expand All @@ -22,8 +24,8 @@ defmodule Notifications.NotificationServer do

@spec start_link(Keyword.t()) :: GenServer.on_start()
def start_link(opts \\ []) do
name = Keyword.get(opts, :name, __MODULE__)
GenServer.start_link(__MODULE__, nil, name: name)
opts = Keyword.put_new(opts, :name, default_name())
GenServer.start_link(__MODULE__, opts, name: opts[:name])
end

@spec new_block_waivers(BlockWaiver.block_waivers_by_block_key(), GenServer.server()) :: :ok
Expand Down Expand Up @@ -91,9 +93,11 @@ defmodule Notifications.NotificationServer do

# Server

@enforce_keys [:name]
defstruct [:name]
@impl GenServer
def init(_) do
{:ok, nil}
def init(opts \\ []) do
{:ok, struct(__MODULE__, opts)}
end

@impl true
Expand Down Expand Up @@ -125,11 +129,31 @@ defmodule Notifications.NotificationServer do
},
state
) do
detour
|> Notifications.Notification.create_activated_detour_notification_from_detour()
|> broadcast(self())
notification =
Notifications.Notification.create_activated_detour_notification_from_detour(detour)

broadcast(notification, self())

notify_caller_new_notification(notify_finished_caller_id, detour: id)
# Send to processes with same name on other nodes
broadcast_notification_to_other_instances(notification, state.name)

{:noreply, state}
end

@impl true
# "Private" method for fetching and sending notifications from distributed
# Elixir
def handle_cast(
{
:broadcast_new_detour_notification,
notification_id
},
state
) do
notification_id
|> Notifications.Notification.get_detour_notification()
|> broadcast(self())

{:noreply, state}
end
Expand All @@ -149,6 +173,7 @@ defmodule Notifications.NotificationServer do
broadcast(notification, self())

notify_caller_new_notification(notify_finished_caller_id, detour: id)
broadcast_notification_to_other_instances(notification, state.name)

{:noreply, state}
end
Expand All @@ -162,6 +187,39 @@ defmodule Notifications.NotificationServer do
send(caller_id, {:new_notification, value})
end

defp broadcast_notification_to_other_instances(
%Notifications.Notification{
id: notification_id,
content: %Notifications.Db.Detour{}
},
server
)
when not is_nil(notification_id) do
# Currently, we've implemented our own "PubSub" for notifications and we
# are not using the provided `Phoenix.PubSub` that comes with Phoenix
# channels. This means we don't benefit from Phoenix PubSub's ability to
# send messages using distributed Elixir, and that we need to implement
# this ourselves at this current time.
# Ideally, Notifications would be delivered using
# `Phoenix.Channel.broadcast` instead of our custom `broadcast` function
# in `NotificationServer`. To do this, we'd need to implement the same
# filtering mechanism that this module has implemented. For now, we'll
# send messages to other Skate instances letting them know about new
# Notifications.

# Skate instances currently do not "specialize", and therefore we need to
# send the notification to all instances
nodes = Node.list()

Logger.info(
"notifying other instances of detour notification_id=#{notification_id} nodes=#{inspect(nodes)}"
)

for node <- nodes do
GenServer.cast({server, node}, {:broadcast_new_detour_notification, notification_id})
end
end

@spec convert_new_block_waivers_to_notifications([BlockWaiver.t()]) :: [
Notification.t()
]
Expand Down
1 change: 1 addition & 0 deletions lib/skate/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ defmodule Skate.Application do
end ++
[
{Phoenix.PubSub, name: Skate.PubSub},
{DNSCluster, Application.get_env(:skate, DNSCluster)},
SkateWeb.Endpoint,
Skate.Migrate,
{Oban, Application.fetch_env!(:skate, Oban)},
Expand Down
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ defmodule Skate.MixProject do
{:csv, "~> 2.4.1"},
{:dialyxir, "~> 1.4", only: [:dev, :test], runtime: false},
{:diskusage_logger, "~> 0.2.0"},
{:dns_cluster, "~> 0.1.3"},
{:ecto_sql, "~> 3.4"},
{:ehmon, github: "mbta/ehmon", only: :prod},
{:emqtt_failover, "~> 0.3.0"},
Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"},
"dialyxir": {:hex, :dialyxir, "1.4.3", "edd0124f358f0b9e95bfe53a9fcf806d615d8f838e2202a9f430d59566b6b53b", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "bf2cfb75cd5c5006bec30141b131663299c661a864ec7fbbc72dfa557487a986"},
"diskusage_logger": {:hex, :diskusage_logger, "0.2.0", "04fc48b538fe4de43153542a71ea94f623d54707d85844123baacfceedf625c3", [:mix], [], "hexpm", "e3f2aed1b0fc4590931c089a6453a4c4eb4c945912aa97bcabcc0cff7851f34d"},
"dns_cluster": {:hex, :dns_cluster, "0.1.3", "0bc20a2c88ed6cc494f2964075c359f8c2d00e1bf25518a6a6c7fd277c9b0c66", [:mix], [], "hexpm", "46cb7c4a1b3e52c7ad4cbe33ca5079fbde4840dedeafca2baf77996c2da1bc33"},
"earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"},
"ecto": {:hex, :ecto, "3.11.2", "e1d26be989db350a633667c5cda9c3d115ae779b66da567c68c80cfb26a8c9ee", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "3c38bca2c6f8d8023f2145326cc8a80100c3ffe4dcbd9842ff867f7fc6156c65"},
"ecto_sql": {:hex, :ecto_sql, "3.11.1", "e9abf28ae27ef3916b43545f9578b4750956ccea444853606472089e7d169470", [:mix], [{:db_connection, "~> 2.5 or ~> 2.4.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.11.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.6.0", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.16.0 or ~> 0.17.0 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ce14063ab3514424276e7e360108ad6c2308f6d88164a076aac8a387e1fea634"},
Expand Down
14 changes: 10 additions & 4 deletions rel/env.sh.eex
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
#!/bin/sh
INSTANCE_ID=$(curl --max-time 5 -s http://169.254.169.254/latest/meta-data/instance-id || true)
if [ -n "${INSTANCE_ID}" ]; then
export RELEASE_NODE=skate-${INSTANCE_ID}
fi
ip_address=$(hostname -i)

## Use "long names" for node names for Distributed Elixir
## https://hexdocs.pm/elixir/main/config-and-releases.html#operating-system-environment-configuration
export RELEASE_DISTRIBUTION=name
## Set our node's "long name" so we can find each instance using DNSCluster
export RELEASE_NODE="skate@$ip_address"

echo "Release Distribution: $RELEASE_DISTRIBUTION"
echo "Release Node: $RELEASE_NODE"
4 changes: 4 additions & 0 deletions rel/vm.args.eex
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@

## Give more memory to the literal allocator, used by :persistent_term
+MIscs 2048

## Set and restrict Erlang RPC port
-kernel inet_dist_listen_min 57195
-kernel inet_dist_listen_max 57195
12 changes: 6 additions & 6 deletions test/notifications/notification_server_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -621,15 +621,15 @@ defmodule Notifications.NotificationServerTest do
%{server: server}
end

test "saves to database" do
test "saves to database", %{server: server} do
notification_count = 3
# create new notification
for _ <- 1..notification_count do
%{id: id} =
detour =
insert(:detour)

NotificationServer.detour_activated(detour, notify_finished: self(), server: __MODULE__)
NotificationServer.detour_activated(detour, notify_finished: self(), server: server)

assert_receive {:new_notification, detour: ^id}
end
Expand All @@ -650,7 +650,7 @@ defmodule Notifications.NotificationServerTest do
detour =
insert(:detour)

NotificationServer.detour_activated(detour, notify_finished: self(), server: __MODULE__)
NotificationServer.detour_activated(detour, notify_finished: self(), server: server)

assert_receive {:new_notification, detour: ^id}

Expand All @@ -671,15 +671,15 @@ defmodule Notifications.NotificationServerTest do
%{server: server}
end

test "saves to database" do
test "saves to database", %{server: server} do
notification_count = 3
# create new notification
for _ <- 1..notification_count do
%{id: id} =
detour =
insert(:detour)

NotificationServer.detour_deactivated(detour, notify_finished: self(), server: __MODULE__)
NotificationServer.detour_deactivated(detour, notify_finished: self(), server: server)

assert_receive {:new_notification, detour: ^id}
end
Expand All @@ -700,7 +700,7 @@ defmodule Notifications.NotificationServerTest do
detour =
insert(:detour)

NotificationServer.detour_deactivated(detour, notify_finished: self(), server: __MODULE__)
NotificationServer.detour_deactivated(detour, notify_finished: self(), server: server)

assert_receive {:new_notification, detour: ^id}

Expand Down

0 comments on commit 5549ef4

Please sign in to comment.