Skip to content
This repository has been archived by the owner on May 29, 2024. It is now read-only.

Subscribe multiple topics with a single subscription #120

Merged
merged 4 commits into from
May 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ Every entry has a category for which we use the following visual abbreviations:

## Unreleased

- 🎁 Threat Bus now supports subscriptions for multiple topics. The
`zmq-app-plugin` implements those multi-topic subscriptions in a
backwards-compatible way. Subscribers benefit from this change, as they only
get assigned a single point-to-point topic for their subscription, instead of
one point-to-point topic for every subscribed Threat Bus topic.
[#120](https://github.com/tenzir/threatbus/pull/120)

- ⚠️ The `-c` / `--config` parameter is now explicitly required to start
Threat Bus. Starting without it will print a helpful error message.
[#119](https://github.com/tenzir/threatbus/pull/119)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

@dataclass
class Heartbeat:
# the p2p_topic used for point-to-point communication between the host and
# the subscriber, not a human-readable topic. I.e., the random string that
# was sent as respons from the Threat Bus host during subscription.
topic: str


Expand Down
35 changes: 24 additions & 11 deletions plugins/apps/threatbus_zmq_app/threatbus_zmq_app/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,21 @@ def run(self):
subscriptions_lock.acquire()
subscriptions[p2p_topic] = (task.topic, p2p_q)
subscriptions_lock.release()
snapshot_id = self.subscribe_callback(
task.topic, p2p_q, task.snapshot
subscribed_topics = (
task.topic if type(task.topic) is list else [task.topic]
)
if snapshot_id:
# remember that this snapshot was requested by this particular
# subscriber (identified by unique topic), so it is not asked to
# execute it's own request
snapshots_lock.acquire()
snapshots[snapshot_id] = p2p_topic
snapshots_lock.release()

for subscribed_topic in subscribed_topics:
snapshot_id = self.subscribe_callback(
subscribed_topic, p2p_q, task.snapshot
)
if snapshot_id:
# remember that this snapshot was requested by this particular
# subscriber (identified by unique p2p_topic), so it is not asked to
# execute it's own snapshot request
snapshots_lock.acquire()
snapshots[snapshot_id] = p2p_topic
snapshots_lock.release()
# send success message for reconnecting
socket.send_json(
{
Expand All @@ -112,12 +117,20 @@ def run(self):
socket.send_json({"status": "error"})
elif type(task) is Unsubscription:
logger.info(f"Received unsubscription from topic {task.topic}")
threatbus_topic, p2p_q = subscriptions.get(task.topic, (None, None))
threatbus_topics, p2p_q = subscriptions.get(
task.topic, (None, None)
)
threatbus_topics = (
threatbus_topics
if type(threatbus_topics) is list
else [threatbus_topics]
)
if not p2p_q:
logger.warn("No one was subscribed for that topic. Skipping.")
socket.send_json({"status": "error"})
continue
self.unsubscribe_callback(threatbus_topic, p2p_q)
for tb_topic in threatbus_topics:
self.unsubscribe_callback(tb_topic, p2p_q)
subscriptions_lock.acquire()
del subscriptions[task.topic]
subscriptions_lock.release()
Expand Down
17 changes: 8 additions & 9 deletions tests/utils/zmq_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
import sys


def send_manage_message(action, topic):
def send_manage_message(action, topic, snapshot=0):
"""
Un/subscribes to Threat Bus for the given topic.
@param action Either 'subscribe' or 'unsubscribe'
@param topic The topic to subscribe to
"""
action = {"action": action, "topic": topic}
action = {"action": action, "topic": topic, "snapshot": snapshot}
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.setsockopt(zmq.LINGER, 0)
Expand All @@ -36,11 +36,11 @@ def receive(n: int, topics: list):
"""
socket = zmq.Context().socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:13371")
p2p_topics = []
for topic in topics:
res = send_manage_message("subscribe", topic)
socket.setsockopt(zmq.SUBSCRIBE, res["topic"].encode())
p2p_topics.append(res["topic"])
snapshot = 100 # days
res = send_manage_message("subscribe", topics, snapshot)
p2p_topic = res["topic"]
print("p2p topic:", p2p_topic)
socket.setsockopt(zmq.SUBSCRIBE, p2p_topic.encode())
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

Expand All @@ -50,8 +50,7 @@ def receive(n: int, topics: list):
raw = socket.recv()
topic, message = raw.decode("utf-8").split(" ", 1)
yield (topic, message)
for topic in p2p_topics:
send_manage_message("unsubscribe", topic)
send_manage_message("unsubscribe", p2p_topic)


def forward(n: int, topics: list, q: Queue):
Expand Down
8 changes: 6 additions & 2 deletions threatbus/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import json
from stix2 import Indicator, Sighting, parse
from stix2.parsing import dict_to_stix2
from typing import Union
from typing import List, Union

## Threat Bus custom STIX-2 attributes
@unique
Expand All @@ -23,12 +23,16 @@ class ThreatBusSTIX2Constants(Enum):

@dataclass
class Subscription:
topic: str
# either a single topic or a list of topics, e.g., `stix2/indicator`
topic: Union[str, List[str]]
snapshot: timedelta


@dataclass
class Unsubscription:
# the p2p_topic used for point-to-point communication between the host and
# the subscriber, not a human-readable topic. I.e., the random string that
# was sent as respons from the Threat Bus host during subscription.
topic: str


Expand Down