Skip to content

Commit

Permalink
Support for message hub 0.7.0
Browse files Browse the repository at this point in the history
Signed-off-by: Simone Orru <simone.orru@secomind.com>
  • Loading branch information
sorru94 committed Nov 12, 2024
1 parent 9d2db03 commit fb79307
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 58 deletions.
7 changes: 5 additions & 2 deletions .github/workflows/check-examples-grpc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ env:
jobs:
check-examples-grpc:
runs-on: ubuntu-latest
strategy:
matrix:
msghub-version: ["v0.7.0"]
concurrency:
group: check-examples-grpc-${{ github.ref }}
cancel-in-progress: true
Expand All @@ -40,7 +43,7 @@ jobs:
with:
repository: astarte-platform/astarte-message-hub
path: astarte-message-hub
ref: v0.6.1
ref: ${{ matrix.msghub-version }}
- name: Set up Python 3.12
uses: actions/setup-python@v5
with:
Expand Down Expand Up @@ -92,7 +95,7 @@ jobs:
uses: actions/cache/restore@v4
with:
path: ./astarte-message-hub/target/release/astarte-message-hub
key: astarte-message-hub-v0.6.1
key: astarte-message-hub-${{ matrix.msghub-version }}
- name: Build message hub
if: steps.cache-astarte-message-hub-restore.outputs.cache-hit != 'true'
working-directory: ./astarte-message-hub
Expand Down
5 changes: 3 additions & 2 deletions .github/workflows/e2e-tests-grpc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ jobs:
strategy:
matrix:
python-version: ["3.12"]
msghub-version: ["v0.7.0"]
concurrency:
group: e2e-test-grpc-${{ matrix.python-version }}-${{ github.ref }}
cancel-in-progress: true
Expand All @@ -42,7 +43,7 @@ jobs:
with:
repository: astarte-platform/astarte-message-hub
path: astarte-message-hub
ref: v0.6.1
ref: ${{ matrix.msghub-version }}
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
Expand Down Expand Up @@ -103,7 +104,7 @@ jobs:
uses: actions/cache/restore@v4
with:
path: ./astarte-message-hub/target/release/astarte-message-hub
key: astarte-message-hub-v0.6.1
key: astarte-message-hub-${{ matrix.msghub-version }}
- name: Build message hub
if: steps.cache-astarte-message-hub-restore.outputs.cache-hit != 'true'
run: cargo build --release
Expand Down
83 changes: 55 additions & 28 deletions astarte/device/device_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@

# pylint: disable=no-name-in-module
import grpc
from astarteplatform.msghub.astarte_message_pb2 import AstarteMessage, AstarteUnset
from astarteplatform.msghub.astarte_message_pb2 import (
AstarteMessage,
AstarteUnset,
MessageHubEvent,
)
from astarteplatform.msghub.astarte_type_pb2 import (
AstarteBinaryBlobArray,
AstarteBooleanArray,
Expand All @@ -41,8 +45,10 @@
AstarteLongIntegerArray,
AstarteStringArray,
)
from astarteplatform.msghub.interface_pb2 import InterfacesJson, InterfacesName
from astarteplatform.msghub.message_hub_service_pb2_grpc import MessageHubStub
from astarteplatform.msghub.node_pb2 import Node
from google.protobuf.empty_pb2 import Empty
from google.protobuf.timestamp_pb2 import Timestamp
from grpc import (
ChannelConnectivity,
Expand Down Expand Up @@ -130,9 +136,11 @@ def add_interface_from_json(self, interface_json: dict):
raise DeviceConnectingError("Interfaces cannot be added while device is connecting.")
interface = Interface(interface_json)
self._introspection.add_interface(interface)
self.__interfaces_bins[interface.name] = json.dumps(interface_json).encode()
interface_bin = json.dumps(interface_json).encode()
self.__interfaces_bins[interface.name] = interface_bin
if self.__connection_state is ConnectionState.CONNECTED:
self._detach_and_attach_node()
interfaces_json = InterfacesJson(interfaces_json=[interface_bin])
self.__msghub_stub.AddInterfaces(interfaces_json)

def remove_interface(self, interface_name: str) -> None:
"""
Expand All @@ -154,19 +162,8 @@ def remove_interface(self, interface_name: str) -> None:
if interface_name in self.__interfaces_bins:
del self.__interfaces_bins[interface_name]
if self.__connection_state is ConnectionState.CONNECTED:
self._detach_and_attach_node()

def _detach_and_attach_node(self):
"""
Detaches the GRPC node and then re-attaches it.
"""
logging.debug("Detaching and re-attaching node with uuid %s.", str(self._node_uuid))
self.__msghub_stub.Detach(self.__msghub_node)
self.__msghub_node = Node(
uuid=self._node_uuid, interface_jsons=list(self.__interfaces_bins.values())
)
stream = self.__msghub_stub.Attach(self.__msghub_node)
self.__stream_queue.put(stream)
interfaces_name = InterfacesName(names=[interface_name.encode()])
self.__msghub_stub.RemoveInterfaces(interfaces_name)

def connect(self) -> None:
"""
Expand All @@ -187,9 +184,7 @@ def connect(self) -> None:
)
self.__msghub_stub = MessageHubStub(self.__grpc_channel)

self.__msghub_node = Node(
uuid=self._node_uuid, interface_jsons=list(self.__interfaces_bins.values())
)
self.__msghub_node = Node(interfaces_json=list(self.__interfaces_bins.values()))
stream = self.__msghub_stub.Attach(self.__msghub_node)

self.__stream_queue.put(stream)
Expand Down Expand Up @@ -246,10 +241,14 @@ def _rx_stream_handler(self):
"""
while True:
stream = self.__stream_queue.get()
if stream is not None:
try:
for astarte_message in stream:
(interface_name, path, payload) = _decode_astarte_message(astarte_message)
if stream is None:
break

try:
for msg_hub_event in stream:
astarte_message = _decode_msg_hub_event(msg_hub_event)
if astarte_message:
(interface_name, path, payload) = astarte_message
logging.debug(
"Received message on interface: %s, endpoint %s, content: %s",
str(interface_name),
Expand All @@ -258,10 +257,8 @@ def _rx_stream_handler(self):
)
if self._on_data_received:
self._on_message_generic(interface_name, path, payload)
except _MultiThreadedRendezvous as exc:
logging.error("Status code change in the GRPC core: %s", str(exc.code()))
else:
break
except _MultiThreadedRendezvous as exc:
logging.error("Status code change in the GRPC core: %s", str(exc.code()))

def disconnect(self) -> None:
"""
Expand All @@ -277,7 +274,7 @@ def disconnect(self) -> None:

if self.__grpc_channel:
self.__stream_queue.put(None)
self.__msghub_stub.Detach(self.__msghub_node)
self.__msghub_stub.Detach(Empty())
self.__grpc_channel.close()

if self._on_disconnected:
Expand Down Expand Up @@ -490,6 +487,36 @@ def _encode_timestamp(timestamp: datetime) -> Timestamp:
return protobuf_timestamp


def _decode_msg_hub_event(
msg_hub_event: MessageHubEvent,
) -> (str, str, object | collections.abc.Mapping | None) | None:
"""
Decode MessageHubEvent object.
Parameters
----------
msg_hub_event : MessageHubEvent
The MessageHubEvent to decode.
Returns
-------
tuple[str, str, object | collections.abc.Mapping | None] | None
A tuple containing:
- The interface name corresponding to the payload
- The path corresponding to the payload
- The decoded payload
Or None if the received event was an error.
"""

payload = None
if msg_hub_event.HasField("error"):
logging.error("Error from the message hub: %s", msg_hub_event.error.description)
else:
payload = _decode_astarte_message(msg_hub_event.message)

return payload


def _decode_astarte_message(
astarte_message: AstarteMessage,
) -> (str, str, object | collections.abc.Mapping | None):
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ dependencies = [
"cryptography>=42.0.0",
"bson>=0.5.5",
"PyJWT>=1.7.0",
"astarte-message-hub-proto==0.6.2",
"astarte-message-hub-proto==0.7.0",
]
requires-python = ">=3.8"
authors = [
Expand Down
54 changes: 29 additions & 25 deletions tests/test_device_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,12 @@ def test_devicegrpc_init_calls_parent_init(self, mock_device):
)
mock_device.assert_called_once()

@mock.patch("astarte.device.device_grpc.InterfacesJson")
@mock.patch("astarte.device.device_grpc.json.dumps")
@mock.patch("astarte.device.device_grpc.Interface")
@mock.patch.object(Introspection, "add_interface")
def test_devicegrpc_add_interface_from_json_non_connected_device(
self, mock_add_interface, mock_interface, mock_json_dumps
self, mock_add_interface, mock_interface, mock_json_dumps, mock_interface_json
):
device = DeviceGrpc(
"server address",
Expand All @@ -87,6 +88,7 @@ def test_devicegrpc_add_interface_from_json_non_connected_device(
assert device._DeviceGrpc__interfaces_bins == {
mock_interface.return_value.name: mock_json_dumps.return_value.encode.return_value
}
mock_interface_json.assert_not_called()

@mock.patch("astarte.device.device_grpc.json.dumps")
@mock.patch("astarte.device.device_grpc.Interface")
Expand All @@ -112,12 +114,12 @@ def test_devicegrpc_add_interface_from_json_connecting_device(
mock_json_dumps.assert_not_called()
assert not device._DeviceGrpc__interfaces_bins

@mock.patch("astarte.device.device_grpc.Node")
@mock.patch("astarte.device.device_grpc.InterfacesJson")
@mock.patch("astarte.device.device_grpc.json.dumps")
@mock.patch("astarte.device.device_grpc.Interface")
@mock.patch.object(Introspection, "add_interface")
def test_devicegrpc_add_interface_from_json_connected_device(
self, mock_add_interface, mock_interface, mock_json_dumps, mock_node
self, mock_add_interface, mock_interface, mock_json_dumps, mock_interfaces_json
):
node_uuid = "node uuid"
device = DeviceGrpc(
Expand All @@ -143,11 +145,10 @@ def test_devicegrpc_add_interface_from_json_connected_device(
mock_interface.return_value.name: mock_json_dumps.return_value.encode.return_value
}

mock_device_stub.Detach.assert_called_once_with(mock_device_node)
mock_node.assert_called_once_with(
uuid=node_uuid, interface_jsons=[mock_json_dumps.return_value.encode.return_value]
mock_interfaces_json.assert_called_once_with(
interfaces_json=[mock_json_dumps.return_value.encode.return_value]
)
mock_device_stub.Attach.assert_called_once_with(mock_node.return_value)
mock_device_stub.AddInterfaces.assert_called_once_with(mock_interfaces_json.return_value)

@mock.patch.object(Introspection, "remove_interface")
def test_devicegrpc_remove_interface_non_connected_device(self, mock_remove_interface):
Expand Down Expand Up @@ -205,9 +206,11 @@ def test_devicegrpc_remove_interface_connecting_device(self, mock_remove_interfa
"<other-interface-name2>": other_interface_2,
}

@mock.patch("astarte.device.device_grpc.Node")
@mock.patch("astarte.device.device_grpc.InterfacesName")
@mock.patch.object(Introspection, "remove_interface")
def test_devicegrpc_remove_interface_connected_device(self, mock_remove_interface, mock_node):
def test_devicegrpc_remove_interface_connected_device(
self, mock_remove_interface, mock_interfaces_name
):
node_uuid = "node uuid"
device = DeviceGrpc(
"server address",
Expand Down Expand Up @@ -239,11 +242,8 @@ def test_devicegrpc_remove_interface_connected_device(self, mock_remove_interfac
"<other-interface-name2>": other_interface_2,
}

mock_device_stub.Detach.assert_called_once_with(mock_device_node)
mock_node.assert_called_once_with(
uuid=node_uuid, interface_jsons=[other_interface_1, other_interface_2]
)
mock_device_stub.Attach.assert_called_once_with(mock_node.return_value)
mock_interfaces_name.assert_called_once_with(names=[interface_name.encode()])
mock_device_stub.RemoveInterfaces.assert_called_once_with(mock_interfaces_name.return_value)

@mock.patch("astarte.device.device_grpc.Thread")
@mock.patch("astarte.device.device_grpc.Node")
Expand Down Expand Up @@ -285,7 +285,7 @@ def test_devicegrpc_connect(
mock_AstarteUnaryStreamInterceptor.return_value,
)
mock_msg_hub_stub.assert_called_once_with(mock_intercept_channel.return_value)
mock_node.assert_called_once_with(uuid=node_uuid, interface_jsons=[])
mock_node.assert_called_once_with(interfaces_json=[])
mock_msg_hub_stub.return_value.Attach.assert_called_once_with(mock_node.return_value)

mock_thread.assert_called_once_with(target=device._rx_stream_handler)
Expand Down Expand Up @@ -373,7 +373,7 @@ def test_devicegrpc__on_connectivity_change(
mock_AstarteUnaryStreamInterceptor.return_value,
)
mock_msg_hub_stub.assert_called_once_with(mock_intercept_channel.return_value)
mock_node.assert_called_once_with(uuid=node_uuid, interface_jsons=[])
mock_node.assert_called_once_with(interfaces_json=[])
mock_msg_hub_stub.return_value.Attach.assert_called_once_with(mock_node.return_value)

mock_thread.assert_called_once_with(target=device._rx_stream_handler)
Expand Down Expand Up @@ -477,7 +477,7 @@ def test_devicegrpc__on_connectivity_change_threaded(
mock_AstarteUnaryStreamInterceptor.return_value,
)
mock_msg_hub_stub.assert_called_once_with(mock_intercept_channel.return_value)
mock_node.assert_called_once_with(uuid=node_uuid, interface_jsons=[])
mock_node.assert_called_once_with(interfaces_json=[])
mock_msg_hub_stub.return_value.Attach.assert_called_once_with(mock_node.return_value)

mock_thread.assert_called_once_with(target=device._rx_stream_handler)
Expand Down Expand Up @@ -523,6 +523,7 @@ def test_devicegrpc__on_connectivity_change_threaded(
mock_on_disconnected.assert_not_called()

@mock.patch.object(Device, "_on_message_generic")
@mock.patch("astarte.device.device_grpc._decode_msg_hub_event")
@mock.patch("astarte.device.device_grpc._decode_astarte_message")
@mock.patch("astarte.device.device_grpc.Thread")
@mock.patch("astarte.device.device_grpc.Node")
Expand All @@ -541,6 +542,7 @@ def test_devicegrpc__rx_stream_handler(
mock_node,
mock_thread,
mock__decode_astarte_message,
mock__decode_msg_hub_event,
mock__on_message_generic,
):
server_address = "server address"
Expand Down Expand Up @@ -594,7 +596,7 @@ def __next__(self):
mock_AstarteUnaryStreamInterceptor.return_value,
)
mock_msg_hub_stub.assert_called_once_with(mock_intercept_channel.return_value)
mock_node.assert_called_once_with(uuid=node_uuid, interface_jsons=[])
mock_node.assert_called_once_with(interfaces_json=[])
mock_msg_hub_stub.return_value.Attach.assert_called_once_with(mock_node.return_value)

mock_thread.assert_called_once_with(target=device._rx_stream_handler)
Expand All @@ -613,12 +615,12 @@ def __next__(self):
# Manually start of the _rx_stream_handler() method in the current thread
rx_msg1_decoded = ("interface 1 name", "path 1", mock.MagicMock())
rx_msg2_decoded = ("interface 2 name", "path 2", mock.MagicMock())
mock__decode_astarte_message.side_effect = [rx_msg1_decoded, rx_msg2_decoded]
mock__decode_msg_hub_event.side_effect = [rx_msg1_decoded, rx_msg2_decoded]
device._rx_stream_handler()

calls = [mock.call(rx_message1), mock.call(rx_message2)]
mock__decode_astarte_message.assert_has_calls(calls)
self.assertEqual(mock__decode_astarte_message.call_count, 2)
mock__decode_msg_hub_event.assert_has_calls(calls)
self.assertEqual(mock__decode_msg_hub_event.call_count, 2)

calls = [
mock.call(rx_msg1_decoded[0], rx_msg1_decoded[1], rx_msg1_decoded[2]),
Expand All @@ -627,7 +629,8 @@ def __next__(self):
mock__on_message_generic.assert_has_calls(calls)
self.assertEqual(mock__on_message_generic.call_count, 2)

def test_devicegrpc_disconnect(self):
@mock.patch("astarte.device.device_grpc.Empty")
def test_devicegrpc_disconnect(self, mock_empty):
server_address = "server address"
node_uuid = "node uuid"
device = DeviceGrpc(
Expand Down Expand Up @@ -655,11 +658,12 @@ def test_devicegrpc_disconnect(self):

device.disconnect()

mock_msghub_stub.Detach.assert_called_once_with(mock_msghub_node)
mock_msghub_stub.Detach.assert_called_once_with(mock_empty.return_value)
mock_grpc_channel.close.assert_called_once()
mock_on_disconnected.assert_called_once_with(device, 0)

def test_devicegrpc_disconnect_threaded(self):
@mock.patch("astarte.device.device_grpc.Empty")
def test_devicegrpc_disconnect_threaded(self, mock_empty):
server_address = "server address"
node_uuid = "node uuid"
device = DeviceGrpc(
Expand Down Expand Up @@ -689,7 +693,7 @@ def test_devicegrpc_disconnect_threaded(self):

device.disconnect()

mock_msghub_stub.Detach.assert_called_once_with(mock_msghub_node)
mock_msghub_stub.Detach.assert_called_once_with(mock_empty.return_value)
mock_grpc_channel.close.assert_called_once()
mock_loop.call_soon_threadsafe.assert_called_once_with(mock_on_disconnected, device, 0)
mock_on_disconnected.assert_not_called()
Expand Down

0 comments on commit fb79307

Please sign in to comment.