Skip to content
This repository has been archived by the owner on May 29, 2024. It is now read-only.

Commit

Permalink
Merge pull request #120
Browse files Browse the repository at this point in the history
Subscribe multiple topics with a single subscription
  • Loading branch information
0snap authored May 18, 2021
2 parents f141466 + cf14a7a commit 67ffbac
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 22 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ Every entry has a category for which we use the following visual abbreviations:

## Unreleased

- 🎁 Threat Bus now supports subscriptions for multiple topics. The
`zmq-app-plugin` implements those multi-topic subscriptions in a
backwards-compatible way. Subscribers benefit from this change, as they only
get assigned a single point-to-point topic for their subscription, instead of
one point-to-point topic for every subscribed Threat Bus topic.
[#120](https://github.com/tenzir/threatbus/pull/120)

- ⚠️ The `-c` / `--config` parameter is now explicitly required to start
Threat Bus. Starting without it will print a helpful error message.
[#119](https://github.com/tenzir/threatbus/pull/119)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

@dataclass
class Heartbeat:
# the p2p_topic used for point-to-point communication between the host and
# the subscriber, not a human-readable topic. I.e., the random string that
# was sent as respons from the Threat Bus host during subscription.
topic: str


Expand Down
35 changes: 24 additions & 11 deletions plugins/apps/threatbus_zmq_app/threatbus_zmq_app/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,21 @@ def run(self):
subscriptions_lock.acquire()
subscriptions[p2p_topic] = (task.topic, p2p_q)
subscriptions_lock.release()
snapshot_id = self.subscribe_callback(
task.topic, p2p_q, task.snapshot
subscribed_topics = (
task.topic if type(task.topic) is list else [task.topic]
)
if snapshot_id:
# remember that this snapshot was requested by this particular
# subscriber (identified by unique topic), so it is not asked to
# execute it's own request
snapshots_lock.acquire()
snapshots[snapshot_id] = p2p_topic
snapshots_lock.release()

for subscribed_topic in subscribed_topics:
snapshot_id = self.subscribe_callback(
subscribed_topic, p2p_q, task.snapshot
)
if snapshot_id:
# remember that this snapshot was requested by this particular
# subscriber (identified by unique p2p_topic), so it is not asked to
# execute it's own snapshot request
snapshots_lock.acquire()
snapshots[snapshot_id] = p2p_topic
snapshots_lock.release()
# send success message for reconnecting
socket.send_json(
{
Expand All @@ -112,12 +117,20 @@ def run(self):
socket.send_json({"status": "error"})
elif type(task) is Unsubscription:
logger.info(f"Received unsubscription from topic {task.topic}")
threatbus_topic, p2p_q = subscriptions.get(task.topic, (None, None))
threatbus_topics, p2p_q = subscriptions.get(
task.topic, (None, None)
)
threatbus_topics = (
threatbus_topics
if type(threatbus_topics) is list
else [threatbus_topics]
)
if not p2p_q:
logger.warn("No one was subscribed for that topic. Skipping.")
socket.send_json({"status": "error"})
continue
self.unsubscribe_callback(threatbus_topic, p2p_q)
for tb_topic in threatbus_topics:
self.unsubscribe_callback(tb_topic, p2p_q)
subscriptions_lock.acquire()
del subscriptions[task.topic]
subscriptions_lock.release()
Expand Down
17 changes: 8 additions & 9 deletions tests/utils/zmq_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
import sys


def send_manage_message(action, topic):
def send_manage_message(action, topic, snapshot=0):
"""
Un/subscribes to Threat Bus for the given topic.
@param action Either 'subscribe' or 'unsubscribe'
@param topic The topic to subscribe to
"""
action = {"action": action, "topic": topic}
action = {"action": action, "topic": topic, "snapshot": snapshot}
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.setsockopt(zmq.LINGER, 0)
Expand All @@ -36,11 +36,11 @@ def receive(n: int, topics: list):
"""
socket = zmq.Context().socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:13371")
p2p_topics = []
for topic in topics:
res = send_manage_message("subscribe", topic)
socket.setsockopt(zmq.SUBSCRIBE, res["topic"].encode())
p2p_topics.append(res["topic"])
snapshot = 100 # days
res = send_manage_message("subscribe", topics, snapshot)
p2p_topic = res["topic"]
print("p2p topic:", p2p_topic)
socket.setsockopt(zmq.SUBSCRIBE, p2p_topic.encode())
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

Expand All @@ -50,8 +50,7 @@ def receive(n: int, topics: list):
raw = socket.recv()
topic, message = raw.decode("utf-8").split(" ", 1)
yield (topic, message)
for topic in p2p_topics:
send_manage_message("unsubscribe", topic)
send_manage_message("unsubscribe", p2p_topic)


def forward(n: int, topics: list, q: Queue):
Expand Down
8 changes: 6 additions & 2 deletions threatbus/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import json
from stix2 import Indicator, Sighting, parse
from stix2.parsing import dict_to_stix2
from typing import Union
from typing import List, Union

## Threat Bus custom STIX-2 attributes
@unique
Expand All @@ -23,12 +23,16 @@ class ThreatBusSTIX2Constants(Enum):

@dataclass
class Subscription:
topic: str
# either a single topic or a list of topics, e.g., `stix2/indicator`
topic: Union[str, List[str]]
snapshot: timedelta


@dataclass
class Unsubscription:
# the p2p_topic used for point-to-point communication between the host and
# the subscriber, not a human-readable topic. I.e., the random string that
# was sent as respons from the Threat Bus host during subscription.
topic: str


Expand Down

0 comments on commit 67ffbac

Please sign in to comment.