From d2a4704649e982f3bc4b60bfd9153fc0c6a96f7c Mon Sep 17 00:00:00 2001 From: Rajan Date: Wed, 27 Mar 2024 18:14:39 -0400 Subject: [PATCH] [CAP] [Feature] Get list of actors from directory service. (#2073) * Search directory for list of actors using regex '.*' gets all actors * docs changes * pre-commit fixes * Use ActorInfo from protobuf * pre-commit * Added zmq tests to work on removing sleeps * minor refactor of zmq tests * 1) Change DirSvr to user Broker. 2) Add req-router to broker 3) In ActorConnector use handshake and req/resp to remove sleep * 1) Change DirSvr to user Broker. 2) Add req-router to broker 3) In ActorConnector use handshake and req/resp to remove sleep * move socket creation to thread with recv * move socket creation to thread with recv * Better logging for DirectorySvc * better logging for directory svc * Use logging config * Start removing sleeps * pre-commit * Cleanup monitor socket --- samples/apps/cap/README.md | 5 +- .../apps/cap/py/autogencap/ActorConnector.py | 47 ++++-- samples/apps/cap/py/autogencap/Broker.py | 35 ++++- samples/apps/cap/py/autogencap/Config.py | 2 + samples/apps/cap/py/autogencap/DebugLog.py | 12 +- .../apps/cap/py/autogencap/DirectorySvc.py | 98 ++++++------ .../cap/py/autogencap/LocalActorNetwork.py | 14 +- samples/apps/cap/py/demo/App.py | 11 +- samples/apps/cap/py/demo/SimpleActorDemo.py | 2 - samples/apps/cap/py/demo/list_agents.py | 29 ++++ samples/apps/cap/py/demo/zmq_tests.py | 145 ++++++++++++++++++ 11 files changed, 319 insertions(+), 81 deletions(-) create mode 100644 samples/apps/cap/py/demo/list_agents.py create mode 100644 samples/apps/cap/py/demo/zmq_tests.py diff --git a/samples/apps/cap/README.md b/samples/apps/cap/README.md index db93821203e..7fe7a6c7469 100644 --- a/samples/apps/cap/README.md +++ b/samples/apps/cap/README.md @@ -49,6 +49,7 @@ Actors can register themselves with CAP, find other agents, construct arbitrary 2) Many CAP Actors interacting with each other 3) A pair of interacting AutoGen Agents wrapped in CAP Actors 4) CAP wrapped AutoGen Agents in a group chat - +5) Two AutoGen Agents running in different processes and communicating through CAP +6) List all registered agents in CAP ### Coming soon. Stay tuned! ### -1) Two AutoGen Agents running in different processes and communicating through CAP +1) AutoGen integration to list all registered agents diff --git a/samples/apps/cap/py/autogencap/ActorConnector.py b/samples/apps/cap/py/autogencap/ActorConnector.py index f48f9f8e84b..cbd55a1da0a 100644 --- a/samples/apps/cap/py/autogencap/ActorConnector.py +++ b/samples/apps/cap/py/autogencap/ActorConnector.py @@ -2,27 +2,57 @@ # socket that can publish to that topic. It exposes this functionality # using send_msg method import zmq +from zmq.utils.monitor import recv_monitor_message import time import uuid from .DebugLog import Debug, Error -from .Config import xsub_url, xpub_url +from .Config import xsub_url, xpub_url, router_url +from typing import Any, Dict class ActorConnector: def __init__(self, context, topic): - self._pub_socket = context.socket(zmq.PUB) - self._pub_socket.setsockopt(zmq.LINGER, 0) - self._pub_socket.connect(xsub_url) + self._context = context - self._resp_socket = context.socket(zmq.SUB) + self._resp_socket = self._context.socket(zmq.SUB) self._resp_socket.setsockopt(zmq.LINGER, 0) - self._resp_socket.setsockopt(zmq.RCVTIMEO, 10000) + self._resp_socket.setsockopt(zmq.RCVTIMEO, 250) self._resp_socket.connect(xpub_url) self._resp_topic = str(uuid.uuid4()) Debug("AgentConnector", f"subscribe to: {self._resp_topic}") self._resp_socket.setsockopt_string(zmq.SUBSCRIBE, f"{self._resp_topic}") self._topic = topic - time.sleep(0.01) # Let the network do things. + + self._connect_pub_socket() + + def _send_recv_router_msg(self): + # Send a request to the router and wait for a response + req_socket = self._context.socket(zmq.REQ) + req_socket.connect(router_url) + try: + Debug("ActorConnector", "Broker Check Request Sent") + req_socket.send_string("Request") + _ = req_socket.recv_string() + Debug("ActorConnector", "Broker Check Response Received") + finally: + req_socket.close() + + def _connect_pub_socket(self): + self._pub_socket = self._context.socket(zmq.PUB) + self._pub_socket.setsockopt(zmq.LINGER, 0) + monitor = self._pub_socket.get_monitor_socket() + self._pub_socket.connect(xsub_url) + # Monitor handshake on the pub socket + while monitor.poll(): + evt: Dict[str, Any] = {} + mon_evt = recv_monitor_message(monitor) + evt.update(mon_evt) + if evt["event"] == zmq.EVENT_MONITOR_STOPPED or evt["event"] == zmq.EVENT_HANDSHAKE_SUCCEEDED: + Debug("ActorConnector", "Handshake received (Or Monitor stopped)") + break + self._pub_socket.disable_monitor() + monitor.close() + self._send_recv_router_msg() def send_txt_msg(self, msg): self._pub_socket.send_multipart( @@ -35,14 +65,11 @@ def send_bin_msg(self, msg_type: str, msg): ) def binary_request(self, msg_type: str, msg, retry=5): - time.sleep(0.5) # Let the network do things. self._pub_socket.send_multipart( [self._topic.encode("utf8"), msg_type.encode("utf8"), self._resp_topic.encode("utf8"), msg] ) - time.sleep(0.5) # Let the network do things. for i in range(retry + 1): try: - self._resp_socket.setsockopt(zmq.RCVTIMEO, 10000) resp_topic, resp_msg_type, resp_sender_topic, resp = self._resp_socket.recv_multipart() return resp_topic, resp_msg_type, resp_sender_topic, resp except zmq.Again: diff --git a/samples/apps/cap/py/autogencap/Broker.py b/samples/apps/cap/py/autogencap/Broker.py index 0bb844d2a04..8f276519335 100644 --- a/samples/apps/cap/py/autogencap/Broker.py +++ b/samples/apps/cap/py/autogencap/Broker.py @@ -2,7 +2,7 @@ import zmq import threading from autogencap.DebugLog import Debug, Info, Warn -from autogencap.Config import xsub_url, xpub_url +from autogencap.Config import xsub_url, xpub_url, router_url class Broker: @@ -11,19 +11,23 @@ def __init__(self, context: zmq.Context = zmq.Context()): self._run: bool = False self._xpub: zmq.Socket = None self._xsub: zmq.Socket = None + self._router: zmq.Socket = None - def start(self) -> bool: + def _init_sockets(self): try: # XPUB setup self._xpub = self._context.socket(zmq.XPUB) self._xpub.setsockopt(zmq.LINGER, 0) self._xpub.bind(xpub_url) - # XSUB setup self._xsub = self._context.socket(zmq.XSUB) self._xsub.setsockopt(zmq.LINGER, 0) self._xsub.bind(xsub_url) - + # ROUTER setup + self._router = self._context.socket(zmq.ROUTER) + self._router.setsockopt(zmq.LINGER, 0) + self._router.bind(router_url) + return True except zmq.ZMQError as e: Debug("BROKER", f"Unable to start. Check details: {e}") # If binding fails, close the sockets and return False @@ -31,8 +35,12 @@ def start(self) -> bool: self._xpub.close() if self._xsub: self._xsub.close() + if self._router: + self._router.close() return False + def start(self) -> bool: + Debug("BROKER", "Trying to start broker.") self._run = True self._broker_thread: threading.Thread = threading.Thread(target=self.thread_fn) self._broker_thread.start() @@ -40,6 +48,8 @@ def start(self) -> bool: return True def stop(self): + if not self._run: + return # Error("BROKER_ERR", "fix cleanup self._context.term()") Debug("BROKER", "stopped") self._run = False @@ -48,15 +58,24 @@ def stop(self): self._xpub.close() if self._xsub: self._xsub.close() + if self._router: + self._router.close() # self._context.term() def thread_fn(self): try: + if not self._init_sockets(): + Debug("BROKER", "Receive thread not started since sockets were not initialized") + self._run = False + return + # Poll sockets for events self._poller: zmq.Poller = zmq.Poller() self._poller.register(self._xpub, zmq.POLLIN) self._poller.register(self._xsub, zmq.POLLIN) + self._poller.register(self._router, zmq.POLLIN) + Info("BROKER", "Started. Waiting for events") # Receive msgs, forward and process while self._run: events = dict(self._poller.poll(500)) @@ -70,6 +89,14 @@ def thread_fn(self): Debug("BROKER", f"publishing message: {message}") self._xpub.send_multipart(message) + if self._router in events: + message = self._router.recv_multipart() + Debug("BROKER", f"router message: {message}") + # Mirror it back for now to confirm connectivity + # More interesting reserved point to point + # routing coming in the the future + self._router.send_multipart(message) + except Exception as e: Debug("BROKER", f"thread encountered an error: {e}") finally: diff --git a/samples/apps/cap/py/autogencap/Config.py b/samples/apps/cap/py/autogencap/Config.py index 299e4e273b8..5584a8d29cb 100644 --- a/samples/apps/cap/py/autogencap/Config.py +++ b/samples/apps/cap/py/autogencap/Config.py @@ -3,3 +3,5 @@ IGNORED_LOG_CONTEXTS = [] xpub_url: str = "tcp://127.0.0.1:5555" xsub_url: str = "tcp://127.0.0.1:5556" +router_url: str = "tcp://127.0.0.1:5557" +dealer_url: str = "tcp://127.0.0.1:5558" diff --git a/samples/apps/cap/py/autogencap/DebugLog.py b/samples/apps/cap/py/autogencap/DebugLog.py index b31512f7ffa..b86a497c658 100644 --- a/samples/apps/cap/py/autogencap/DebugLog.py +++ b/samples/apps/cap/py/autogencap/DebugLog.py @@ -1,6 +1,6 @@ import threading import datetime -from autogencap.Config import LOG_LEVEL, IGNORED_LOG_CONTEXTS +import autogencap.Config as Config from termcolor import colored # Define log levels as constants @@ -18,19 +18,21 @@ def Log(level, context, msg): # Check if the current level meets the threshold - if level >= LOG_LEVEL: # Use the LOG_LEVEL from the Config module + if level >= Config.LOG_LEVEL: # Use the LOG_LEVEL from the Config module # Check if the context is in the list of ignored contexts - if context in IGNORED_LOG_CONTEXTS: + if context in Config.IGNORED_LOG_CONTEXTS: return with console_lock: timestamp = colored(datetime.datetime.now().strftime("%m/%d/%y %H:%M:%S"), "dark_grey") # Translate level number to name and color level_name = colored(LEVEL_NAMES[level], LEVEL_COLOR[level]) - # Center justify the context and color it blue + # Left justify the context and color it blue context = colored(context.ljust(14), "blue") + # Left justify the threadid and color it blue + thread_id = colored(str(threading.get_ident()).ljust(5), "blue") # color the msg based on the level msg = colored(msg, LEVEL_COLOR[level]) - print(f"{threading.get_ident()} {timestamp} {level_name}: [{context}] {msg}") + print(f"{thread_id} {timestamp} {level_name}: [{context}] {msg}") def Debug(context, message): diff --git a/samples/apps/cap/py/autogencap/DirectorySvc.py b/samples/apps/cap/py/autogencap/DirectorySvc.py index 21d01c71207..87833c91ae3 100644 --- a/samples/apps/cap/py/autogencap/DirectorySvc.py +++ b/samples/apps/cap/py/autogencap/DirectorySvc.py @@ -1,12 +1,22 @@ from autogencap.Constants import Directory_Svc_Topic -from autogencap.Config import xpub_url, xsub_url +from autogencap.Config import xpub_url, xsub_url, router_url from autogencap.DebugLog import Debug, Info, Error from autogencap.ActorConnector import ActorConnector from autogencap.Actor import Actor -from autogencap.proto.CAP_pb2 import ActorRegistration, ActorInfo, ActorLookup, ActorLookupResponse, Ping, Pong +from autogencap.Broker import Broker +from autogencap.proto.CAP_pb2 import ( + ActorRegistration, + ActorInfo, + ActorLookup, + ActorLookupResponse, + Ping, + Pong, + ActorInfoCollection, +) import zmq import threading import time +import re # TODO (Future DirectorySv PR) use actor description, network_id, other properties to make directory # service more generic and powerful @@ -52,18 +62,27 @@ def _actor_lookup_msg_handler(self, topic: str, msg_type: str, msg: bytes, sende actor_lookup = ActorLookup() actor_lookup.ParseFromString(msg) Debug("DirectorySvc", f"Actor lookup: {actor_lookup.actor_info.name}") - actor: ActorInfo = None - if actor_lookup.actor_info.name in self._registered_actors: - Info("DirectorySvc", f"Actor found: {actor_lookup.actor_info.name}") - actor = self._registered_actors[actor_lookup.actor_info.name] - else: - Error("DirectorySvc", f"Actor not found: {actor_lookup.actor_info.name}") actor_lookup_resp = ActorLookupResponse() - if actor is not None: - actor_lookup_resp.actor.info_coll.extend([actor]) + actor_lookup_resp.found = False + try: + pattern = re.compile(actor_lookup.actor_info.name) + except re.error: + Error("DirectorySvc", f"Invalid regex pattern: {actor_lookup.actor_info.name}") + else: + found_actor_list = [ + self._registered_actors[registered_actor] + for registered_actor in self._registered_actors + if pattern.match(registered_actor) + ] + + if found_actor_list: + for actor in found_actor_list: + Info("DirectorySvc", f"Actor found: {actor.name}") actor_lookup_resp.found = True + actor_lookup_resp.actor.info_coll.extend(found_actor_list) else: - actor_lookup_resp.found = False + Error("DirectorySvc", f"Actor not found: {actor_lookup.actor_info.name}") + sender_connection = ActorConnector(self._context, sender_topic) serialized_msg = actor_lookup_resp.SerializeToString() sender_connection.send_bin_msg(ActorLookupResponse.__name__, serialized_msg) @@ -76,6 +95,7 @@ def __init__(self, context: zmq.Context = zmq.Context()): self._directory_actor: DirectoryActor = None def _no_other_directory(self) -> bool: + Debug("DirectorySvc", "Pinging existing DirectorySvc") ping = Ping() serialized_msg = ping.SerializeToString() _, _, _, resp = self._directory_connector.binary_request(Ping.__name__, serialized_msg, retry=0) @@ -110,69 +130,39 @@ def register_actor_by_name(self, actor_name: str): actor_info = ActorInfo(name=actor_name) self.register_actor(actor_info) - def lookup_actor_by_name(self, actor_name: str) -> ActorInfo: - actor_info = ActorInfo(name=actor_name) + def _lookup_actors_by_name(self, name_regex: str): + actor_info = ActorInfo(name=name_regex) actor_lookup = ActorLookup(actor_info=actor_info) serialized_msg = actor_lookup.SerializeToString() _, _, _, resp = self._directory_connector.binary_request(ActorLookup.__name__, serialized_msg) actor_lookup_resp = ActorLookupResponse() actor_lookup_resp.ParseFromString(resp) + return actor_lookup_resp + + def lookup_actor_by_name(self, actor_name: str) -> ActorInfo: + actor_lookup_resp = self._lookup_actors_by_name(actor_name) if actor_lookup_resp.found: if len(actor_lookup_resp.actor.info_coll) > 0: return actor_lookup_resp.actor.info_coll[0] return None - -# Standalone min proxy for a standalone directory service -class MinProxy: - def __init__(self, context: zmq.Context): - self._context: zmq.Context = context - self._xpub: zmq.Socket = None - self._xsub: zmq.Socket = None - - def start(self): - # Start the proxy thread - proxy_thread = threading.Thread(target=self.proxy_thread_fn) - proxy_thread.start() - time.sleep(0.01) - - def stop(self): - self._xsub.setsockopt(zmq.LINGER, 0) - self._xpub.setsockopt(zmq.LINGER, 0) - self._xpub.close() - self._xsub.close() - time.sleep(0.01) - - def proxy_thread_fn(self): - self._xpub: zmq.Socket = self._context.socket(zmq.XPUB) - self._xsub: zmq.Socket = self._context.socket(zmq.XSUB) - try: - self._xpub.bind(xpub_url) - self._xsub.bind(xsub_url) - zmq.proxy(self._xpub, self._xsub) - except zmq.ContextTerminated: - self._xpub.close() - self._xsub.close() - except Exception as e: - Error("proxy_thread_fn", f"proxy_thread_fn encountered an error: {e}") - self._xpub.setsockopt(zmq.LINGER, 0) - self._xsub.setsockopt(zmq.LINGER, 0) - self._xpub.close() - self._xsub.close() - finally: - Info("proxy_thread_fn", "proxy_thread_fn terminated.") + def lookup_actor_info_by_name(self, actor_name: str) -> ActorInfoCollection: + actor_lookup_resp = self._lookup_actors_by_name(actor_name) + if actor_lookup_resp.found: + if len(actor_lookup_resp.actor.info_coll) > 0: + return actor_lookup_resp.actor + return None # Run a standalone directory service def main(): context: zmq.Context = zmq.Context() # Start simple broker (will exit if real broker is running) - proxy: MinProxy = MinProxy(context) + proxy: Broker = Broker(context) proxy.start() # Start the directory service directory_svc = DirectorySvc(context) directory_svc.start() - # # How do you register an actor? # directory_svc.register_actor_by_name("my_actor") # diff --git a/samples/apps/cap/py/autogencap/LocalActorNetwork.py b/samples/apps/cap/py/autogencap/LocalActorNetwork.py index d1133bdb7e7..685972119ff 100644 --- a/samples/apps/cap/py/autogencap/LocalActorNetwork.py +++ b/samples/apps/cap/py/autogencap/LocalActorNetwork.py @@ -5,7 +5,8 @@ from .DirectorySvc import DirectorySvc from .Constants import Termination_Topic from .Actor import Actor -from .proto.CAP_pb2 import ActorInfo +from .proto.CAP_pb2 import ActorInfo, ActorInfoCollection +from typing import List import time # TODO: remove time import @@ -69,3 +70,14 @@ def lookup_actor(self, name: str) -> ActorConnector: def lookup_termination(self) -> ActorConnector: termination_topic: str = Termination_Topic return self.actor_connector_by_topic(termination_topic) + + def lookup_actor_info(self, name_regex) -> List[ActorInfo]: + actor_info: ActorInfoCollection = self._directory_svc.lookup_actor_info_by_name(name_regex) + if actor_info is None: + Warn("Local_Actor_Network", f"{name_regex}, not found in the network.") + return None + Debug("Local_Actor_Network", f"[{name_regex}] found in the network.") + actor_list = [] + for actor in actor_info.info_coll: + actor_list.append(actor) + return actor_list diff --git a/samples/apps/cap/py/demo/App.py b/samples/apps/cap/py/demo/App.py index e9a49ba4a26..a952a27c311 100644 --- a/samples/apps/cap/py/demo/App.py +++ b/samples/apps/cap/py/demo/App.py @@ -4,7 +4,7 @@ import argparse import _paths -from autogencap.Config import LOG_LEVEL, IGNORED_LOG_CONTEXTS +import autogencap.Config as Config import autogencap.DebugLog as DebugLog from SimpleActorDemo import simple_actor_demo from AGDemo import ag_demo @@ -13,6 +13,7 @@ from CAPAutoGenPairDemo import cap_ag_pair_demo from ComplexActorDemo import complex_actor_demo from RemoteAGDemo import remote_ag_demo +from list_agents import list_agents #################################################################################################### @@ -26,7 +27,8 @@ def parse_args(): # Set the log level # Print log level string based on names in debug_log.py print(f"Log level: {DebugLog.LEVEL_NAMES[args.log_level]}") - # IGNORED_LOG_CONTEXTS.extend(["BROKER"]) + Config.LOG_LEVEL = args.log_level + # Config.IGNORED_LOG_CONTEXTS.extend(["BROKER"]) #################################################################################################### @@ -42,7 +44,8 @@ def main(): print("3. AutoGen Pair") print("4. AutoGen GroupChat") print("5. AutoGen Agents in different processes") - choice = input("Enter your choice (1-5): ") + print("6. List Actors in CAP") + choice = input("Enter your choice (1-6): ") if choice == "1": simple_actor_demo() @@ -58,6 +61,8 @@ def main(): cap_ag_group_demo() elif choice == "5": remote_ag_demo() + elif choice == "6": + list_agents() else: print("Quitting...") break diff --git a/samples/apps/cap/py/demo/SimpleActorDemo.py b/samples/apps/cap/py/demo/SimpleActorDemo.py index 26a67814d31..a7031a4bbe0 100644 --- a/samples/apps/cap/py/demo/SimpleActorDemo.py +++ b/samples/apps/cap/py/demo/SimpleActorDemo.py @@ -17,10 +17,8 @@ def simple_actor_demo(): network.connect() # Get a channel to the actor greeter_link = network.lookup_actor("Greeter") - time.sleep(1) # Send a message to the actor greeter_link.send_txt_msg("Hello World!") - time.sleep(1) # Cleanup greeter_link.close() network.disconnect() diff --git a/samples/apps/cap/py/demo/list_agents.py b/samples/apps/cap/py/demo/list_agents.py new file mode 100644 index 00000000000..2535678e047 --- /dev/null +++ b/samples/apps/cap/py/demo/list_agents.py @@ -0,0 +1,29 @@ +import time +from typing import List +from AppAgents import GreeterAgent, FidelityAgent +from autogencap.LocalActorNetwork import LocalActorNetwork +from autogencap.proto.CAP_pb2 import ActorInfo + + +def list_agents(): + """ + Demonstrates the usage of the CAP platform by registering an actor, connecting to the actor, + sending a message, and performing cleanup operations. + """ + # CAP Platform + + network = LocalActorNetwork() + # Register an actor + network.register(GreeterAgent()) + # Register an actor + network.register(FidelityAgent()) + # Tell actor to connect to other actors + network.connect() + # Get a list of actors + actor_infos: List[ActorInfo] = network.lookup_actor_info(name_regex=".*") + # Print out all actors found + for actor_info in actor_infos: + print(f"Name: {actor_info.name}, Namespace: {actor_info.namespace}, Description: {actor_info.description}") + time.sleep(1) + # Cleanup + network.disconnect() diff --git a/samples/apps/cap/py/demo/zmq_tests.py b/samples/apps/cap/py/demo/zmq_tests.py new file mode 100644 index 00000000000..b53adb00c48 --- /dev/null +++ b/samples/apps/cap/py/demo/zmq_tests.py @@ -0,0 +1,145 @@ +import _paths +import sys +import time +from typing import Any, Dict +import zmq +from zmq.utils.monitor import recv_monitor_message +from autogencap.Config import xsub_url, xpub_url, router_url, dealer_url + + +def zmq_sub_test(): + context = zmq.Context() + sub_socket = context.socket(zmq.SUB) + sub_socket.setsockopt(zmq.LINGER, 0) + sub_socket.setsockopt(zmq.RCVTIMEO, 10000) + sub_socket.connect(xpub_url) + sub_socket.setsockopt_string(zmq.SUBSCRIBE, "") + start_time = time.time() + while True: + try: + msg = sub_socket.recv_string() + print(f"Received: {msg}") + start_time = time.time() + except KeyboardInterrupt: + print("Interrupted by user. Exiting...") + break + except zmq.Again: + elapsed_time = time.time() - start_time + if elapsed_time > 60000: # in seconds + break + print(f"No message received in {elapsed_time:.2f} seconds") + sub_socket.close() + + +def event_monitor(pub_socket: zmq.Socket) -> None: + monitor = pub_socket.get_monitor_socket() + while monitor.poll(): + evt: Dict[str, Any] = {} + mon_evt = recv_monitor_message(monitor) + evt.update(mon_evt) + print(evt) + if evt["event"] == zmq.EVENT_MONITOR_STOPPED or evt["event"] == zmq.EVENT_HANDSHAKE_SUCCEEDED: + break + monitor.close() + + +def zmq_pub_test(): + context = zmq.Context() + pub_socket = context.socket(zmq.PUB) + pub_socket.setsockopt(zmq.XPUB_VERBOSE, 1) + pub_socket.setsockopt(zmq.LINGER, 0) + pub_socket.connect(xsub_url) + event_monitor(pub_socket) + zmq_req_test(context) + for i in range(1, 11): + pub_socket.send_string(str(i)) + pub_socket.close() + + +def zmq_router_dealer_test(): + context = zmq.Context() + router_socket = context.socket(zmq.ROUTER) + router_socket.bind(router_url) + dealer_socket = context.socket(zmq.DEALER) + dealer_socket.bind(dealer_url) + try: + # Poll sockets for events + poller: zmq.Poller = zmq.Poller() + poller.register(router_socket, zmq.POLLIN) + poller.register(dealer_socket, zmq.POLLIN) + + print("Running...") + # Receive msgs, forward and process + start_time = time.time() + last_time = start_time + while True: + events = dict(poller.poll(500)) + since_last_time = time.time() - last_time + if since_last_time > 60: # in seconds + elapsed_time = time.time() - start_time + print(f"Elapsed time: {elapsed_time:.2f} seconds") + last_time = time.time() + + if router_socket in events: + message = router_socket.recv_multipart() + print("BROKER", f"subscription message: {message[0]}") + dealer_socket.send_multipart(message) + + if dealer_socket in events: + message = dealer_socket.recv_multipart() + print("BROKER", f"publishing message: {message[0]}") + router_socket.send_multipart(message) + + except Exception as e: + print("BROKER", f"thread encountered an error: {e}") + finally: + print("BROKER", "thread ended") + return + + +def zmq_req_test(context: zmq.Context = None): + if context is None: + context = zmq.Context() + req_socket = context.socket(zmq.REQ) + req_socket.connect(router_url) + try: + req_socket.send_string("Request ") + message = req_socket.recv_string() + print(f"Received: {message}") + except KeyboardInterrupt: + print("Interrupted by user. Exiting...") + finally: + req_socket.close() + + +def zmq_rep_test(): + context = zmq.Context() + rep_socket = context.socket(zmq.REP) + rep_socket.connect(dealer_url) + try: + while True: + message = rep_socket.recv_string() + print(f"Received: {message}") + rep_socket.send_string("Acknowledged: " + message) + except KeyboardInterrupt: + print("Interrupted by user. Exiting...") + finally: + rep_socket.close() + + +if __name__ == "__main__": + if len(sys.argv) > 1: + if sys.argv[1] == "pub": + zmq_pub_test() + elif sys.argv[1] == "sub": + zmq_sub_test() + elif sys.argv[1] == "router": + zmq_router_dealer_test() + elif sys.argv[1] == "req": + zmq_req_test() + elif sys.argv[1] == "rep": + zmq_rep_test() + else: + print("Invalid argument. Please use 'pub', 'sub' 'router', 'req', 'rep'") + else: + print("Please provide an argument. Please use 'pub', 'sub' 'router', 'req', 'rep'")