Skip to content
This repository has been archived by the owner on May 29, 2024. It is now read-only.

Commit

Permalink
Use new logic to subscribe to multiple topics
Browse files Browse the repository at this point in the history
  • Loading branch information
0snap committed May 18, 2021
1 parent 6e69693 commit 6c677c5
Showing 1 changed file with 8 additions and 9 deletions.
17 changes: 8 additions & 9 deletions tests/utils/zmq_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
import sys


def send_manage_message(action, topic):
def send_manage_message(action, topic, snapshot=0):
"""
Un/subscribes to Threat Bus for the given topic.
@param action Either 'subscribe' or 'unsubscribe'
@param topic The topic to subscribe to
"""
action = {"action": action, "topic": topic}
action = {"action": action, "topic": topic, "snapshot": snapshot}
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.setsockopt(zmq.LINGER, 0)
Expand All @@ -36,11 +36,11 @@ def receive(n: int, topics: list):
"""
socket = zmq.Context().socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:13371")
p2p_topics = []
for topic in topics:
res = send_manage_message("subscribe", topic)
socket.setsockopt(zmq.SUBSCRIBE, res["topic"].encode())
p2p_topics.append(res["topic"])
snapshot = 100 # days
res = send_manage_message("subscribe", topics, snapshot)
p2p_topic = res["topic"]
print("p2p topic:", p2p_topic)
socket.setsockopt(zmq.SUBSCRIBE, p2p_topic.encode())
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

Expand All @@ -50,8 +50,7 @@ def receive(n: int, topics: list):
raw = socket.recv()
topic, message = raw.decode("utf-8").split(" ", 1)
yield (topic, message)
for topic in p2p_topics:
send_manage_message("unsubscribe", topic)
send_manage_message("unsubscribe", p2p_topic)


def forward(n: int, topics: list, q: Queue):
Expand Down

0 comments on commit 6c677c5

Please sign in to comment.