Skip to content

Commit

Permalink
Merge pull request #816 from Annopaolo/delete-device
Browse files Browse the repository at this point in the history
Support device deletion
  • Loading branch information
rbino authored Oct 6, 2023
2 parents 00bed92 + a47a2a8 commit 01c8510
Show file tree
Hide file tree
Showing 45 changed files with 2,812 additions and 73 deletions.
2 changes: 2 additions & 0 deletions apps/astarte_data_updater_plant/config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ config :astarte_data_updater_plant, :amqp_consumer_options,
config :logger, :console,
format: {PrettyLog.UserFriendlyFormatter, :format},
metadata: [:realm, :device_id, :function]

config :astarte_data_updater_plant, :rpc_client, MockRPCClient
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2019 Ispirata Srl
# Copyright 2019 - 2023 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@ defmodule Astarte.DataUpdaterPlant.ConsumersSupervisor do

alias Astarte.DataUpdaterPlant.AMQPDataConsumer
alias Astarte.DataUpdaterPlant.Config
alias Astarte.DataUpdater.DeletionScheduler

def start_link(init_arg) do
Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
Expand All @@ -34,7 +35,8 @@ defmodule Astarte.DataUpdaterPlant.ConsumersSupervisor do
children = [
{Registry, [keys: :unique, name: Registry.AMQPDataConsumer]},
{AMQPDataConsumer.ConnectionManager, amqp_opts: Config.amqp_consumer_options!()},
AMQPDataConsumer.Supervisor
AMQPDataConsumer.Supervisor,
DeletionScheduler
]

opts = [strategy: :rest_for_one, name: __MODULE__]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2017 Ispirata Srl
# Copyright 2017 - 2023 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -160,6 +160,15 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater do
|> GenServer.call({:dump_state})
end

def start_device_deletion(realm, encoded_device_id, timestamp) do
with :ok <- verify_device_exists(realm, encoded_device_id) do
message_tracker = get_message_tracker(realm, encoded_device_id, offload_start: true)

get_data_updater_process(realm, encoded_device_id, message_tracker, offload_start: true)
|> GenServer.call({:start_device_deletion, timestamp})
end
end

def get_data_updater_process(realm, encoded_device_id, message_tracker, opts \\ []) do
with {:ok, device_id} <- Device.decode_device_id(encoded_device_id) do
case Registry.lookup(Registry.DataUpdater, {realm, device_id}) do
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
#
# This file is part of Astarte.
#
# Copyright 2023 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
defmodule Astarte.DataUpdater.DeletionScheduler do
@moduledoc """
This module sends messages to start deletion to a
Astarte.DataUpdater.Server. When a deletion notice
is received, the Server will start the device deletion
procedure, write the dup_start_ack to db, and
synchronously acknowledge it to the Scheduler.
"""
use GenServer

alias Astarte.DataUpdaterPlant.DataUpdater.Queries
alias Astarte.DataUpdaterPlant.DataUpdater
alias Astarte.DataUpdaterPlant.Config
alias Astarte.Core.Device

require Logger

# TODO expose this via config
@reconciliation_timeout :timer.minutes(5)
def start_link(_args) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end

def init(_args) do
# TODO: manually start_device_deletion!() when needed
schedule_next_device_deletion()
{:ok, %{}}
end

def handle_info(:delete_devices, state) do
_ = Logger.debug("Reconciling devices for whom deletion shall begin")

start_device_deletion!()
schedule_next_device_deletion()
{:noreply, state}
end

defp start_device_deletion! do
retrieve_devices_to_delete!()
|> Enum.each(fn %{realm_name: realm_name, encoded_device_id: encoded_device_id} ->
timestamp = now_us_x10_timestamp()
# This must be a call, as we want to be sure this was completed
:ok = DataUpdater.start_device_deletion(realm_name, encoded_device_id, timestamp)
end)
end

defp schedule_next_device_deletion do
Process.send_after(self(), :delete_devices, @reconciliation_timeout)
end

defp retrieve_devices_to_delete! do
realms = Queries.retrieve_realms!()

for %{"realm_name" => realm_name} <- realms,
%{"device_id" => device_id} <-
Queries.retrieve_devices_waiting_to_start_deletion!(realm_name),
encoded_device_id = Device.encode_device_id(device_id),
should_handle_data_from_device?(realm_name, encoded_device_id) do
_ =
Logger.debug("Retrieved device to delete",
tag: "device_to_delete",
realm_name: realm_name,
device_id: encoded_device_id
)

%{realm_name: realm_name, encoded_device_id: encoded_device_id}
end
end

defp should_handle_data_from_device?(realm_name, encoded_device_id) do
# TODO extract a function from Astarte.DataUpdaterPlant.AMQPDataConsumer
# This is the same sharding algorithm used in astarte_vmq_plugin
# Make sure they stay in sync
queue_index =
{realm_name, encoded_device_id}
|> :erlang.phash2(Config.data_queue_total_count!())

queue_index >= Config.data_queue_range_start!() and
queue_index <= Config.data_queue_range_end!()
end

# TODO this is copied from astarte_vmq_plugin
defp now_us_x10_timestamp do
DateTime.utc_now()
|> DateTime.to_unix(:microsecond)
|> Kernel.*(10)
end
end
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2017 Ispirata Srl
# Copyright 2017 - 2023 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,6 +50,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
@interface_lifespan_decimicroseconds 60 * 10 * 1000 * 10000
@device_triggers_lifespan_decimicroseconds 60 * 10 * 1000 * 10000
@groups_lifespan_decimicroseconds 60 * 10 * 1000 * 10000
@deletion_refresh_lifespan_decimicroseconds 60 * 10 * 1000 * 10000

def init_state(realm, device_id, message_tracker) do
MessageTracker.register_data_updater(message_tracker)
Expand All @@ -74,7 +75,9 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
last_seen_message: 0,
last_device_triggers_refresh: 0,
last_groups_refresh: 0,
trigger_id_to_policy_name: %{}
trigger_id_to_policy_name: %{},
discard_messages: false,
last_deletion_in_progress_refresh: 0
}

encoded_device_id = Device.encode_device_id(device_id)
Expand All @@ -98,6 +101,11 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
:ok
end

def handle_connection(%State{discard_messages: true} = state, _, message_id, _) do
MessageTracker.discard(state.message_tracker, message_id)
state
end

def handle_connection(state, ip_address_string, message_id, timestamp) do
{:ok, db_client} = Database.connect(realm: state.realm)

Expand Down Expand Up @@ -153,6 +161,11 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
%{new_state | connected: true, last_seen_message: timestamp}
end

def handle_heartbeat(%State{discard_messages: true} = state, _, message_id, _) do
MessageTracker.discard(state.message_tracker, message_id)
state
end

# TODO make this private when all heartbeats will be moved to internal
def handle_heartbeat(state, message_id, timestamp) do
{:ok, db_client} = Database.connect(realm: state.realm)
Expand All @@ -168,7 +181,14 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
end

def handle_internal(state, "/heartbeat", _payload, message_id, timestamp) do
handle_heartbeat(state, message_id, timestamp)
{:continue, handle_heartbeat(state, message_id, timestamp)}
end

def handle_internal(%State{discard_messages: true} = state, "/f", _, message_id, _) do
:ok = Queries.ack_end_device_deletion(state.realm, state.device_id)
_ = Logger.info("End device deletion acked.", tag: "device_delete_ack")
MessageTracker.ack_delivery(state.message_tracker, message_id)
{:stop, state}
end

def handle_internal(state, path, payload, message_id, timestamp) do
Expand Down Expand Up @@ -200,7 +220,16 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
timestamp
)

update_stats(new_state, "", nil, path, payload)
{:continue, update_stats(new_state, "", nil, path, payload)}
end

def start_device_deletion(state, timestamp) do
{:ok, db_client} = Database.connect(realm: state.realm)

# Device deletion is among time-based actions
new_state = execute_time_based_actions(state, timestamp, db_client)

{:ok, new_state}
end

def handle_disconnection(state, message_id, timestamp) do
Expand Down Expand Up @@ -456,6 +485,11 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
:ok
end

def handle_data(%State{discard_messages: true} = state, _, _, _, message_id, _) do
MessageTracker.discard(state.message_tracker, message_id)
state
end

def handle_data(state, interface, path, payload, message_id, timestamp) do
{:ok, db_client} = Database.connect(realm: state.realm)

Expand Down Expand Up @@ -1147,6 +1181,11 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
}
end

def handle_introspection(%State{discard_messages: true} = state, _, message_id, _) do
MessageTracker.discard(state.message_tracker, message_id)
state
end

def handle_introspection(state, payload, message_id, timestamp) do
with {:ok, new_introspection_list} <- PayloadsDecoder.parse_introspection(payload) do
process_introspection(state, new_introspection_list, payload, message_id, timestamp)
Expand Down Expand Up @@ -1409,6 +1448,11 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
}
end

def handle_control(%State{discard_messages: true} = state, _, _, message_id, _) do
MessageTracker.discard(state.message_tracker, message_id)
state
end

def handle_control(state, "/producer/properties", <<0, 0, 0, 0>>, message_id, timestamp) do
{:ok, db_client} = Database.connect(realm: state.realm)

Expand Down Expand Up @@ -1575,6 +1619,16 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
update_stats(new_state, "", nil, path, payload)
end

def handle_install_volatile_trigger(
%State{discard_messages: true} = state,
_,
message_id,
_
) do
MessageTracker.ack_delivery(state.message_tracker, message_id)
state
end

def handle_install_volatile_trigger(
state,
object_id,
Expand Down Expand Up @@ -1668,6 +1722,11 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
end
end

def handle_delete_volatile_trigger(%State{discard_messages: true} = state, _, message_id, _) do
MessageTracker.discard(state.message_tracker, message_id)
state
end

def handle_delete_volatile_trigger(state, trigger_id) do
{new_volatile, maybe_trigger} =
Enum.reduce(state.volatile_triggers, {[], nil}, fn item, {acc, found} ->
Expand Down Expand Up @@ -1836,6 +1895,55 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
|> reload_groups_on_expiry(timestamp, db_client)
|> purge_expired_interfaces(timestamp)
|> reload_device_triggers_on_expiry(timestamp, db_client)
|> reload_device_deletion_status_on_expiry(timestamp, db_client)
end

defp reload_device_deletion_status_on_expiry(state, timestamp, db_client) do
if state.last_deletion_in_progress_refresh + @deletion_refresh_lifespan_decimicroseconds <=
timestamp do
new_state = maybe_start_device_deletion(db_client, state, timestamp)
%State{new_state | last_deletion_in_progress_refresh: timestamp}
else
state
end
end

defp maybe_start_device_deletion(db_client, state, timestamp) do
if should_start_device_deletion?(state.realm, state.device_id) do
encoded_device_id = Device.encode_device_id(state.device_id)

:ok = force_device_deletion_from_broker(state.realm, encoded_device_id)
new_state = set_device_disconnected(state, db_client, timestamp)

_ =
Logger.info("Stop handling data from device in deletion, device_id #{encoded_device_id}")

# It's ok to repeat that, as we always write ⊤
Queries.ack_start_device_deletion(state.realm, state.device_id)

%State{new_state | discard_messages: true}
else
state
end
end

defp should_start_device_deletion?(realm_name, device_id) do
case Queries.check_device_deletion_in_progress(realm_name, device_id) do
{:ok, true} ->
true

{:ok, false} ->
false

{:error, reason} ->
_ =
Logger.warn(
"Cannot check device deletion status for #{inspect(device_id)}, reason #{inspect(reason)}",
tag: "should_start_device_deletion_fail"
)

false
end
end

defp purge_expired_interfaces(state, timestamp) do
Expand Down Expand Up @@ -2166,6 +2274,24 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
end
end

defp force_device_deletion_from_broker(realm, encoded_device_id) do
_ = Logger.info("Disconnecting device to be deleted, device_id #{encoded_device_id}")

case VMQPlugin.delete(realm, encoded_device_id) do
# Successfully disconnected
:ok ->
:ok

# Not found means it was already disconnected, succeed anyway
{:error, :not_found} ->
:ok

# Some other error, return it
{:error, reason} ->
{:error, reason}
end
end

defp get_on_data_triggers(state, event, interface_id, endpoint_id) do
key = {event, interface_id, endpoint_id}

Expand Down
Loading

0 comments on commit 01c8510

Please sign in to comment.