Skip to content

Commit

Permalink
feat: create module to publish trip modifications to MQTT (#2679)
Browse files Browse the repository at this point in the history
* feat: derive `Jason.Encoder` for `TripModification` structs

* feat(ex/mix/deps): add `emqtt_fallover` library

* feat(ex/config/runtime): get MQTT variables from env in prod

* init(ex/Skate.MqttConnection): source connection adapter from `ride_along`

* feat(ex/Skate.MqttConnection): prefix message topic before publishing

* init: trip_modification_publisher

* init(ex/detours/trip_modification_publisher): tests
init: trip_modification_publisher

* feat(gh/actions/ci): add `:mqtt` integration tests to CI

mbta/api@6382198
  • Loading branch information
firestack authored Jul 10, 2024
1 parent d4757d6 commit 6d6c868
Show file tree
Hide file tree
Showing 13 changed files with 335 additions and 3 deletions.
11 changes: 10 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,17 @@ jobs:
POSTGRES_USER: ${{env.DATABASE_USER}}
POSTGRES_DB: ${{env.DATABASE_NAME}}
options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5
mosquitto:
image: eclipse-mosquitto:2.0
ports:
- 1883:1883
volumes:
- ./mosquitto:/mosquitto/config/
options: --name mqtt
steps:
- uses: actions/checkout@v4
- name: Restart MQTT to load mosquitto/mosquitto.conf from checkout
run: docker restart mqtt
- name: ASDF cache
uses: actions/cache@v4
with:
Expand Down Expand Up @@ -95,7 +104,7 @@ jobs:
- name: Sobelow
run: mix sobelow
- name: Run tests
run: mix test --cover
run: mix test --cover --include 'Test.Integration'
env:
POSTGRES_USERNAME: ${{env.DATABASE_USER}}
POSTGRES_PASSWORD: ${{env.DATABASE_PASSWORD}}
Expand Down
5 changes: 5 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ config :ueberauth, Ueberauth,
keycloak: nil
]

# Default server for dev and testing
config :skate, Skate.MqttConnection,
broker_configs: ["mqtt://system:manager@localhost/"],
broker_client_prefix: "skate"

# Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above.
import_config "#{Mix.env()}.exs"
3 changes: 3 additions & 0 deletions config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ config :skate,

config :skate, Schedule.CacheFile, cache_filename: "dev_cache.terms"

# Don't require local MQTT setup by default
config :skate, Skate.Detours.TripModificationPublisher, start: false

# For development, we disable any cache and enable
# debugging and code reloading.
#
Expand Down
28 changes: 28 additions & 0 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,32 @@ if config_env() == :prod do
providers: [
keycloak: keycloak_opts
]

mqtt_url = System.get_env("MQTT_BROKER_URLS")

if mqtt_url not in [nil, ""] do
topic_prefix = System.get_env("MQTT_TOPIC_PREFIX", "")
username = System.get_env("MQTT_BROKER_USERNAME")

passwords =
case System.get_env("MQTT_BROKER_PASSWORDS") do
nil -> [nil]
"" -> [nil]
passwords -> String.split(passwords, " ")
end

configs =
for url <- String.split(mqtt_url, " "),
password <- passwords do
EmqttFailover.Config.from_url(url, username: username, password: password)
end

config :skate, Skate.MqttConnection,
enabled?: true,
broker_configs: configs,
broker_topic_prefix: topic_prefix

# Configure TripModifications to publish if the env var is present
config :skate, Skate.Detours.TripModificationPublisher, start: true
end
end
4 changes: 4 additions & 0 deletions lib/realtime/trip_modification.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ defmodule Realtime.TripModification do
}

@enforce_keys [:trip_ids, :shape_id]
@derive Jason.Encoder
defstruct [:trip_ids, :shape_id]
end

Expand All @@ -68,6 +69,7 @@ defmodule Realtime.TripModification do
}

@enforce_keys [:start_stop_selector, :end_stop_selector, :last_modified_time]
@derive Jason.Encoder
defstruct [:start_stop_selector, :end_stop_selector, :last_modified_time]
end

Expand All @@ -82,6 +84,7 @@ defmodule Realtime.TripModification do
stop_id: String.t()
}

@derive Jason.Encoder
defstruct [:stop_id]
end

Expand All @@ -91,6 +94,7 @@ defmodule Realtime.TripModification do
modifications: [Modification.t()]
}
@enforce_keys [:selected_trips, :service_dates, :modifications]
@derive Jason.Encoder
defstruct [:selected_trips, :service_dates, :modifications]

@spec new(input :: Input.t()) :: {:ok, __MODULE__.t()}
Expand Down
4 changes: 3 additions & 1 deletion lib/skate/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ defmodule Skate.Application do
{Phoenix.PubSub, name: Skate.PubSub},
SkateWeb.Endpoint,
Skate.Migrate,
{Oban, Application.fetch_env!(:skate, Oban)}
{Oban, Application.fetch_env!(:skate, Oban)},
{Skate.Detours.TripModificationPublisher,
Application.get_env(:skate, Skate.Detours.TripModificationPublisher)}
]

Supervisor.start_link(children, strategy: :rest_for_one, name: Skate.Supervisor)
Expand Down
147 changes: 147 additions & 0 deletions lib/skate/detours/trip_modification_publisher.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
defmodule Skate.Detours.TripModificationPublisher do
@moduledoc """
Connects to the MQTT Broker, then allows sending `Realtime.TripModification`'s
to the Broker.
References: https://github.com/mbta/ride_along/blob/fb440fb15dce22921fc4a141a125f3645da98b26/lib/ride_along/sql_publisher.ex
## Testing
To "unit test" this code, you must have an MQTT broker running locally, or
configure URL's for a MQTT Broker which you have access too.
By default, these "integration tests" are excluded from the test suite,
under the tag `"Test.Integration": :mqtt`. So to run these tests
`--include 'Test.Integration:mqtt'` to run only the MQTT tests;
or you can use `--include 'Test.Integration'` to target all instances of
this tag.
### Example:
> #### Note {: .tip}
> This example requires that you have the `mosquitto` mqtt broker binary in
> your `$PATH`.
>
> Ex: `brew install mosquitto`
```sh
# Start and run `mosquitto` broker in the background
mosquitto &
# Wait for `mosquitto` to launch then launch integration tests with
mix test --include 'Test.Integration' test/skate/detours/trip_modification_publisher_test.exs
```
Alternatively, instead of running something locally with the default
`config :skate, Skate.MqttConnection` config in `config/config.exs` configuration;
You can configure the `config :skate, Skate.MqttConnection` in
`config/runtime.exs` to run in the `Mix` env `:dev` and configure those
environment variables locally to connect to a different Broker;
Or you can edit the config `config/dev.exs` to statically configure it for
local development
"""
use GenServer

@default_name __MODULE__
def start_link(opts) do
if opts[:start] do
name = Keyword.get(opts, :name, @default_name)
GenServer.start_link(__MODULE__, opts, name: name)
else
:ignore
end
end

@doc """
Publishes a `Realtime.TripModification` to the configured `MQTT` server.
MQTT is optional for Skate, and callers should remember to handle both the
`:ok` and `:error` return values
"""
def publish_modification(
%Realtime.TripModification{} = modification,
opts \\ []
) do
is_draft? = Keyword.get(opts, :is_draft?, false)
server = Keyword.get(opts, :server, @default_name)

GenServer.call(
server,
{
:new_modification,
%{
is_draft?: is_draft?,
modification: modification
}
}
)
end

@type t :: %{
connection: Skate.MqttConnection.on_start() | nil,
on_connect_subscribers: [pid()]
}
defstruct connection: nil, on_connect_subscribers: []

@impl GenServer
def init(opts) do
state = %__MODULE__{
on_connect_subscribers: Keyword.get_values(opts, :on_connect)
}

{:ok, state, {:continue, :connect}}
end

@impl GenServer
def handle_continue(:connect, state) do
{:ok, connection} = Skate.MqttConnection.start_link()

{:noreply,
%{
state
| connection: connection
}}
end

@impl GenServer
def handle_info({:connected, _connection}, %__MODULE__{} = state) do
Enum.each(state.on_connect_subscribers, &send(&1, {:connected, self()}))

{:noreply, state}
end

def handle_info({:disconnected, _, _reason}, state) do
{:noreply, state}
end

@impl GenServer
def handle_call(
{:new_modification, %{is_draft?: is_draft?, modification: modification}},
_from,
%__MODULE__{connection: connection} = state
)
when not is_nil(connection) do
id = Ecto.UUID.generate()

res =
Skate.MqttConnection.publish(connection, %EmqttFailover.Message{
topic: trip_modification_topic(id),
payload:
Jason.encode!(%{
data: modification,
meta: %{
is_draft?: is_draft?
}
}),
# Send at least once
qos: 1
})

{
:reply,
{res, id},
state
}
end

def trip_modification_topic(id), do: "#{trip_modifications_topic(id)}/trip_modification"
defp trip_modifications_topic(id), do: "trip_modifications/#{id}"
end
42 changes: 42 additions & 0 deletions lib/skate/mqtt_connection.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
defmodule Skate.MqttConnection do
@moduledoc """
Shared functionality to connect to the MQTT broker.
Sourced From: https://github.com/mbta/ride_along/blob/ffdb1007332c3eaf6f36244531468f67333bc292/lib/ride_along/mqtt_connection.ex
"""

@type on_start :: GenServer.on_start()
@type server :: GenServer.server()

@spec start_link(listen_topics :: [String.t()]) :: on_start()
def start_link(topics \\ []) do
app_config = app_config()

EmqttFailover.Connection.start_link(
configs: app_config[:broker_configs],
client_id: EmqttFailover.client_id(prefix: app_config[:broker_client_prefix]),
backoff: {1_000, 60_000, :jitter},
handler: {EmqttFailover.ConnectionHandler.Parent, parent: self(), topics: topics}
)
end

@spec publish(server(), EmqttFailover.Message.t()) :: :ok | {:error, term()}
def publish(connection, message) do
EmqttFailover.Connection.publish(connection, prefix_topic(message))
end

@spec prefix_topic(EmqttFailover.Message.t()) :: EmqttFailover.Message.t()
@spec prefix_topic(binary()) :: binary()
def prefix_topic(%EmqttFailover.Message{topic: topic} = message) do
%{
message
| topic: prefix_topic(topic)
}
end

def prefix_topic(topic) when is_binary(topic), do: (topic_prefix() || "") <> topic

def topic_prefix, do: app_config()[:broker_topic_prefix]

defp app_config, do: Application.get_env(:skate, __MODULE__)
end
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ defmodule Skate.MixProject do
{:diskusage_logger, "~> 0.2.0"},
{:ecto_sql, "~> 3.4"},
{:ehmon, github: "mbta/ehmon", only: :prod},
{:emqtt_failover, "~> 0.3.0"},
{:ex_aws_rds, "~> 2.0.2"},
{:ex_aws_secretsmanager, "~> 2.0"},
{:ex_aws, "~> 2.1"},
Expand Down
2 changes: 2 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
%{
"backoff": {:hex, :backoff, "1.1.6", "83b72ed2108ba1ee8f7d1c22e0b4a00cfe3593a67dbc792799e8cce9f42f796b", [:rebar3], [], "hexpm", "cf0cfff8995fb20562f822e5cc47d8ccf664c5ecdc26a684cbe85c225f9d7c39"},
"bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"},
"bypass": {:hex, :bypass, "2.1.0", "909782781bf8e20ee86a9cabde36b259d44af8b9f38756173e8f5e2e1fabb9b1", [:mix], [{:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "d9b5df8fa5b7a6efa08384e9bbecfe4ce61c77d28a4282f79e02f1ef78d96b80"},
"castore": {:hex, :castore, "0.1.22", "4127549e411bedd012ca3a308dede574f43819fe9394254ca55ab4895abfa1a2", [:mix], [], "hexpm", "c17576df47eb5aa1ee40cc4134316a99f5cad3e215d5c77b8dd3cfef12a22cac"},
Expand All @@ -18,6 +19,7 @@
"ecto": {:hex, :ecto, "3.11.2", "e1d26be989db350a633667c5cda9c3d115ae779b66da567c68c80cfb26a8c9ee", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "3c38bca2c6f8d8023f2145326cc8a80100c3ffe4dcbd9842ff867f7fc6156c65"},
"ecto_sql": {:hex, :ecto_sql, "3.11.1", "e9abf28ae27ef3916b43545f9578b4750956ccea444853606472089e7d169470", [:mix], [{:db_connection, "~> 2.5 or ~> 2.4.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.11.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.6.0", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.16.0 or ~> 0.17.0 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ce14063ab3514424276e7e360108ad6c2308f6d88164a076aac8a387e1fea634"},
"ehmon": {:git, "https://github.com/mbta/ehmon.git", "1fb603262bd02d74a16183bd8f344dcace9d7561", []},
"emqtt_failover": {:hex, :emqtt_failover, "0.3.0", "d75b77fe0f7ffd66523d076810a6be1ced8d88aeb7eb29fb4111082821c42bd0", [:mix], [{:backoff, "~> 1.1.6", [hex: :backoff, repo: "hexpm", optional: false]}], "hexpm", "c3612e1f0c0cbd7a5c8cf441d030fbdfd9b357be42fe936b1fb66aa1003d2bb6"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_aws": {:hex, :ex_aws, "2.3.3", "826adb1e7b1d09122f79d3f24d0e60c6bf449ca9e1e145cf4668eb60d4aae081", [:mix], [{:configparser_ex, "~> 4.0", [hex: :configparser_ex, repo: "hexpm", optional: true]}, {:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:jsx, "~> 2.8 or ~> 3.0", [hex: :jsx, repo: "hexpm", optional: true]}, {:mime, "~> 1.2 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:sweet_xml, "~> 0.7", [hex: :sweet_xml, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "140f65ebaf0b92cd40f5afff68e609758c41b62b0291618e9e3c7ffb6b041823"},
"ex_aws_rds": {:hex, :ex_aws_rds, "2.0.2", "38dd8e83d57cf4b7286c4f6f5c978f700c40c207ffcdd6ca5d738e5eba933f9a", [:mix], [{:ex_aws, "~> 2.0", [hex: :ex_aws, repo: "hexpm", optional: false]}], "hexpm", "9e5b5cc168077874cbd0d29ba65d01caf1877e705fb5cecacf0667dd19bfa75c"},
Expand Down
2 changes: 2 additions & 0 deletions mosquitto/mosquitto.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
listener 1883
allow_anonymous true
Loading

0 comments on commit 6d6c868

Please sign in to comment.