Skip to content

Commit

Permalink
feat: weather poller worker added
Browse files Browse the repository at this point in the history
  • Loading branch information
linalim committed Apr 8, 2022
1 parent fd1bcaf commit a4f1ed5
Show file tree
Hide file tree
Showing 9 changed files with 235 additions and 3 deletions.
3 changes: 2 additions & 1 deletion server-phoenix/config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,6 @@ config :cors_plug,

config :helios, Helios.Scheduler,
jobs: [
{"*/5 * * * *", fn -> Helios.Workers.ShaPollerWorker.perform() end}
{"*/5 * * * *", fn -> Helios.Workers.ShaPollerWorker.perform() end},
{"*/5 * * * *", fn -> Helios.Workers.WeatherPollerWorker.perform() end}
]
3 changes: 3 additions & 0 deletions server-phoenix/lib/helios/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ defmodule Helios.Application do
@moduledoc false

use Application
alias HeliosWeb.Schema.Types.Sub

@impl true
def start(_type, _args) do
Sub.start_link([])

unless Mix.env() == :prod do
Dotenv.load()
Mix.Task.run("loadconfig")
Expand Down
43 changes: 43 additions & 0 deletions server-phoenix/lib/helios/workers/weather_poller_worker.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
defmodule Helios.Workers.WeatherPollerWorker do
use Absinthe.Schema.Notation
import Ecto.Query
alias Helios.{Repo, Location}
alias HeliosWeb.Schema.Types.Sub
alias HeliosWeb.Clients.WeatherClient

def perform do
IO.puts("Running weather poll")

Enum.each(locations, fn location ->
get_forecast(location)
end)
end

def subscriptions do
Sub.get_args() |> Enum.uniq()
end

defmodule LocationParams do
defstruct [:latitude, :longitude]
end

def locations do
Enum.map(subscriptions, fn subscription ->
%LocationParams{latitude: subscription.latitude, longitude: subscription.longitude}
end)
end

def get_forecast(location) do
case WeatherClient.forecast(%{latitude: location.latitude, longitude: location.longitude}) do
{:ok, forecast_data} ->
Absinthe.Subscription.publish(
HeliosWeb.Endpoint,
forecast_data,
weather_published: [location.latitude, location.longitude]
)

{:error, message} ->
IO.puts("Failure polling #{location.latitude} and #{location.longitude}: #{message}")
end
end
end
101 changes: 101 additions & 0 deletions server-phoenix/lib/helios_web/AbsintheChannelDecorator.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# Reference code
# https://github.com/absinthe-graphql/absinthe_phoenix/issues/39#issuecomment-982310059

defmodule HeliosWeb.AbsintheChannelDecorator do
use Phoenix.Channel
alias HeliosWeb.AbsintheSocketDecorator, as: Socket
require Logger

def join(topic, msg, socket) do
IO.puts("channel decorator JOIN")
Absinthe.Phoenix.Channel.join(topic, msg, socket)
end

def terminate(reason, socket) do
IO.puts("channel decorator TERMINATE")

case reason do
{:shutdown, :closed} ->
socket = run_unsubscribe(socket, "terminate")
{:noreply, socket}

_ ->
IO.puts("nothing")
end
end

defp run_unsubscribe(socket, status) do
# status (enter, leave and terminate)
config = socket.assigns[:absinthe]

Map.get(socket.assigns.absinthe.opts[:context], "payload", "")
|> case do
payload when payload != "" ->
with variables when is_map(variables) <- extract_variables(payload) do
query = Map.get(payload, "query", "")

config_opts = [
context:
Map.merge(
config.opts[:context],
%{"status" => status}
)
]

opts = Keyword.put(config_opts, :variables, variables)
context = socket.assigns.absinthe.opts[:context]
run(query, config[:schema], config[:pipeline], opts)
end

_ ->
socket
end
end

defp run(document, schema, pipeline, options) do
{module, fun} = pipeline

case Absinthe.Pipeline.run(document, apply(module, fun, [schema, options])) do
{:ok, %{result: result, execution: res}, _phases} ->
{:ok, result, res.context}

{:error, msg, _phases} ->
{:error, msg}
end
end

defp extract_variables(payload) do
case Map.get(payload, "variables", %{}) do
nil -> %{}
map -> map
end
end

def handle_in("doc", payload, socket) do
socket =
Socket.put_options(
socket,
context:
Map.merge(
socket.assigns.absinthe.opts[:context],
%{
"status" => "enter",
"payload" => %{
"query" => Map.get(payload, "query", "")
}
}
)
)

Absinthe.Phoenix.Channel.handle_in("doc", payload, socket)
end

def handle_in("unsubscribe", %{"subscriptionId" => doc_id}, socket) do
socket = run_unsubscribe(socket, "leave")
Absinthe.Phoenix.Channel.handle_in("unsubscribe", %{"subscriptionId" => doc_id}, socket)
end

defdelegate handle_in(event, msg, arg2), to: Absinthe.Phoenix.Channel

defdelegate default_pipeline(schema, options), to: Absinthe.Phoenix.Channel
end
21 changes: 21 additions & 0 deletions server-phoenix/lib/helios_web/AbsintheSocketDecorator.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
defmodule HeliosWeb.AbsintheSocketDecorator do
use Phoenix.Socket

channel("__absinthe__:*", HeliosWeb.AbsintheChannelDecorator,
assigns: %{
__absinthe_schema__: HeliosWeb.Schema,
__absinthe_pipeline__: nil
}
)

def connect(params, socket) do
IO.puts("socket decorator connect")
{:ok, socket}
end

def id(_socket), do: nil

defdelegate put_options(socket, opts), to: Absinthe.Phoenix.Socket

defdelegate put_schema(socket, schema), to: Absinthe.Phoenix.Socket
end
19 changes: 19 additions & 0 deletions server-phoenix/lib/helios_web/channel.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
defmodule HeliosWeb.Channel do
use HeliosWeb, :channel
alias HeliosWeb.Presence

def join("some:topic", _params, socket) do
send(self(), :after_join)
{:ok, assign(socket, :user_id)}
end

def handle_info(:after_join, socket) do
{:ok, _} =
Presence.track(socket, socket.assigns.user_id, %{
online_at: inspect(System.system_time(:second))
})

push(socket, "presence_state", Presence.list(socket))
{:noreply, socket}
end
end
3 changes: 2 additions & 1 deletion server-phoenix/lib/helios_web/endpoint.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ defmodule HeliosWeb.Endpoint do
signing_salt: "oq2zX4/B"
]

socket "/socket", HeliosWeb.UserSocket,
socket("/socket", HeliosWeb.AbsintheSocketDecorator,
websocket: true,
longpoll: false
)

# Serve at "/" the static files from "priv/static" directory.
#
Expand Down
2 changes: 1 addition & 1 deletion server-phoenix/lib/helios_web/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ defmodule HeliosWeb.Router do

forward("/graphiql", Absinthe.Plug.GraphiQL,
schema: HeliosWeb.Schema,
socket: HeliosWeb.UserSocket
socket: HeliosWeb.AbsintheSocketDecorator
)
end

Expand Down
43 changes: 43 additions & 0 deletions server-phoenix/lib/helios_web/schema/types/sub.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,22 @@
defmodule HeliosWeb.Schema.Types.Sub do
use Absinthe.Schema.Notation
use Agent

def start_link(initial_value) do
Agent.start_link(fn -> initial_value end, name: __MODULE__)
end

def get_args do
Agent.get(__MODULE__, fn state -> state end)
end

def add_args(args) do
Agent.update(__MODULE__, fn state -> state ++ [args] end)
end

def remove_args(args) do
Agent.update(__MODULE__, fn state -> state -- [args] end)
end

object :sub do
field(:event_published, non_null(:event), description: "An event was published to the API") do
Expand All @@ -16,6 +33,32 @@ defmodule HeliosWeb.Schema.Types.Sub do
description("Latest weather data retrieved")
arg(:latitude, non_null(:float))
arg(:longitude, non_null(:float))

config(fn args,
%{
context: %{
"status" => status
}
} ->
IO.puts("Channel Status")
IO.inspect(status)

if(status == "enter") do
IO.puts("subscribed")
add_args(args)
end

if(status == "leave" || status == "terminate") do
IO.puts("unsubscribed")
remove_args(args)
end

{:ok, topic: [args.latitude, args.longitude], context_id: "global"}
end)

resolve(fn event, _, _ ->
{:ok, event}
end)
end

field(:announcement_published, non_null(:announcement),
Expand Down

0 comments on commit a4f1ed5

Please sign in to comment.