From f8417af9b5735645ff52fd7856fcc946cecbda75 Mon Sep 17 00:00:00 2001 From: Rajan Date: Mon, 18 Mar 2024 17:51:47 -0400 Subject: [PATCH 01/17] Search directory for list of actors using regex '.*' gets all actors --- .../apps/cap/py/autogencap/DirectorySvc.py | 53 ++++++++++++++----- .../cap/py/autogencap/LocalActorNetwork.py | 14 ++++- samples/apps/cap/py/demo/App.py | 6 ++- samples/apps/cap/py/demo/list_agents.py | 30 +++++++++++ 4 files changed, 89 insertions(+), 14 deletions(-) create mode 100644 samples/apps/cap/py/demo/list_agents.py diff --git a/samples/apps/cap/py/autogencap/DirectorySvc.py b/samples/apps/cap/py/autogencap/DirectorySvc.py index 21d01c71207..da2e5d0f3df 100644 --- a/samples/apps/cap/py/autogencap/DirectorySvc.py +++ b/samples/apps/cap/py/autogencap/DirectorySvc.py @@ -3,10 +3,19 @@ from autogencap.DebugLog import Debug, Info, Error from autogencap.ActorConnector import ActorConnector from autogencap.Actor import Actor -from autogencap.proto.CAP_pb2 import ActorRegistration, ActorInfo, ActorLookup, ActorLookupResponse, Ping, Pong +from autogencap.proto.CAP_pb2 import ( + ActorRegistration, + ActorInfo, + ActorLookup, + ActorLookupResponse, + Ping, + Pong, + ActorInfoCollection, +) import zmq import threading import time +import re # TODO (Future DirectorySv PR) use actor description, network_id, other properties to make directory # service more generic and powerful @@ -52,18 +61,27 @@ def _actor_lookup_msg_handler(self, topic: str, msg_type: str, msg: bytes, sende actor_lookup = ActorLookup() actor_lookup.ParseFromString(msg) Debug("DirectorySvc", f"Actor lookup: {actor_lookup.actor_info.name}") - actor: ActorInfo = None - if actor_lookup.actor_info.name in self._registered_actors: - Info("DirectorySvc", f"Actor found: {actor_lookup.actor_info.name}") - actor = self._registered_actors[actor_lookup.actor_info.name] - else: - Error("DirectorySvc", f"Actor not found: {actor_lookup.actor_info.name}") actor_lookup_resp = ActorLookupResponse() - if actor is not None: - actor_lookup_resp.actor.info_coll.extend([actor]) + actor_lookup_resp.found = False + try: + pattern = re.compile(actor_lookup.actor_info.name) + except re.error: + Error("DirectorySvc", f"Invalid regex pattern: {actor_lookup.actor_info.name}") + else: + found_actor_list = [ + self._registered_actors[registered_actor] + for registered_actor in self._registered_actors + if pattern.match(registered_actor) + ] + + if found_actor_list: + for actor in found_actor_list: + Info("DirectorySvc", f"Actor found: {actor}") actor_lookup_resp.found = True + actor_lookup_resp.actor.info_coll.extend(found_actor_list) else: - actor_lookup_resp.found = False + Error("DirectorySvc", f"Actor not found: {actor_lookup.actor_info.name}") + sender_connection = ActorConnector(self._context, sender_topic) serialized_msg = actor_lookup_resp.SerializeToString() sender_connection.send_bin_msg(ActorLookupResponse.__name__, serialized_msg) @@ -110,18 +128,29 @@ def register_actor_by_name(self, actor_name: str): actor_info = ActorInfo(name=actor_name) self.register_actor(actor_info) - def lookup_actor_by_name(self, actor_name: str) -> ActorInfo: - actor_info = ActorInfo(name=actor_name) + def _lookup_actors_by_name(self, name_regex: str): + actor_info = ActorInfo(name=name_regex) actor_lookup = ActorLookup(actor_info=actor_info) serialized_msg = actor_lookup.SerializeToString() _, _, _, resp = self._directory_connector.binary_request(ActorLookup.__name__, serialized_msg) actor_lookup_resp = ActorLookupResponse() actor_lookup_resp.ParseFromString(resp) + return actor_lookup_resp + + def lookup_actor_by_name(self, actor_name: str) -> ActorInfo: + actor_lookup_resp = self._lookup_actors_by_name(actor_name) if actor_lookup_resp.found: if len(actor_lookup_resp.actor.info_coll) > 0: return actor_lookup_resp.actor.info_coll[0] return None + def lookup_actor_info_by_name(self, actor_name: str) -> ActorInfoCollection: + actor_lookup_resp = self._lookup_actors_by_name(actor_name) + if actor_lookup_resp.found: + if len(actor_lookup_resp.actor.info_coll) > 0: + return actor_lookup_resp.actor + return None + # Standalone min proxy for a standalone directory service class MinProxy: diff --git a/samples/apps/cap/py/autogencap/LocalActorNetwork.py b/samples/apps/cap/py/autogencap/LocalActorNetwork.py index d1133bdb7e7..685972119ff 100644 --- a/samples/apps/cap/py/autogencap/LocalActorNetwork.py +++ b/samples/apps/cap/py/autogencap/LocalActorNetwork.py @@ -5,7 +5,8 @@ from .DirectorySvc import DirectorySvc from .Constants import Termination_Topic from .Actor import Actor -from .proto.CAP_pb2 import ActorInfo +from .proto.CAP_pb2 import ActorInfo, ActorInfoCollection +from typing import List import time # TODO: remove time import @@ -69,3 +70,14 @@ def lookup_actor(self, name: str) -> ActorConnector: def lookup_termination(self) -> ActorConnector: termination_topic: str = Termination_Topic return self.actor_connector_by_topic(termination_topic) + + def lookup_actor_info(self, name_regex) -> List[ActorInfo]: + actor_info: ActorInfoCollection = self._directory_svc.lookup_actor_info_by_name(name_regex) + if actor_info is None: + Warn("Local_Actor_Network", f"{name_regex}, not found in the network.") + return None + Debug("Local_Actor_Network", f"[{name_regex}] found in the network.") + actor_list = [] + for actor in actor_info.info_coll: + actor_list.append(actor) + return actor_list diff --git a/samples/apps/cap/py/demo/App.py b/samples/apps/cap/py/demo/App.py index 45dbeb224d7..4a4e9498129 100644 --- a/samples/apps/cap/py/demo/App.py +++ b/samples/apps/cap/py/demo/App.py @@ -12,6 +12,7 @@ from CAPAutoGenPairDemo import cap_ag_pair_demo from ComplexActorDemo import complex_actor_demo from RemoteAGDemo import remote_ag_demo +from list_agents import list_agents #################################################################################################### @@ -41,7 +42,8 @@ def main(): print("3. AutoGen Pair") print("4. AutoGen GroupChat") print("5. AutoGen Agents in different processes") - choice = input("Enter your choice (1-5): ") + print("6. List Actors in CAP") + choice = input("Enter your choice (1-6): ") if choice == "1": simple_actor_demo() @@ -57,6 +59,8 @@ def main(): cap_ag_group_demo() elif choice == "5": remote_ag_demo() + elif choice == "6": + list_agents() else: print("Quitting...") break diff --git a/samples/apps/cap/py/demo/list_agents.py b/samples/apps/cap/py/demo/list_agents.py new file mode 100644 index 00000000000..4cf3310a36b --- /dev/null +++ b/samples/apps/cap/py/demo/list_agents.py @@ -0,0 +1,30 @@ +import time +from typing import List +from AppAgents import GreeterAgent, FidelityAgent +from autogencap.LocalActorNetwork import LocalActorNetwork + +class ActorInfo: + pass + +def list_agents(): + """ + Demonstrates the usage of the CAP platform by registering an actor, connecting to the actor, + sending a message, and performing cleanup operations. + """ + # CAP Platform + + network = LocalActorNetwork() + # Register an actor + network.register(GreeterAgent()) + # Register an actor + network.register(FidelityAgent()) + # Tell actor to connect to other actors + network.connect() + # Get a list of actors + actor_infos: List[ActorInfo] = network.lookup_actor_info(name_regex=".*") + # Print out all actors found + for actor_info in actor_infos: + print(f"Name: {actor_info.name}, Namespace: {actor_info.namespace}, Description: {actor_info.description}") + time.sleep(1) + # Cleanup + network.disconnect() From 3b7d95a7828e87912be1104cd058cf5c56143f1d Mon Sep 17 00:00:00 2001 From: Rajan Date: Mon, 18 Mar 2024 21:03:07 -0400 Subject: [PATCH 02/17] docs changes --- samples/apps/cap/README.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/samples/apps/cap/README.md b/samples/apps/cap/README.md index db93821203e..7fe7a6c7469 100644 --- a/samples/apps/cap/README.md +++ b/samples/apps/cap/README.md @@ -49,6 +49,7 @@ Actors can register themselves with CAP, find other agents, construct arbitrary 2) Many CAP Actors interacting with each other 3) A pair of interacting AutoGen Agents wrapped in CAP Actors 4) CAP wrapped AutoGen Agents in a group chat - +5) Two AutoGen Agents running in different processes and communicating through CAP +6) List all registered agents in CAP ### Coming soon. Stay tuned! ### -1) Two AutoGen Agents running in different processes and communicating through CAP +1) AutoGen integration to list all registered agents From a6cd5acc15e8aeff1ecc7c77d623f8f41e5a30be Mon Sep 17 00:00:00 2001 From: Rajan Date: Mon, 18 Mar 2024 21:57:11 -0400 Subject: [PATCH 03/17] pre-commit fixes --- samples/apps/cap/py/demo/list_agents.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/samples/apps/cap/py/demo/list_agents.py b/samples/apps/cap/py/demo/list_agents.py index 4cf3310a36b..dc24047304e 100644 --- a/samples/apps/cap/py/demo/list_agents.py +++ b/samples/apps/cap/py/demo/list_agents.py @@ -3,9 +3,11 @@ from AppAgents import GreeterAgent, FidelityAgent from autogencap.LocalActorNetwork import LocalActorNetwork + class ActorInfo: pass + def list_agents(): """ Demonstrates the usage of the CAP platform by registering an actor, connecting to the actor, From 3055b78fd3a169d3088c9384c194d74d9f89dc37 Mon Sep 17 00:00:00 2001 From: Rajan Date: Mon, 18 Mar 2024 22:22:19 -0400 Subject: [PATCH 04/17] Use ActorInfo from protobuf --- samples/apps/cap/py/demo/list_agents.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/samples/apps/cap/py/demo/list_agents.py b/samples/apps/cap/py/demo/list_agents.py index dc24047304e..91083d7ba69 100644 --- a/samples/apps/cap/py/demo/list_agents.py +++ b/samples/apps/cap/py/demo/list_agents.py @@ -2,11 +2,7 @@ from typing import List from AppAgents import GreeterAgent, FidelityAgent from autogencap.LocalActorNetwork import LocalActorNetwork - - -class ActorInfo: - pass - +from autogencap.proto.CAP_pb2 import ActorInfo def list_agents(): """ From bed19d7dc24633c5b90d98d365b2b26f2e764491 Mon Sep 17 00:00:00 2001 From: Rajan Date: Tue, 19 Mar 2024 15:32:49 -0400 Subject: [PATCH 05/17] pre-commit --- samples/apps/cap/py/demo/list_agents.py | 1 + 1 file changed, 1 insertion(+) diff --git a/samples/apps/cap/py/demo/list_agents.py b/samples/apps/cap/py/demo/list_agents.py index 91083d7ba69..2535678e047 100644 --- a/samples/apps/cap/py/demo/list_agents.py +++ b/samples/apps/cap/py/demo/list_agents.py @@ -4,6 +4,7 @@ from autogencap.LocalActorNetwork import LocalActorNetwork from autogencap.proto.CAP_pb2 import ActorInfo + def list_agents(): """ Demonstrates the usage of the CAP platform by registering an actor, connecting to the actor, From 603804e8bc1063f459ffc4929aa1a5f0514a0612 Mon Sep 17 00:00:00 2001 From: Rajan Date: Mon, 25 Mar 2024 15:31:00 -0400 Subject: [PATCH 06/17] Added zmq tests to work on removing sleeps --- samples/apps/cap/py/autogencap/Config.py | 2 + samples/apps/cap/py/demo/zmq_tests.py | 137 +++++++++++++++++++++++ 2 files changed, 139 insertions(+) create mode 100644 samples/apps/cap/py/demo/zmq_tests.py diff --git a/samples/apps/cap/py/autogencap/Config.py b/samples/apps/cap/py/autogencap/Config.py index 299e4e273b8..5584a8d29cb 100644 --- a/samples/apps/cap/py/autogencap/Config.py +++ b/samples/apps/cap/py/autogencap/Config.py @@ -3,3 +3,5 @@ IGNORED_LOG_CONTEXTS = [] xpub_url: str = "tcp://127.0.0.1:5555" xsub_url: str = "tcp://127.0.0.1:5556" +router_url: str = "tcp://127.0.0.1:5557" +dealer_url: str = "tcp://127.0.0.1:5558" diff --git a/samples/apps/cap/py/demo/zmq_tests.py b/samples/apps/cap/py/demo/zmq_tests.py new file mode 100644 index 00000000000..f747805cf61 --- /dev/null +++ b/samples/apps/cap/py/demo/zmq_tests.py @@ -0,0 +1,137 @@ +import _paths +import sys +import time +from typing import Any, Dict +import zmq +from zmq.utils.monitor import recv_monitor_message +from autogencap.Config import xsub_url, xpub_url, router_url, dealer_url + +def zmq_sub_test(): + context = zmq.Context() + sub_socket = context.socket(zmq.SUB) + sub_socket.setsockopt(zmq.LINGER, 0) + sub_socket.setsockopt(zmq.RCVTIMEO, 10000) + sub_socket.connect(xpub_url) + sub_socket.setsockopt_string(zmq.SUBSCRIBE, "") + start_time = time.time() + while True: + try: + msg = sub_socket.recv_string() + print(f"Received: {msg}") + start_time = time.time() + except KeyboardInterrupt: + print("Interrupted by user. Exiting...") + break + except zmq.Again: + elapsed_time = time.time() - start_time + if elapsed_time > 60000: # in seconds + break + print(f"No message received in {elapsed_time:.2f} seconds") + sub_socket.close() + +def event_monitor(monitor: zmq.Socket) -> None: + while monitor.poll(): + evt: Dict[str, Any] = {} + mon_evt = recv_monitor_message(monitor) + evt.update(mon_evt) + print(evt) + if evt['event'] == zmq.EVENT_MONITOR_STOPPED or evt['event'] == zmq.EVENT_HANDSHAKE_SUCCEEDED: + break + monitor.close() + +def zmq_pub_test(): + context = zmq.Context() + pub_socket = context.socket(zmq.PUB) + pub_socket.setsockopt(zmq.XPUB_VERBOSE, 1) + pub_socket.setsockopt(zmq.LINGER, 0) + pub_socket.connect(xsub_url) + monitor = pub_socket.get_monitor_socket() + event_monitor(monitor) + time.sleep(0.1) + for i in range(1, 11): + pub_socket.send_string(str(i)) + pub_socket.close() + +def zmq_router_dealer_test(): + context = zmq.Context() + router_socket = context.socket(zmq.ROUTER) + router_socket.bind(router_url) + dealer_socket = context.socket(zmq.DEALER) + dealer_socket.bind(dealer_url) + try: + # Poll sockets for events + poller: zmq.Poller = zmq.Poller() + poller.register(router_socket, zmq.POLLIN) + poller.register(dealer_socket, zmq.POLLIN) + + print("Running...") + # Receive msgs, forward and process + start_time = time.time() + last_time = start_time + while True: + events = dict(poller.poll(500)) + since_last_time = time.time() - last_time + if since_last_time > 60: # in seconds + elapsed_time = time.time() - start_time + print(f"Elapsed time: {elapsed_time:.2f} seconds") + last_time = time.time() + + if router_socket in events: + message = router_socket.recv_multipart() + print("BROKER", f"subscription message: {message[0]}") + dealer_socket.send_multipart(message) + + if dealer_socket in events: + message = dealer_socket.recv_multipart() + print("BROKER", f"publishing message: {message[0]}") + router_socket.send_multipart(message) + + except Exception as e: + print("BROKER", f"thread encountered an error: {e}") + finally: + print("BROKER", "thread ended") + return + +def zmq_req_test(): + context = zmq.Context() + req_socket = context.socket(zmq.REQ) + req_socket.connect(router_url) + try: + req_socket.send_string("Request ") + message = req_socket.recv_string() + print(f"Received: {message}") + except KeyboardInterrupt: + print("Interrupted by user. Exiting...") + finally: + req_socket.close() + +def zmq_rep_test(): + context = zmq.Context() + rep_socket = context.socket(zmq.REP) + rep_socket.connect(dealer_url) + try: + while True: + message = rep_socket.recv_string() + print(f"Received: {message}") + rep_socket.send_string("Acknowledged: " + message) + except KeyboardInterrupt: + print("Interrupted by user. Exiting...") + finally: + rep_socket.close() + +if __name__ == "__main__": + if len(sys.argv) > 1: + if sys.argv[1] == 'pub': + zmq_pub_test() + elif sys.argv[1] == 'sub': + zmq_sub_test() + elif sys.argv[1] == 'router': + zmq_router_dealer_test() + elif sys.argv[1] == 'req': + zmq_req_test() + elif sys.argv[1] == 'rep': + zmq_rep_test() + else: + print("Invalid argument. Please use 'pub', 'sub' 'router', 'req', 'rep'") + else: + print("Please provide an argument. Please use 'pub', 'sub' 'router', 'req', 'rep'") \ No newline at end of file From 16c7b93d5cfc2aedaf2b5c5c677a44110df11445 Mon Sep 17 00:00:00 2001 From: Rajan Date: Mon, 25 Mar 2024 22:41:12 -0400 Subject: [PATCH 07/17] minor refactor of zmq tests --- samples/apps/cap/py/demo/zmq_tests.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/samples/apps/cap/py/demo/zmq_tests.py b/samples/apps/cap/py/demo/zmq_tests.py index f747805cf61..a03e36b2790 100644 --- a/samples/apps/cap/py/demo/zmq_tests.py +++ b/samples/apps/cap/py/demo/zmq_tests.py @@ -29,7 +29,8 @@ def zmq_sub_test(): print(f"No message received in {elapsed_time:.2f} seconds") sub_socket.close() -def event_monitor(monitor: zmq.Socket) -> None: +def event_monitor(pub_socket: zmq.Socket) -> None: + monitor = pub_socket.get_monitor_socket() while monitor.poll(): evt: Dict[str, Any] = {} mon_evt = recv_monitor_message(monitor) @@ -45,9 +46,8 @@ def zmq_pub_test(): pub_socket.setsockopt(zmq.XPUB_VERBOSE, 1) pub_socket.setsockopt(zmq.LINGER, 0) pub_socket.connect(xsub_url) - monitor = pub_socket.get_monitor_socket() - event_monitor(monitor) - time.sleep(0.1) + event_monitor(pub_socket) + zmq_req_test(context) for i in range(1, 11): pub_socket.send_string(str(i)) pub_socket.close() @@ -92,8 +92,9 @@ def zmq_router_dealer_test(): print("BROKER", "thread ended") return -def zmq_req_test(): - context = zmq.Context() +def zmq_req_test(context: zmq.Context = None): + if context is None: + context = zmq.Context() req_socket = context.socket(zmq.REQ) req_socket.connect(router_url) try: From cd35cb1f90413f410754fd80b881cfccdb6d5cd0 Mon Sep 17 00:00:00 2001 From: Rajan Date: Mon, 25 Mar 2024 22:43:30 -0400 Subject: [PATCH 08/17] 1) Change DirSvr to user Broker. 2) Add req-router to broker 3) In ActorConnector use handshake and req/resp to remove sleep --- .../apps/cap/py/autogencap/DirectorySvc.py | 50 ++----------------- 1 file changed, 3 insertions(+), 47 deletions(-) diff --git a/samples/apps/cap/py/autogencap/DirectorySvc.py b/samples/apps/cap/py/autogencap/DirectorySvc.py index da2e5d0f3df..c7f0ceabc2f 100644 --- a/samples/apps/cap/py/autogencap/DirectorySvc.py +++ b/samples/apps/cap/py/autogencap/DirectorySvc.py @@ -1,8 +1,9 @@ from autogencap.Constants import Directory_Svc_Topic -from autogencap.Config import xpub_url, xsub_url +from autogencap.Config import xpub_url, xsub_url, router_url from autogencap.DebugLog import Debug, Info, Error from autogencap.ActorConnector import ActorConnector from autogencap.Actor import Actor +from autogencap.Broker import Broker from autogencap.proto.CAP_pb2 import ( ActorRegistration, ActorInfo, @@ -20,7 +21,6 @@ # TODO (Future DirectorySv PR) use actor description, network_id, other properties to make directory # service more generic and powerful - class DirectoryActor(Actor): def __init__(self, topic: str, name: str): super().__init__(topic, name) @@ -86,7 +86,6 @@ def _actor_lookup_msg_handler(self, topic: str, msg_type: str, msg: bytes, sende serialized_msg = actor_lookup_resp.SerializeToString() sender_connection.send_bin_msg(ActorLookupResponse.__name__, serialized_msg) - class DirectorySvc: def __init__(self, context: zmq.Context = zmq.Context()): self._context: zmq.Context = context @@ -151,57 +150,15 @@ def lookup_actor_info_by_name(self, actor_name: str) -> ActorInfoCollection: return actor_lookup_resp.actor return None - -# Standalone min proxy for a standalone directory service -class MinProxy: - def __init__(self, context: zmq.Context): - self._context: zmq.Context = context - self._xpub: zmq.Socket = None - self._xsub: zmq.Socket = None - - def start(self): - # Start the proxy thread - proxy_thread = threading.Thread(target=self.proxy_thread_fn) - proxy_thread.start() - time.sleep(0.01) - - def stop(self): - self._xsub.setsockopt(zmq.LINGER, 0) - self._xpub.setsockopt(zmq.LINGER, 0) - self._xpub.close() - self._xsub.close() - time.sleep(0.01) - - def proxy_thread_fn(self): - self._xpub: zmq.Socket = self._context.socket(zmq.XPUB) - self._xsub: zmq.Socket = self._context.socket(zmq.XSUB) - try: - self._xpub.bind(xpub_url) - self._xsub.bind(xsub_url) - zmq.proxy(self._xpub, self._xsub) - except zmq.ContextTerminated: - self._xpub.close() - self._xsub.close() - except Exception as e: - Error("proxy_thread_fn", f"proxy_thread_fn encountered an error: {e}") - self._xpub.setsockopt(zmq.LINGER, 0) - self._xsub.setsockopt(zmq.LINGER, 0) - self._xpub.close() - self._xsub.close() - finally: - Info("proxy_thread_fn", "proxy_thread_fn terminated.") - - # Run a standalone directory service def main(): context: zmq.Context = zmq.Context() # Start simple broker (will exit if real broker is running) - proxy: MinProxy = MinProxy(context) + proxy: Broker = Broker(context) proxy.start() # Start the directory service directory_svc = DirectorySvc(context) directory_svc.start() - # # How do you register an actor? # directory_svc.register_actor_by_name("my_actor") # @@ -234,6 +191,5 @@ def main(): context.term() Info("main", "Done.") - if __name__ == "__main__": main() From 5b2b727cd335fedcfe38051e63b58edca8b96abc Mon Sep 17 00:00:00 2001 From: Rajan Date: Mon, 25 Mar 2024 22:43:55 -0400 Subject: [PATCH 09/17] 1) Change DirSvr to user Broker. 2) Add req-router to broker 3) In ActorConnector use handshake and req/resp to remove sleep --- .../apps/cap/py/autogencap/ActorConnector.py | 42 +++++++++++++++---- samples/apps/cap/py/autogencap/Broker.py | 30 +++++++++---- 2 files changed, 56 insertions(+), 16 deletions(-) diff --git a/samples/apps/cap/py/autogencap/ActorConnector.py b/samples/apps/cap/py/autogencap/ActorConnector.py index f48f9f8e84b..27455d19e7c 100644 --- a/samples/apps/cap/py/autogencap/ActorConnector.py +++ b/samples/apps/cap/py/autogencap/ActorConnector.py @@ -2,27 +2,53 @@ # socket that can publish to that topic. It exposes this functionality # using send_msg method import zmq +from zmq.utils.monitor import recv_monitor_message import time import uuid from .DebugLog import Debug, Error -from .Config import xsub_url, xpub_url - +from .Config import xsub_url, xpub_url, router_url +from typing import Any, Dict class ActorConnector: def __init__(self, context, topic): - self._pub_socket = context.socket(zmq.PUB) - self._pub_socket.setsockopt(zmq.LINGER, 0) - self._pub_socket.connect(xsub_url) + self._context = context - self._resp_socket = context.socket(zmq.SUB) + self._resp_socket = self._context.socket(zmq.SUB) self._resp_socket.setsockopt(zmq.LINGER, 0) - self._resp_socket.setsockopt(zmq.RCVTIMEO, 10000) + self._resp_socket.setsockopt(zmq.RCVTIMEO, 250) self._resp_socket.connect(xpub_url) self._resp_topic = str(uuid.uuid4()) Debug("AgentConnector", f"subscribe to: {self._resp_topic}") self._resp_socket.setsockopt_string(zmq.SUBSCRIBE, f"{self._resp_topic}") self._topic = topic - time.sleep(0.01) # Let the network do things. + + self._pub_socket = self._context.socket(zmq.PUB) + self._pub_socket.setsockopt(zmq.LINGER, 0) + self._pub_socket.connect(xsub_url) + self._handshake() + + def _send_recv_router_msg(self): + # Send a request to the router and wait for a response + req_socket = self._context.socket(zmq.REQ) + req_socket.connect(router_url) + try: + req_socket.send_string("Request") + _ = req_socket.recv_string() + finally: + req_socket.close() + + def _handshake(self): + # Monitor handshake on the pub socket + monitor = self._pub_socket.get_monitor_socket() + while monitor.poll(): + evt: Dict[str, Any] = {} + mon_evt = recv_monitor_message(monitor) + evt.update(mon_evt) + print(evt) + if evt['event'] == zmq.EVENT_MONITOR_STOPPED or evt['event'] == zmq.EVENT_HANDSHAKE_SUCCEEDED: + break + monitor.close() + self._send_recv_router_msg() def send_txt_msg(self, msg): self._pub_socket.send_multipart( diff --git a/samples/apps/cap/py/autogencap/Broker.py b/samples/apps/cap/py/autogencap/Broker.py index 0bb844d2a04..35caf344f5b 100644 --- a/samples/apps/cap/py/autogencap/Broker.py +++ b/samples/apps/cap/py/autogencap/Broker.py @@ -2,8 +2,7 @@ import zmq import threading from autogencap.DebugLog import Debug, Info, Warn -from autogencap.Config import xsub_url, xpub_url - +from autogencap.Config import xsub_url, xpub_url, router_url class Broker: def __init__(self, context: zmq.Context = zmq.Context()): @@ -11,6 +10,7 @@ def __init__(self, context: zmq.Context = zmq.Context()): self._run: bool = False self._xpub: zmq.Socket = None self._xsub: zmq.Socket = None + self._router: zmq.Socket = None def start(self) -> bool: try: @@ -18,11 +18,14 @@ def start(self) -> bool: self._xpub = self._context.socket(zmq.XPUB) self._xpub.setsockopt(zmq.LINGER, 0) self._xpub.bind(xpub_url) - # XSUB setup self._xsub = self._context.socket(zmq.XSUB) self._xsub.setsockopt(zmq.LINGER, 0) self._xsub.bind(xsub_url) + # ROUTER setup + self._router = self._context.socket(zmq.ROUTER) + self._router.setsockopt(zmq.LINGER, 0) + self._router.bind(router_url) except zmq.ZMQError as e: Debug("BROKER", f"Unable to start. Check details: {e}") @@ -31,8 +34,9 @@ def start(self) -> bool: self._xpub.close() if self._xsub: self._xsub.close() + if self._router: + self._router.close() return False - self._run = True self._broker_thread: threading.Thread = threading.Thread(target=self.thread_fn) self._broker_thread.start() @@ -40,6 +44,7 @@ def start(self) -> bool: return True def stop(self): + if not self._run: return # Error("BROKER_ERR", "fix cleanup self._context.term()") Debug("BROKER", "stopped") self._run = False @@ -48,6 +53,8 @@ def stop(self): self._xpub.close() if self._xsub: self._xsub.close() + if self._router: + self._router.close() # self._context.term() def thread_fn(self): @@ -56,6 +63,7 @@ def thread_fn(self): self._poller: zmq.Poller = zmq.Poller() self._poller.register(self._xpub, zmq.POLLIN) self._poller.register(self._xsub, zmq.POLLIN) + self._poller.register(self._router, zmq.POLLIN) # Receive msgs, forward and process while self._run: @@ -64,12 +72,20 @@ def thread_fn(self): message = self._xpub.recv_multipart() Debug("BROKER", f"subscription message: {message[0]}") self._xsub.send_multipart(message) - + if self._xsub in events: message = self._xsub.recv_multipart() Debug("BROKER", f"publishing message: {message}") self._xpub.send_multipart(message) - + + if self._router in events: + message = self._router.recv_multipart() + Debug("BROKER", f"router message: {message}") + # Mirror it back for now to confirm connectivity + # More interesting reserved point to point + # routing coming in the the future + self._router.send_multipart(message) + except Exception as e: Debug("BROKER", f"thread encountered an error: {e}") finally: @@ -77,7 +93,6 @@ def thread_fn(self): Debug("BROKER", "thread ended") return - # Run a standalone broker that all other Actors can connect to. # This can also run inproc with the other actors. def main(): @@ -109,6 +124,5 @@ def main(): Info("BROKER", "KeyboardInterrupt. Stopping the broker.") broker.stop() - if __name__ == "__main__": main() From b5f8907646687743a679cf791081d45b46af49b1 Mon Sep 17 00:00:00 2001 From: Rajan Date: Tue, 26 Mar 2024 12:13:27 -0400 Subject: [PATCH 10/17] move socket creation to thread with recv --- .../apps/cap/py/autogencap/ActorConnector.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/samples/apps/cap/py/autogencap/ActorConnector.py b/samples/apps/cap/py/autogencap/ActorConnector.py index 27455d19e7c..3fa7cb7c9c3 100644 --- a/samples/apps/cap/py/autogencap/ActorConnector.py +++ b/samples/apps/cap/py/autogencap/ActorConnector.py @@ -22,30 +22,32 @@ def __init__(self, context, topic): self._resp_socket.setsockopt_string(zmq.SUBSCRIBE, f"{self._resp_topic}") self._topic = topic - self._pub_socket = self._context.socket(zmq.PUB) - self._pub_socket.setsockopt(zmq.LINGER, 0) - self._pub_socket.connect(xsub_url) - self._handshake() + self._connect_pub_socket() def _send_recv_router_msg(self): # Send a request to the router and wait for a response req_socket = self._context.socket(zmq.REQ) req_socket.connect(router_url) try: + Debug("ActorConnector","Broker Check Request Sent") req_socket.send_string("Request") _ = req_socket.recv_string() + Debug("ActorConnector","Broker Check Response Received") finally: req_socket.close() - def _handshake(self): - # Monitor handshake on the pub socket + def _connect_pub_socket(self): + self._pub_socket = self._context.socket(zmq.PUB) + self._pub_socket.setsockopt(zmq.LINGER, 0) monitor = self._pub_socket.get_monitor_socket() + self._pub_socket.connect(xsub_url) + # Monitor handshake on the pub socket while monitor.poll(): evt: Dict[str, Any] = {} mon_evt = recv_monitor_message(monitor) evt.update(mon_evt) - print(evt) if evt['event'] == zmq.EVENT_MONITOR_STOPPED or evt['event'] == zmq.EVENT_HANDSHAKE_SUCCEEDED: + Debug("ActorConnector","Handshake received (Or Monitor stopped)") break monitor.close() self._send_recv_router_msg() @@ -61,14 +63,11 @@ def send_bin_msg(self, msg_type: str, msg): ) def binary_request(self, msg_type: str, msg, retry=5): - time.sleep(0.5) # Let the network do things. self._pub_socket.send_multipart( [self._topic.encode("utf8"), msg_type.encode("utf8"), self._resp_topic.encode("utf8"), msg] ) - time.sleep(0.5) # Let the network do things. for i in range(retry + 1): try: - self._resp_socket.setsockopt(zmq.RCVTIMEO, 10000) resp_topic, resp_msg_type, resp_sender_topic, resp = self._resp_socket.recv_multipart() return resp_topic, resp_msg_type, resp_sender_topic, resp except zmq.Again: From ecf4fef62206460518ebe374c288744de7347882 Mon Sep 17 00:00:00 2001 From: Rajan Date: Tue, 26 Mar 2024 12:13:57 -0400 Subject: [PATCH 11/17] move socket creation to thread with recv --- samples/apps/cap/py/autogencap/Broker.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/samples/apps/cap/py/autogencap/Broker.py b/samples/apps/cap/py/autogencap/Broker.py index 35caf344f5b..a4d7b488615 100644 --- a/samples/apps/cap/py/autogencap/Broker.py +++ b/samples/apps/cap/py/autogencap/Broker.py @@ -12,7 +12,7 @@ def __init__(self, context: zmq.Context = zmq.Context()): self._xsub: zmq.Socket = None self._router: zmq.Socket = None - def start(self) -> bool: + def _init_sockets(self): try: # XPUB setup self._xpub = self._context.socket(zmq.XPUB) @@ -26,7 +26,7 @@ def start(self) -> bool: self._router = self._context.socket(zmq.ROUTER) self._router.setsockopt(zmq.LINGER, 0) self._router.bind(router_url) - + return True except zmq.ZMQError as e: Debug("BROKER", f"Unable to start. Check details: {e}") # If binding fails, close the sockets and return False @@ -37,6 +37,9 @@ def start(self) -> bool: if self._router: self._router.close() return False + + def start(self) -> bool: + Debug("BROKER", f"Trying to start broker.") self._run = True self._broker_thread: threading.Thread = threading.Thread(target=self.thread_fn) self._broker_thread.start() @@ -59,12 +62,18 @@ def stop(self): def thread_fn(self): try: + if not self._init_sockets(): + Debug("BROKER", "Receive thread not started since sockets were not initialized") + self._run = False + return + # Poll sockets for events self._poller: zmq.Poller = zmq.Poller() self._poller.register(self._xpub, zmq.POLLIN) self._poller.register(self._xsub, zmq.POLLIN) self._poller.register(self._router, zmq.POLLIN) + Info("BROKER", "Started. Waiting for events") # Receive msgs, forward and process while self._run: events = dict(self._poller.poll(500)) From cf18ebf3a901351bdd6edd842bc1c3143ba69075 Mon Sep 17 00:00:00 2001 From: Rajan Date: Tue, 26 Mar 2024 12:15:08 -0400 Subject: [PATCH 12/17] Better logging for DirectorySvc --- samples/apps/cap/py/autogencap/DebugLog.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/samples/apps/cap/py/autogencap/DebugLog.py b/samples/apps/cap/py/autogencap/DebugLog.py index b31512f7ffa..b86a497c658 100644 --- a/samples/apps/cap/py/autogencap/DebugLog.py +++ b/samples/apps/cap/py/autogencap/DebugLog.py @@ -1,6 +1,6 @@ import threading import datetime -from autogencap.Config import LOG_LEVEL, IGNORED_LOG_CONTEXTS +import autogencap.Config as Config from termcolor import colored # Define log levels as constants @@ -18,19 +18,21 @@ def Log(level, context, msg): # Check if the current level meets the threshold - if level >= LOG_LEVEL: # Use the LOG_LEVEL from the Config module + if level >= Config.LOG_LEVEL: # Use the LOG_LEVEL from the Config module # Check if the context is in the list of ignored contexts - if context in IGNORED_LOG_CONTEXTS: + if context in Config.IGNORED_LOG_CONTEXTS: return with console_lock: timestamp = colored(datetime.datetime.now().strftime("%m/%d/%y %H:%M:%S"), "dark_grey") # Translate level number to name and color level_name = colored(LEVEL_NAMES[level], LEVEL_COLOR[level]) - # Center justify the context and color it blue + # Left justify the context and color it blue context = colored(context.ljust(14), "blue") + # Left justify the threadid and color it blue + thread_id = colored(str(threading.get_ident()).ljust(5), "blue") # color the msg based on the level msg = colored(msg, LEVEL_COLOR[level]) - print(f"{threading.get_ident()} {timestamp} {level_name}: [{context}] {msg}") + print(f"{thread_id} {timestamp} {level_name}: [{context}] {msg}") def Debug(context, message): From 51d0f5cd28b53655526d72822927030e7e40e201 Mon Sep 17 00:00:00 2001 From: Rajan Date: Tue, 26 Mar 2024 12:16:10 -0400 Subject: [PATCH 13/17] better logging for directory svc --- samples/apps/cap/py/autogencap/DirectorySvc.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/samples/apps/cap/py/autogencap/DirectorySvc.py b/samples/apps/cap/py/autogencap/DirectorySvc.py index c7f0ceabc2f..fb89c345e5e 100644 --- a/samples/apps/cap/py/autogencap/DirectorySvc.py +++ b/samples/apps/cap/py/autogencap/DirectorySvc.py @@ -76,7 +76,7 @@ def _actor_lookup_msg_handler(self, topic: str, msg_type: str, msg: bytes, sende if found_actor_list: for actor in found_actor_list: - Info("DirectorySvc", f"Actor found: {actor}") + Info("DirectorySvc", f"Actor found: {actor.name}") actor_lookup_resp.found = True actor_lookup_resp.actor.info_coll.extend(found_actor_list) else: @@ -93,6 +93,7 @@ def __init__(self, context: zmq.Context = zmq.Context()): self._directory_actor: DirectoryActor = None def _no_other_directory(self) -> bool: + Debug("DirectorySvc", "Pinging existing DirectorySvc") ping = Ping() serialized_msg = ping.SerializeToString() _, _, _, resp = self._directory_connector.binary_request(Ping.__name__, serialized_msg, retry=0) From bb0366973c10cc76c758d66c8c9c99fbccd7985f Mon Sep 17 00:00:00 2001 From: Rajan Date: Tue, 26 Mar 2024 12:16:37 -0400 Subject: [PATCH 14/17] Use logging config --- samples/apps/cap/py/demo/App.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/samples/apps/cap/py/demo/App.py b/samples/apps/cap/py/demo/App.py index 34b885c555d..a952a27c311 100644 --- a/samples/apps/cap/py/demo/App.py +++ b/samples/apps/cap/py/demo/App.py @@ -4,7 +4,7 @@ import argparse import _paths -from autogencap.Config import LOG_LEVEL, IGNORED_LOG_CONTEXTS +import autogencap.Config as Config import autogencap.DebugLog as DebugLog from SimpleActorDemo import simple_actor_demo from AGDemo import ag_demo @@ -27,7 +27,8 @@ def parse_args(): # Set the log level # Print log level string based on names in debug_log.py print(f"Log level: {DebugLog.LEVEL_NAMES[args.log_level]}") - # IGNORED_LOG_CONTEXTS.extend(["BROKER"]) + Config.LOG_LEVEL = args.log_level + # Config.IGNORED_LOG_CONTEXTS.extend(["BROKER"]) #################################################################################################### From bbdf193054088e06c7285ea8a4b9efd6883452c5 Mon Sep 17 00:00:00 2001 From: Rajan Date: Tue, 26 Mar 2024 12:16:59 -0400 Subject: [PATCH 15/17] Start removing sleeps --- samples/apps/cap/py/demo/SimpleActorDemo.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/samples/apps/cap/py/demo/SimpleActorDemo.py b/samples/apps/cap/py/demo/SimpleActorDemo.py index 26a67814d31..a7031a4bbe0 100644 --- a/samples/apps/cap/py/demo/SimpleActorDemo.py +++ b/samples/apps/cap/py/demo/SimpleActorDemo.py @@ -17,10 +17,8 @@ def simple_actor_demo(): network.connect() # Get a channel to the actor greeter_link = network.lookup_actor("Greeter") - time.sleep(1) # Send a message to the actor greeter_link.send_txt_msg("Hello World!") - time.sleep(1) # Cleanup greeter_link.close() network.disconnect() From 74fc690eddcbb4791a1364688520c7658f527a66 Mon Sep 17 00:00:00 2001 From: Rajan Date: Tue, 26 Mar 2024 12:20:49 -0400 Subject: [PATCH 16/17] pre-commit --- .../apps/cap/py/autogencap/ActorConnector.py | 9 ++++--- samples/apps/cap/py/autogencap/Broker.py | 22 +++++++++------- .../apps/cap/py/autogencap/DirectorySvc.py | 4 +++ samples/apps/cap/py/demo/zmq_tests.py | 25 ++++++++++++------- 4 files changed, 38 insertions(+), 22 deletions(-) diff --git a/samples/apps/cap/py/autogencap/ActorConnector.py b/samples/apps/cap/py/autogencap/ActorConnector.py index 3fa7cb7c9c3..4e0d4b92bfc 100644 --- a/samples/apps/cap/py/autogencap/ActorConnector.py +++ b/samples/apps/cap/py/autogencap/ActorConnector.py @@ -9,6 +9,7 @@ from .Config import xsub_url, xpub_url, router_url from typing import Any, Dict + class ActorConnector: def __init__(self, context, topic): self._context = context @@ -29,10 +30,10 @@ def _send_recv_router_msg(self): req_socket = self._context.socket(zmq.REQ) req_socket.connect(router_url) try: - Debug("ActorConnector","Broker Check Request Sent") + Debug("ActorConnector", "Broker Check Request Sent") req_socket.send_string("Request") _ = req_socket.recv_string() - Debug("ActorConnector","Broker Check Response Received") + Debug("ActorConnector", "Broker Check Response Received") finally: req_socket.close() @@ -46,8 +47,8 @@ def _connect_pub_socket(self): evt: Dict[str, Any] = {} mon_evt = recv_monitor_message(monitor) evt.update(mon_evt) - if evt['event'] == zmq.EVENT_MONITOR_STOPPED or evt['event'] == zmq.EVENT_HANDSHAKE_SUCCEEDED: - Debug("ActorConnector","Handshake received (Or Monitor stopped)") + if evt["event"] == zmq.EVENT_MONITOR_STOPPED or evt["event"] == zmq.EVENT_HANDSHAKE_SUCCEEDED: + Debug("ActorConnector", "Handshake received (Or Monitor stopped)") break monitor.close() self._send_recv_router_msg() diff --git a/samples/apps/cap/py/autogencap/Broker.py b/samples/apps/cap/py/autogencap/Broker.py index a4d7b488615..8f276519335 100644 --- a/samples/apps/cap/py/autogencap/Broker.py +++ b/samples/apps/cap/py/autogencap/Broker.py @@ -4,6 +4,7 @@ from autogencap.DebugLog import Debug, Info, Warn from autogencap.Config import xsub_url, xpub_url, router_url + class Broker: def __init__(self, context: zmq.Context = zmq.Context()): self._context: zmq.Context = context @@ -37,9 +38,9 @@ def _init_sockets(self): if self._router: self._router.close() return False - + def start(self) -> bool: - Debug("BROKER", f"Trying to start broker.") + Debug("BROKER", "Trying to start broker.") self._run = True self._broker_thread: threading.Thread = threading.Thread(target=self.thread_fn) self._broker_thread.start() @@ -47,7 +48,8 @@ def start(self) -> bool: return True def stop(self): - if not self._run: return + if not self._run: + return # Error("BROKER_ERR", "fix cleanup self._context.term()") Debug("BROKER", "stopped") self._run = False @@ -64,9 +66,9 @@ def thread_fn(self): try: if not self._init_sockets(): Debug("BROKER", "Receive thread not started since sockets were not initialized") - self._run = False + self._run = False return - + # Poll sockets for events self._poller: zmq.Poller = zmq.Poller() self._poller.register(self._xpub, zmq.POLLIN) @@ -81,20 +83,20 @@ def thread_fn(self): message = self._xpub.recv_multipart() Debug("BROKER", f"subscription message: {message[0]}") self._xsub.send_multipart(message) - + if self._xsub in events: message = self._xsub.recv_multipart() Debug("BROKER", f"publishing message: {message}") self._xpub.send_multipart(message) - + if self._router in events: message = self._router.recv_multipart() Debug("BROKER", f"router message: {message}") # Mirror it back for now to confirm connectivity - # More interesting reserved point to point + # More interesting reserved point to point # routing coming in the the future self._router.send_multipart(message) - + except Exception as e: Debug("BROKER", f"thread encountered an error: {e}") finally: @@ -102,6 +104,7 @@ def thread_fn(self): Debug("BROKER", "thread ended") return + # Run a standalone broker that all other Actors can connect to. # This can also run inproc with the other actors. def main(): @@ -133,5 +136,6 @@ def main(): Info("BROKER", "KeyboardInterrupt. Stopping the broker.") broker.stop() + if __name__ == "__main__": main() diff --git a/samples/apps/cap/py/autogencap/DirectorySvc.py b/samples/apps/cap/py/autogencap/DirectorySvc.py index fb89c345e5e..87833c91ae3 100644 --- a/samples/apps/cap/py/autogencap/DirectorySvc.py +++ b/samples/apps/cap/py/autogencap/DirectorySvc.py @@ -21,6 +21,7 @@ # TODO (Future DirectorySv PR) use actor description, network_id, other properties to make directory # service more generic and powerful + class DirectoryActor(Actor): def __init__(self, topic: str, name: str): super().__init__(topic, name) @@ -86,6 +87,7 @@ def _actor_lookup_msg_handler(self, topic: str, msg_type: str, msg: bytes, sende serialized_msg = actor_lookup_resp.SerializeToString() sender_connection.send_bin_msg(ActorLookupResponse.__name__, serialized_msg) + class DirectorySvc: def __init__(self, context: zmq.Context = zmq.Context()): self._context: zmq.Context = context @@ -151,6 +153,7 @@ def lookup_actor_info_by_name(self, actor_name: str) -> ActorInfoCollection: return actor_lookup_resp.actor return None + # Run a standalone directory service def main(): context: zmq.Context = zmq.Context() @@ -192,5 +195,6 @@ def main(): context.term() Info("main", "Done.") + if __name__ == "__main__": main() diff --git a/samples/apps/cap/py/demo/zmq_tests.py b/samples/apps/cap/py/demo/zmq_tests.py index a03e36b2790..b53adb00c48 100644 --- a/samples/apps/cap/py/demo/zmq_tests.py +++ b/samples/apps/cap/py/demo/zmq_tests.py @@ -6,6 +6,7 @@ from zmq.utils.monitor import recv_monitor_message from autogencap.Config import xsub_url, xpub_url, router_url, dealer_url + def zmq_sub_test(): context = zmq.Context() sub_socket = context.socket(zmq.SUB) @@ -29,6 +30,7 @@ def zmq_sub_test(): print(f"No message received in {elapsed_time:.2f} seconds") sub_socket.close() + def event_monitor(pub_socket: zmq.Socket) -> None: monitor = pub_socket.get_monitor_socket() while monitor.poll(): @@ -36,11 +38,12 @@ def event_monitor(pub_socket: zmq.Socket) -> None: mon_evt = recv_monitor_message(monitor) evt.update(mon_evt) print(evt) - if evt['event'] == zmq.EVENT_MONITOR_STOPPED or evt['event'] == zmq.EVENT_HANDSHAKE_SUCCEEDED: + if evt["event"] == zmq.EVENT_MONITOR_STOPPED or evt["event"] == zmq.EVENT_HANDSHAKE_SUCCEEDED: break monitor.close() -def zmq_pub_test(): + +def zmq_pub_test(): context = zmq.Context() pub_socket = context.socket(zmq.PUB) pub_socket.setsockopt(zmq.XPUB_VERBOSE, 1) @@ -52,6 +55,7 @@ def zmq_pub_test(): pub_socket.send_string(str(i)) pub_socket.close() + def zmq_router_dealer_test(): context = zmq.Context() router_socket = context.socket(zmq.ROUTER) @@ -85,13 +89,14 @@ def zmq_router_dealer_test(): message = dealer_socket.recv_multipart() print("BROKER", f"publishing message: {message[0]}") router_socket.send_multipart(message) - + except Exception as e: print("BROKER", f"thread encountered an error: {e}") finally: print("BROKER", "thread ended") return + def zmq_req_test(context: zmq.Context = None): if context is None: context = zmq.Context() @@ -106,6 +111,7 @@ def zmq_req_test(context: zmq.Context = None): finally: req_socket.close() + def zmq_rep_test(): context = zmq.Context() rep_socket = context.socket(zmq.REP) @@ -120,19 +126,20 @@ def zmq_rep_test(): finally: rep_socket.close() + if __name__ == "__main__": if len(sys.argv) > 1: - if sys.argv[1] == 'pub': + if sys.argv[1] == "pub": zmq_pub_test() - elif sys.argv[1] == 'sub': + elif sys.argv[1] == "sub": zmq_sub_test() - elif sys.argv[1] == 'router': + elif sys.argv[1] == "router": zmq_router_dealer_test() - elif sys.argv[1] == 'req': + elif sys.argv[1] == "req": zmq_req_test() - elif sys.argv[1] == 'rep': + elif sys.argv[1] == "rep": zmq_rep_test() else: print("Invalid argument. Please use 'pub', 'sub' 'router', 'req', 'rep'") else: - print("Please provide an argument. Please use 'pub', 'sub' 'router', 'req', 'rep'") \ No newline at end of file + print("Please provide an argument. Please use 'pub', 'sub' 'router', 'req', 'rep'") From c28fd9ef51a2808fafa64c8659028d7d3363bc83 Mon Sep 17 00:00:00 2001 From: Rajan Date: Tue, 26 Mar 2024 13:39:49 -0400 Subject: [PATCH 17/17] Cleanup monitor socket --- samples/apps/cap/py/autogencap/ActorConnector.py | 1 + 1 file changed, 1 insertion(+) diff --git a/samples/apps/cap/py/autogencap/ActorConnector.py b/samples/apps/cap/py/autogencap/ActorConnector.py index 4e0d4b92bfc..cbd55a1da0a 100644 --- a/samples/apps/cap/py/autogencap/ActorConnector.py +++ b/samples/apps/cap/py/autogencap/ActorConnector.py @@ -50,6 +50,7 @@ def _connect_pub_socket(self): if evt["event"] == zmq.EVENT_MONITOR_STOPPED or evt["event"] == zmq.EVENT_HANDSHAKE_SUCCEEDED: Debug("ActorConnector", "Handshake received (Or Monitor stopped)") break + self._pub_socket.disable_monitor() monitor.close() self._send_recv_router_msg()