diff --git a/CHANGELOG.md b/CHANGELOG.md index 588adbeb..4f8c0528 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,13 @@ Every entry has a category for which we use the following visual abbreviations: ## Unreleased +- 🎁 Threat Bus now supports subscriptions for multiple topics. The + `zmq-app-plugin` implements those multi-topic subscriptions in a + backwards-compatible way. Subscribers benefit from this change, as they only + get assigned a single point-to-point topic for their subscription, instead of + one point-to-point topic for every subscribed Threat Bus topic. + [#120](https://github.com/tenzir/threatbus/pull/120) + - ⚠️ The `-c` / `--config` parameter is now explicitly required to start Threat Bus. Starting without it will print a helpful error message. [#119](https://github.com/tenzir/threatbus/pull/119) diff --git a/plugins/apps/threatbus_zmq_app/threatbus_zmq_app/message_mapping.py b/plugins/apps/threatbus_zmq_app/threatbus_zmq_app/message_mapping.py index 1ad23a1f..f15c178b 100644 --- a/plugins/apps/threatbus_zmq_app/threatbus_zmq_app/message_mapping.py +++ b/plugins/apps/threatbus_zmq_app/threatbus_zmq_app/message_mapping.py @@ -5,6 +5,9 @@ @dataclass class Heartbeat: + # the p2p_topic used for point-to-point communication between the host and + # the subscriber, not a human-readable topic. I.e., the random string that + # was sent as respons from the Threat Bus host during subscription. topic: str diff --git a/plugins/apps/threatbus_zmq_app/threatbus_zmq_app/plugin.py b/plugins/apps/threatbus_zmq_app/threatbus_zmq_app/plugin.py index 0f4c2913..b0db4095 100644 --- a/plugins/apps/threatbus_zmq_app/threatbus_zmq_app/plugin.py +++ b/plugins/apps/threatbus_zmq_app/threatbus_zmq_app/plugin.py @@ -88,16 +88,21 @@ def run(self): subscriptions_lock.acquire() subscriptions[p2p_topic] = (task.topic, p2p_q) subscriptions_lock.release() - snapshot_id = self.subscribe_callback( - task.topic, p2p_q, task.snapshot + subscribed_topics = ( + task.topic if type(task.topic) is list else [task.topic] ) - if snapshot_id: - # remember that this snapshot was requested by this particular - # subscriber (identified by unique topic), so it is not asked to - # execute it's own request - snapshots_lock.acquire() - snapshots[snapshot_id] = p2p_topic - snapshots_lock.release() + + for subscribed_topic in subscribed_topics: + snapshot_id = self.subscribe_callback( + subscribed_topic, p2p_q, task.snapshot + ) + if snapshot_id: + # remember that this snapshot was requested by this particular + # subscriber (identified by unique p2p_topic), so it is not asked to + # execute it's own snapshot request + snapshots_lock.acquire() + snapshots[snapshot_id] = p2p_topic + snapshots_lock.release() # send success message for reconnecting socket.send_json( { @@ -112,12 +117,20 @@ def run(self): socket.send_json({"status": "error"}) elif type(task) is Unsubscription: logger.info(f"Received unsubscription from topic {task.topic}") - threatbus_topic, p2p_q = subscriptions.get(task.topic, (None, None)) + threatbus_topics, p2p_q = subscriptions.get( + task.topic, (None, None) + ) + threatbus_topics = ( + threatbus_topics + if type(threatbus_topics) is list + else [threatbus_topics] + ) if not p2p_q: logger.warn("No one was subscribed for that topic. Skipping.") socket.send_json({"status": "error"}) continue - self.unsubscribe_callback(threatbus_topic, p2p_q) + for tb_topic in threatbus_topics: + self.unsubscribe_callback(tb_topic, p2p_q) subscriptions_lock.acquire() del subscriptions[task.topic] subscriptions_lock.release() diff --git a/tests/utils/zmq_receiver.py b/tests/utils/zmq_receiver.py index 26ccea60..ce6b7097 100644 --- a/tests/utils/zmq_receiver.py +++ b/tests/utils/zmq_receiver.py @@ -5,13 +5,13 @@ import sys -def send_manage_message(action, topic): +def send_manage_message(action, topic, snapshot=0): """ Un/subscribes to Threat Bus for the given topic. @param action Either 'subscribe' or 'unsubscribe' @param topic The topic to subscribe to """ - action = {"action": action, "topic": topic} + action = {"action": action, "topic": topic, "snapshot": snapshot} context = zmq.Context() socket = context.socket(zmq.REQ) socket.setsockopt(zmq.LINGER, 0) @@ -36,11 +36,11 @@ def receive(n: int, topics: list): """ socket = zmq.Context().socket(zmq.SUB) socket.connect("tcp://127.0.0.1:13371") - p2p_topics = [] - for topic in topics: - res = send_manage_message("subscribe", topic) - socket.setsockopt(zmq.SUBSCRIBE, res["topic"].encode()) - p2p_topics.append(res["topic"]) + snapshot = 100 # days + res = send_manage_message("subscribe", topics, snapshot) + p2p_topic = res["topic"] + print("p2p topic:", p2p_topic) + socket.setsockopt(zmq.SUBSCRIBE, p2p_topic.encode()) poller = zmq.Poller() poller.register(socket, zmq.POLLIN) @@ -50,8 +50,7 @@ def receive(n: int, topics: list): raw = socket.recv() topic, message = raw.decode("utf-8").split(" ", 1) yield (topic, message) - for topic in p2p_topics: - send_manage_message("unsubscribe", topic) + send_manage_message("unsubscribe", p2p_topic) def forward(n: int, topics: list, q: Queue): diff --git a/threatbus/data.py b/threatbus/data.py index f605571e..4962b249 100644 --- a/threatbus/data.py +++ b/threatbus/data.py @@ -4,7 +4,7 @@ import json from stix2 import Indicator, Sighting, parse from stix2.parsing import dict_to_stix2 -from typing import Union +from typing import List, Union ## Threat Bus custom STIX-2 attributes @unique @@ -23,12 +23,16 @@ class ThreatBusSTIX2Constants(Enum): @dataclass class Subscription: - topic: str + # either a single topic or a list of topics, e.g., `stix2/indicator` + topic: Union[str, List[str]] snapshot: timedelta @dataclass class Unsubscription: + # the p2p_topic used for point-to-point communication between the host and + # the subscriber, not a human-readable topic. I.e., the random string that + # was sent as respons from the Threat Bus host during subscription. topic: str