From 43a476b6a716d2a469e137f5da68cd2e974b8510 Mon Sep 17 00:00:00 2001 From: anutosh491 Date: Fri, 16 Feb 2024 14:50:33 +0530 Subject: [PATCH 1/4] Added initial interface for the shell client --- include/xeus-zmq/xclient_zmq.hpp | 38 ++++++++++++++++ src/xclient_messenger.cpp | 44 +++++++++++++++++++ src/xclient_messenger.hpp | 31 +++++++++++++ src/xclient_zmq.cpp | 39 +++++++++++++++++ src/xclient_zmq_impl.cpp | 47 ++++++++++++++++++++ src/xclient_zmq_impl.hpp | 52 ++++++++++++++++++++++ src/xshell_client.cpp | 75 ++++++++++++++++++++++++++++++++ src/xshell_client.hpp | 60 +++++++++++++++++++++++++ 8 files changed, 386 insertions(+) create mode 100644 include/xeus-zmq/xclient_zmq.hpp create mode 100644 src/xclient_messenger.cpp create mode 100644 src/xclient_messenger.hpp create mode 100644 src/xclient_zmq.cpp create mode 100644 src/xclient_zmq_impl.cpp create mode 100644 src/xclient_zmq_impl.hpp create mode 100644 src/xshell_client.cpp create mode 100644 src/xshell_client.hpp diff --git a/include/xeus-zmq/xclient_zmq.hpp b/include/xeus-zmq/xclient_zmq.hpp new file mode 100644 index 0000000..b2d5e97 --- /dev/null +++ b/include/xeus-zmq/xclient_zmq.hpp @@ -0,0 +1,38 @@ +/*************************************************************************** +* Copyright (c) 2016, Johan Mabille, Sylvain Corlay, Martin Renou * +* Copyright (c) 2016, QuantStack * +* * +* Distributed under the terms of the BSD 3-Clause License. * +* * +* The full license is in the file LICENSE, distributed with this software. * +****************************************************************************/ + +#ifndef XEUS_CLIENT_ZMQ_HPP +#define XEUS_CLIENT_ZMQ_HPP + +#include + +#include "xeus-zmq.hpp" + +namespace xeus +{ + class xclient_zmq_impl; + + class XEUS_ZMQ_API xclient_zmq + { + public: + xclient_zmq::xclient_zmq(xcontext& context, + const std::string& user_name, + const xeus::xconfiguration& config); + ~xclient_zmq(); + + void send_shell(xmessage msg); + + nl::json check_shell_answer(); + + private: + std::unique_ptr p_client_impl; + }; +} + +#endif \ No newline at end of file diff --git a/src/xclient_messenger.cpp b/src/xclient_messenger.cpp new file mode 100644 index 0000000..ed9698e --- /dev/null +++ b/src/xclient_messenger.cpp @@ -0,0 +1,44 @@ +/*************************************************************************** +* Copyright (c) 2016, Johan Mabille, Sylvain Corlay, Martin Renou * +* Copyright (c) 2016, QuantStack * +* * +* Distributed under the terms of the BSD 3-Clause License. * +* * +* The full license is in the file LICENSE, distributed with this software. * +****************************************************************************/ + +#include "nlohmann/json.hpp" +#include "xeus-zmq/xmiddleware.hpp" +#include "xclient_messenger.hpp" + +namespace nl = nlohmann; + +namespace xeus +{ + xclient_messenger::xclient_messenger(zmq::context_t& context) + : m_shell_controller(context, zmq::socket_type::req) + { + } + + xclient_messenger::~xclient_messenger() + { + } + + void xclient_messenger::connect() + { + // TODO + // m_shell_controller.set(zmq::sockopt::linger, get_socket_linger()); + // m_shell_controller.connect(get_controller_end_point("shell")); + } + + void xclient_messenger::stop_channels() + { + zmq::message_t stop_msg("stop", 4); + zmq::message_t response; + + // Wait for shell answer + m_shell_controller.send(stop_msg, zmq::send_flags::none); + (void)m_shell_controller.recv(response); + } +} + diff --git a/src/xclient_messenger.hpp b/src/xclient_messenger.hpp new file mode 100644 index 0000000..4244c34 --- /dev/null +++ b/src/xclient_messenger.hpp @@ -0,0 +1,31 @@ +/*************************************************************************** +* Copyright (c) 2016, Johan Mabille, Sylvain Corlay, Martin Renou * +* Copyright (c) 2016, QuantStack * +* * +* Distributed under the terms of the BSD 3-Clause License. * +* * +* The full license is in the file LICENSE, distributed with this software. * +****************************************************************************/ + +#ifndef XEUS_CLIENT_MESSENGER_HPP +#define XEUS_CLIENT_MESSENGER_HPP + +#include + +namespace xeus +{ + class xclient_messenger + { + public: + explicit xclient_messenger(zmq::context_t& context); + virtual ~xclient_messenger(); + + void connect(); + void stop_channels(); + + private: + + zmq::socket_t m_shell_controller; + // Add more controller sockets as needed for other channels + }; +} \ No newline at end of file diff --git a/src/xclient_zmq.cpp b/src/xclient_zmq.cpp new file mode 100644 index 0000000..485973d --- /dev/null +++ b/src/xclient_zmq.cpp @@ -0,0 +1,39 @@ +/*************************************************************************** +* Copyright (c) 2016, Johan Mabille, Sylvain Corlay, Martin Renou * +* Copyright (c) 2016, QuantStack * +* * +* Distributed under the terms of the BSD 3-Clause License. * +* * +* The full license is in the file LICENSE, distributed with this software. * +****************************************************************************/ + +#include "xeus-zmq/xclient_zmq.hpp" +#include "xclient_zmq_impl.hpp" + +namespace xeus +{ + + xclient_zmq::xclient_zmq(xcontext& context, + const std::string& user_name, + const xeus::xconfiguration& config) + : p_client_impl(std::make_unique(context.get_wrapped_context(), user_name, config)) + { + } + + // Has to be in the cpp because incomplete + // types are used in unique_ptr in the header + xclient_zmq::~xclient_zmq() = default; + + + void xclient_zmq::send_shell(xmessage msg) + { + p_client_impl->send_shell(std::move(msg)); + } + + nl::json xclient_zmq::check_shell_answer() + { + // TODO + return nl::json::object(); + } + +} diff --git a/src/xclient_zmq_impl.cpp b/src/xclient_zmq_impl.cpp new file mode 100644 index 0000000..76b788b --- /dev/null +++ b/src/xclient_zmq_impl.cpp @@ -0,0 +1,47 @@ +/*************************************************************************** +* Copyright (c) 2016, Johan Mabille, Sylvain Corlay, Martin Renou * +* Copyright (c) 2016, QuantStack * +* * +* Distributed under the terms of the BSD 3-Clause License. * +* * +* The full license is in the file LICENSE, distributed with this software. * +****************************************************************************/ + +#include "xclient_zmq_impl.hpp" + +namespace xeus +{ + + xclient_zmq_impl::xclient_zmq_impl(zmq::context_t& context, + const std::string& user_name, + const xeus::xconfiguration& config) + : m_shell_client(context, user_name, config) + , p_messenger(context) + { + } + + // Has to be in the cpp because incomplete + // types are used in unique_ptr in the header + xclient_zmq_impl::~xclient_zmq_impl() = default; + + void xclient_zmq_impl::connect_client_messenger() + { + p_messenger.connect(); + } + + xclient_messenger& xclient_zmq_impl::get_client_messenger() + { + return p_messenger; + } + + void xclient_zmq_impl::send_shell(xmessage msg) + { + m_shell_client.send_message(std::move(msg)); + } + + void xclient_zmq_impl::start_channels() + { + m_shell_client.start(); + } + +} \ No newline at end of file diff --git a/src/xclient_zmq_impl.hpp b/src/xclient_zmq_impl.hpp new file mode 100644 index 0000000..2360eda --- /dev/null +++ b/src/xclient_zmq_impl.hpp @@ -0,0 +1,52 @@ +/*************************************************************************** +* Copyright (c) 2016, Johan Mabille, Sylvain Corlay, Martin Renou * +* Copyright (c) 2016, QuantStack * +* * +* Distributed under the terms of the BSD 3-Clause License. * +* * +* The full license is in the file LICENSE, distributed with this software. * +****************************************************************************/ + +#ifndef XEUS_CLIENT_ZMQ_IMPL_HPP +#define XEUS_CLIENT_ZMQ_IMPL_HPP + +#include "xshell_client.hpp" +#include "xclient_messenger.hpp" + +#include + +namespace xeus +{ + class xclient_zmq_impl + { + public: + + virtual ~xclient_zmq_impl() = default; + + xclient_zmq_impl(const xclient_zmq_impl&) = delete; + xclient_zmq_impl& operator=(const xclient_zmq_impl&) = delete; + + xclient_zmq_impl(xclient_zmq_impl&&) = delete; + xclient_zmq_impl& operator=(xclient_zmq_impl&&) = delete; + + void connect_client_messenger(); + xclient_messenger& get_client_messenger(); + + void start_channels(); + + void send_shell(xmessage msg); + + protected: + xclient_zmq_impl(zmq::context_t& context, + const std::string& user_name, + const xeus::xconfiguration& config); + + private: + xclient_messenger p_messenger; + + xshell_client m_shell_client; + // Other channel clients can be added here + }; +} + +#endif \ No newline at end of file diff --git a/src/xshell_client.cpp b/src/xshell_client.cpp new file mode 100644 index 0000000..7e7653e --- /dev/null +++ b/src/xshell_client.cpp @@ -0,0 +1,75 @@ +/*************************************************************************** +* Copyright (c) 2016, Johan Mabille, Sylvain Corlay, Martin Renou * +* Copyright (c) 2016, QuantStack * +* * +* Distributed under the terms of the BSD 3-Clause License. * +* * +* The full license is in the file LICENSE, distributed with this software. * +****************************************************************************/ + +#include "xshell_client.hpp" + +namespace xeus +{ + + xshell_client::xshell_client(zmq::context_t& context, + const std::string& user_name, + const xeus::xconfiguration& config) + : p_authentication(xeus::make_xauthentication(config.m_signature_scheme, config.m_key)) + , m_socket(context, zmq::socket_type::dealer) + , m_end_point("") + , m_user_name(user_name) + , m_session_id(xeus::new_xguid()) + { + // TODO + m_end_point = xeus::get_end_point(config.m_transport, config.m_ip, config.m_shell_port); + + m_socket.connect(m_end_point); + } + + xshell_client::~xshell_client() + { + m_socket.disconnect(m_end_point); + } + + void xshell_client::send_message(xmessage msg) + { + send_message_impl(std::move(msg), m_socket, *p_authentication); + } + + nl::json xshell_client::receive_message() + { + return receive_message_impl(m_socket, *p_authentication); + } + + nl::json xshell_client::check_received_message(long timeout) + { + // TODO + return nl::json::object(); + } + + zmq::socket& xshell_client::get_socket() + { + return m_socket; + } + + void start() + { + // TODO + } + + void send_message_impl(xmessage msg, + zmq::socket_t& socket, + const xeus::xauthentication& auth) + { + // TODO: Implement message sending + } + + nl::json xshell_client::receive_message_impl(zmq::socket_t& socket, + const xeus::xauthentication& auth) + { + // TODO: Implement message receiving + return nl::json::object(); + } + +} \ No newline at end of file diff --git a/src/xshell_client.hpp b/src/xshell_client.hpp new file mode 100644 index 0000000..0907840 --- /dev/null +++ b/src/xshell_client.hpp @@ -0,0 +1,60 @@ +/*************************************************************************** +* Copyright (c) 2016, Johan Mabille, Sylvain Corlay, Martin Renou * +* Copyright (c) 2016, QuantStack * +* * +* Distributed under the terms of the BSD 3-Clause License. * +* * +* The full license is in the file LICENSE, distributed with this software. * +****************************************************************************/ + +#ifndef XEUS_SHELL_CLIENT_HPP +#define XEUS_SHELL_CLIENT_HPP + +#include "zmq.hpp" +#include "nlohmann/json.hpp" +#include "xeus/xkernel_configuration.hpp" +#include "xeus-zmq/xauthentication.hpp" + +namespace xeus +{ + + class xshell_client + { + public: + + xshell_client(zmq::context_t& context, + const std::string& user_name, + const xeus::xconfiguration& config); + + ~xshell_client(); + + void send_message(xmessage msg); + + nl::json receive_message(); + + nl::json check_received_message(long timeout); + + zmq::socket& get_socket(); + + void start(); + + private: + void send_message_impl(xmessage msg, + zmq::socket_t& socket, + const xeus::xauthentication& auth); + + nl::json receive_message_impl(zmq::socket_t& socket, + const xeus::xauthentication& auth); + + using authentication_ptr = std::unique_ptr; + authentication_ptr p_authentication; + + zmq::socket_t m_socket; + + std::string m_end_point; + std::string m_user_name; + std::string m_session_id; + }; +} + +#endif \ No newline at end of file From 856a5da4035c3d35dbbffa79594a09aef475aa3f Mon Sep 17 00:00:00 2001 From: anutosh491 Date: Thu, 29 Feb 2024 13:51:49 +0530 Subject: [PATCH 2/4] Introducing the iopub client --- include/xeus-zmq/xclient_zmq.hpp | 25 ++- src/xclient_messenger.cpp | 10 +- src/xclient_messenger.hpp | 3 +- src/xclient_zmq.cpp | 61 ++++++- src/xclient_zmq_impl.cpp | 153 +++++++++++++++++- src/xclient_zmq_impl.hpp | 70 ++++++-- src/xdealer_channel.cpp | 51 ++++++ ...{xshell_client.hpp => xdealer_channel.hpp} | 33 +--- src/xiopub_client.cpp | 81 ++++++++++ src/xiopub_client.hpp | 50 ++++++ src/xshell_client.cpp | 75 --------- 11 files changed, 473 insertions(+), 139 deletions(-) create mode 100644 src/xdealer_channel.cpp rename src/{xshell_client.hpp => xdealer_channel.hpp} (51%) create mode 100644 src/xiopub_client.cpp create mode 100644 src/xiopub_client.hpp delete mode 100644 src/xshell_client.cpp diff --git a/include/xeus-zmq/xclient_zmq.hpp b/include/xeus-zmq/xclient_zmq.hpp index b2d5e97..38ca115 100644 --- a/include/xeus-zmq/xclient_zmq.hpp +++ b/include/xeus-zmq/xclient_zmq.hpp @@ -12,6 +12,10 @@ #include +#include "xeus/xeus_context.hpp" +#include "xeus/xkernel_configuration.hpp" +#include "xeus/xmessage.hpp" + #include "xeus-zmq.hpp" namespace xeus @@ -21,14 +25,29 @@ namespace xeus class XEUS_ZMQ_API xclient_zmq { public: + + using listener = std::function; + xclient_zmq::xclient_zmq(xcontext& context, - const std::string& user_name, const xeus::xconfiguration& config); ~xclient_zmq(); - void send_shell(xmessage msg); + void send_on_shell(xmessage msg); + void send_on_control(xmessage msg); + + std::optional check_shell_answer(); + std::optional check_control_answer(); + + void register_shell_listener(const listener& l); + void register_control_listener(const listener& l); + void register_iopub_listener(const listener& l); + + void notify_shell_listener(xmessage msg); + void notify_control_listener(xmessage msg); + void notify_iopub_listener(xmessage msg); - nl::json check_shell_answer(); + void start(); + std::optional pop_iopub_message(); private: std::unique_ptr p_client_impl; diff --git a/src/xclient_messenger.cpp b/src/xclient_messenger.cpp index ed9698e..e8749da 100644 --- a/src/xclient_messenger.cpp +++ b/src/xclient_messenger.cpp @@ -16,7 +16,6 @@ namespace nl = nlohmann; namespace xeus { xclient_messenger::xclient_messenger(zmq::context_t& context) - : m_shell_controller(context, zmq::socket_type::req) { } @@ -27,18 +26,11 @@ namespace xeus void xclient_messenger::connect() { // TODO - // m_shell_controller.set(zmq::sockopt::linger, get_socket_linger()); - // m_shell_controller.connect(get_controller_end_point("shell")); } void xclient_messenger::stop_channels() { - zmq::message_t stop_msg("stop", 4); - zmq::message_t response; - - // Wait for shell answer - m_shell_controller.send(stop_msg, zmq::send_flags::none); - (void)m_shell_controller.recv(response); + // TODO } } diff --git a/src/xclient_messenger.hpp b/src/xclient_messenger.hpp index 4244c34..6d55c97 100644 --- a/src/xclient_messenger.hpp +++ b/src/xclient_messenger.hpp @@ -25,7 +25,6 @@ namespace xeus private: - zmq::socket_t m_shell_controller; - // Add more controller sockets as needed for other channels + // Add some relevant sockets }; } \ No newline at end of file diff --git a/src/xclient_zmq.cpp b/src/xclient_zmq.cpp index 485973d..65c89c7 100644 --- a/src/xclient_zmq.cpp +++ b/src/xclient_zmq.cpp @@ -14,9 +14,8 @@ namespace xeus { xclient_zmq::xclient_zmq(xcontext& context, - const std::string& user_name, const xeus::xconfiguration& config) - : p_client_impl(std::make_unique(context.get_wrapped_context(), user_name, config)) + : p_client_impl(std::make_unique(context.get_wrapped_context(), config)) { } @@ -25,15 +24,63 @@ namespace xeus xclient_zmq::~xclient_zmq() = default; - void xclient_zmq::send_shell(xmessage msg) + void xclient_zmq::send_on_shell(xmessage msg) { - p_client_impl->send_shell(std::move(msg)); + p_client_impl->send_on_shell(std::move(msg)); } - nl::json xclient_zmq::check_shell_answer() + void xclient_zmq::send_on_control(xmessage msg) { - // TODO - return nl::json::object(); + p_client_impl->send_on_control(std::move(msg)); } + std::optional xclient_zmq::check_shell_answer() + { + return p_client_impl->receive_on_shell(-1); + } + + std::optional xclient_zmq::check_control_answer() + { + return p_client_impl->receive_on_control(-1); + } + + void xclient_zmq::register_shell_listener(const listener& l) + { + p_client_impl->register_shell_listener(l); + } + + void xclient_zmq::register_control_listener(const listener& l) + { + p_client_impl->register_control_listener(l); + } + + void xclient_zmq::register_iopub_listener(const listener& l) + { + p_client_impl->register_iopub_listener(l); + } + + void xclient_zmq::notify_shell_listener(xmessage msg) + { + p_client_impl->notify_shell_listener(std::move(msg)); + } + + void xclient_zmq::notify_control_listener(xmessage msg) + { + p_client_impl->notify_control_listener(std::move(msg)); + } + + void xclient_zmq::notify_iopub_listener(xmessage msg) + { + p_client_impl->notify_iopub_listener(std::move(msg)); + } + + void xclient_zmq::start() + { + p_client_impl->start(); + } + + std::optional xclient_zmq::pop_iopub_message() + { + p_client_impl->pop_iopub_message(); + } } diff --git a/src/xclient_zmq_impl.cpp b/src/xclient_zmq_impl.cpp index 76b788b..0e93c1d 100644 --- a/src/xclient_zmq_impl.cpp +++ b/src/xclient_zmq_impl.cpp @@ -7,16 +7,21 @@ * The full license is in the file LICENSE, distributed with this software. * ****************************************************************************/ +#include "xeus-zmq/xauthentication.hpp" #include "xclient_zmq_impl.hpp" +#include "xeus-zmq/xzmq_serializer.hpp" namespace xeus { xclient_zmq_impl::xclient_zmq_impl(zmq::context_t& context, - const std::string& user_name, const xeus::xconfiguration& config) - : m_shell_client(context, user_name, config) + : p_auth(make_xauthentication(config.m_signature_scheme, config.m_key)) + , m_shell_client(context, config) + , m_control_client(context, config) + , m_iopub_client(context, config) , p_messenger(context) + , m_error_handler(nl::json::error_handler_t::strict) { } @@ -24,9 +29,65 @@ namespace xeus // types are used in unique_ptr in the header xclient_zmq_impl::~xclient_zmq_impl() = default; - void xclient_zmq_impl::connect_client_messenger() + void xclient_zmq_impl::send_on_shell(xmessage msg) { - p_messenger.connect(); + zmq::multipart_t wire_msg = xzmq_serializer::serialize(std::move(msg), *p_auth, m_error_handler); + m_shell_client.send_message(wire_msg); + } + + void xclient_zmq_impl::send_on_control(xmessage msg) + { + zmq::multipart_t wire_msg = xzmq_serializer::serialize(std::move(msg), *p_auth, m_error_handler); + m_control_client.send_message(wire_msg); + } + + std::optional xclient_zmq_impl::receive_on_shell(long timeout) + { + std::optional wire_msg = m_shell_client.receive_message(timeout); + + if (wire_msg.has_value()) + { + return deserialize(wire_msg.value()); + } else { + return std::nullopt; + } + } + + std::optional xclient_zmq_impl::receive_on_control(long timeout) + { + std::optional wire_msg = m_control_client.receive_message(timeout); + + if (wire_msg.has_value()) + { + return deserialize(wire_msg.value()); + } else { + return std::nullopt; + } + } + + void xclient_zmq_impl::register_shell_listener(const listener& l) + { + m_shell_listener = l; + } + + void xclient_zmq_impl::register_control_listener(const listener& l) + { + m_control_listener = l; + } + + std::size_t xclient_zmq_impl::iopub_queue_size() const + { + return m_iopub_client.iopub_queue_size(); + } + + std::optional xclient_zmq_impl::pop_iopub_message(); + { + return m_iopub_client.pop_iopub_message(); + } + + void xclient_zmq_impl::register_iopub_listener(const listener& l) + { + m_iopub_listener = l; } xclient_messenger& xclient_zmq_impl::get_client_messenger() @@ -34,14 +95,90 @@ namespace xeus return p_messenger; } - void xclient_zmq_impl::send_shell(xmessage msg) + void xclient_zmq_impl::connect() + { + p_messenger.connect(); + } + + void xclient_zmq_impl::stop_channels() + { + p_messenger.stop_channels(); + } + + void xclient_zmq_impl::notify_shell_listener(xmessage msg) + { + m_shell_listener(std::move(msg)); + } + + void xclient_zmq_impl::notify_control_listener(xmessage msg) + { + m_control_listener(std::move(msg)); + } + + void xclient_zmq_impl::notify_iopub_listener(xmessage msg) + { + m_iopub_listener(std::move(msg)); + } + + void xclient_zmq_impl::poll(long timeout) + { + zmq::multipart_t wire_msg; + zmq::pollitem_t items[] + = { { m_shell_client.get_socket(), 0, ZMQ_POLLIN, 0 }, { m_control_client.get_socket(), 0, ZMQ_POLLIN, 0 } }; + + while (true) + { + zmq::poll(&items[0], 2, std::chrono::milliseconds(timeout)); + try + { + if (items[0].revents & ZMQ_POLLIN) + { + wire_msg.recv(m_shell_client.get_socket()); + xmessage msg = deserialize(wire_msg); + notify_shell_listener(std::move(msg)); + return; + } + if (items[1].revents & ZMQ_POLLIN) + { + wire_msg.recv(m_control_client.get_socket()); + xmessage msg = deserialize(wire_msg); + notify_control_listener(std::move(msg)); + return; + } + } + catch (std::exception& e) + { + std::cerr << e.what() << std::endl; + } + } + } + + void xclient_zmq_impl::wait_for_message() + { + std::optional pending_message = pop_iopub_message(); + + if (pending_message.has_value()) + { + notify_iopub_listener(*pending_message); + } else { + poll(-1); + } + } + + void xclient_zmq_impl::start() + { + start_iopub_thread(); + // TODO + } + + void xclient_zmq_impl::start_iopub_thread() { - m_shell_client.send_message(std::move(msg)); + m_iopub_thread = std::move(xthread(&xiopub_client::run, p_iopub_client.get())); } - void xclient_zmq_impl::start_channels() + xmessage xclient_zmq_impl::deserialize(zmq::multipart_t& wire_msg) const { - m_shell_client.start(); + return xzmq_serializer::deserialize(wire_msg, *p_auth); } } \ No newline at end of file diff --git a/src/xclient_zmq_impl.hpp b/src/xclient_zmq_impl.hpp index 2360eda..974bd1a 100644 --- a/src/xclient_zmq_impl.hpp +++ b/src/xclient_zmq_impl.hpp @@ -10,16 +10,27 @@ #ifndef XEUS_CLIENT_ZMQ_IMPL_HPP #define XEUS_CLIENT_ZMQ_IMPL_HPP -#include "xshell_client.hpp" -#include "xclient_messenger.hpp" - #include +#include "zmq.hpp" + +#include "xeus/xeus_context.hpp" +#include "xeus/xkernel_configuration.hpp" +#include "xeus/xmessage.hpp" + +#include "xeus-zmq/xthread.hpp" + +#include "xdealer_channel.hpp" +#include "xiopub_client.hpp" +#include "xclient_messenger.hpp" namespace xeus { + class xclient_zmq_impl { public: + using iopub_client_ptr = std::unique_ptr; + using listener = std::function; virtual ~xclient_zmq_impl() = default; @@ -29,23 +40,64 @@ namespace xeus xclient_zmq_impl(xclient_zmq_impl&&) = delete; xclient_zmq_impl& operator=(xclient_zmq_impl&&) = delete; - void connect_client_messenger(); + // shell channel + void send_on_shell(xmessage msg); + std::optional receive_on_shell(long timeout); + void register_shell_listener(const listener& l); + + // control channel + void send_on_control(xmessage msg); + std::optional receive_on_control(long timeout); + void register_control_listener(const listener& l); + + // iopub channel + std::size_t iopub_queue_size() const; + std::optional pop_iopub_message(); + void register_iopub_listener(const listener& l); + + // hearbeat channel + // TODO + + // client messenger xclient_messenger& get_client_messenger(); + void connect(); + void stop_channels(); + + void notify_shell_listener(xmessage msg); + void notify_control_listener(xmessage msg); + void notify_iopub_listener(xmessage msg); - void start_channels(); + void wait_for_message(); + void start(); - void send_shell(xmessage msg); + xmessage deserialize(zmq::multipart_t& wire_msg) const; protected: xclient_zmq_impl(zmq::context_t& context, - const std::string& user_name, const xeus::xconfiguration& config); private: + void start_iopub_thread(); + void poll(long timeout); + + using authentication_ptr = std::unique_ptr; + authentication_ptr p_auth; + xclient_messenger p_messenger; - xshell_client m_shell_client; - // Other channel clients can be added here + xdealer_channel m_shell_client; + xdealer_channel m_control_client; + xiopub_client m_iopub_client; + + nl::json::error_handler_t m_error_handler; + + listener m_shell_listener; + listener m_control_listener; + listener m_iopub_listener; + + iopub_client_ptr p_iopub_client; + + xthread m_iopub_thread; }; } diff --git a/src/xdealer_channel.cpp b/src/xdealer_channel.cpp new file mode 100644 index 0000000..8a17873 --- /dev/null +++ b/src/xdealer_channel.cpp @@ -0,0 +1,51 @@ +/*************************************************************************** +* Copyright (c) 2016, Johan Mabille, Sylvain Corlay, Martin Renou * +* Copyright (c) 2016, QuantStack * +* * +* Distributed under the terms of the BSD 3-Clause License. * +* * +* The full license is in the file LICENSE, distributed with this software. * +****************************************************************************/ + +#include "xdealer_channel.hpp" + +namespace xeus +{ + + xdealer_channel::xdealer_channel(zmq::context_t& context, + const xeus::xconfiguration& config) + : m_socket(context, zmq::socket_type::dealer) + , m_end_point("") + { + m_end_point = xeus::get_end_point(config.m_transport, config.m_ip, config.m_shell_port); + + m_socket.connect(m_end_point); + } + + xdealer_channel::~xdealer_channel() + { + m_socket.disconnect(m_end_point); + } + + void xdealer_channel::send_message(zmq::multipart_t& message) + { + message.send(m_socket); + } + + std::optional xdealer_channel::receive_message(long timeout) + { + zmq::multipart_t wire_msg; + m_socket.setsockopt(ZMQ_RCVTIMEO, timeout); + if (wire_msg.recv(m_socket, zmq::recv_flags::none)) + { + return wire_msg; + } else { + return std::nullopt; + } + } + + zmq::socket& xdealer_channel::get_socket() + { + return m_socket; + } +} \ No newline at end of file diff --git a/src/xshell_client.hpp b/src/xdealer_channel.hpp similarity index 51% rename from src/xshell_client.hpp rename to src/xdealer_channel.hpp index 0907840..d3d30fc 100644 --- a/src/xshell_client.hpp +++ b/src/xdealer_channel.hpp @@ -7,53 +7,34 @@ * The full license is in the file LICENSE, distributed with this software. * ****************************************************************************/ -#ifndef XEUS_SHELL_CLIENT_HPP -#define XEUS_SHELL_CLIENT_HPP +#ifndef XEUS_DEALER_CHANNEL_HPP +#define XEUS_DEALER_CHANNEL_HPP #include "zmq.hpp" #include "nlohmann/json.hpp" #include "xeus/xkernel_configuration.hpp" -#include "xeus-zmq/xauthentication.hpp" namespace xeus { - class xshell_client + class xdealer_channel { public: - xshell_client(zmq::context_t& context, - const std::string& user_name, + xdealer_channel(zmq::context_t& context, const xeus::xconfiguration& config); - ~xshell_client(); + ~xdealer_channel(); - void send_message(xmessage msg); - - nl::json receive_message(); - - nl::json check_received_message(long timeout); + void send_message(zmq::multipart_t& message); + std::optional receive_message(long timeout); zmq::socket& get_socket(); - void start(); - private: - void send_message_impl(xmessage msg, - zmq::socket_t& socket, - const xeus::xauthentication& auth); - - nl::json receive_message_impl(zmq::socket_t& socket, - const xeus::xauthentication& auth); - - using authentication_ptr = std::unique_ptr; - authentication_ptr p_authentication; zmq::socket_t m_socket; - std::string m_end_point; - std::string m_user_name; - std::string m_session_id; }; } diff --git a/src/xiopub_client.cpp b/src/xiopub_client.cpp new file mode 100644 index 0000000..515dcef --- /dev/null +++ b/src/xiopub_client.cpp @@ -0,0 +1,81 @@ +/*************************************************************************** +* Copyright (c) 2016, Johan Mabille, Sylvain Corlay, Martin Renou * +* Copyright (c) 2016, QuantStack * +* * +* Distributed under the terms of the BSD 3-Clause License. * +* * +* The full license is in the file LICENSE, distributed with this software. * +****************************************************************************/ + +#include "xiopub_client.hpp" + +#include "xeus-zmq/xzmq_serializer.hpp" + +namespace xeus +{ + + xiopub_client::xiopub_client(zmq::context_t& context, + const xeus::xconfiguration& config) + : m_iopub(context, zmq::socket_type::sub) + , m_iopub_end_point("") + { + m_iopub_end_point = xeus::get_end_point(config.m_transport, config.m_ip, config.m_iopub_port); + + m_iopub.connect(m_iopub_end_point); + } + + xiopub_client::~xiopub_client() + { + m_iopub.disconnect(m_iopub_end_point); + } + + std::size_t xiopub_client::iopub_queue_size() const + { + std::lock_guard guard(m_queue_mutex); + return m_message_queue.size(); + } + + std::optional xiopub_client::pop_iopub_message() + { + std::lock_guard guard(m_queue_mutex); + if (!m_message_queue.empty()) + { + xmessage msg = m_message_queue.back(); + m_message_queue.pop(); + return msg; + } else { + return std::nullopt; + } + } + + void xiopub_client::run() + { + zmq::pollitem_t items[] = { + { m_iopub, 0, ZMQ_POLLIN, 0 } + }; + + while (true) + { + zmq::poll(&items[0], 1, std::chrono::milliseconds(-1)); + + if (items[0].revents & ZMQ_POLLIN) + { + zmq::multipart_t wire_msg; + wire_msg.recv(m_iopub); + try + { + xmessage msg = p_client_impl->deserialize(wire_msg); + { + std::lock_guard guard(m_queue_mutex); + m_message_queue.push(msg); + } + p_client_impl->notify_shell_listener(std::move(msg)); + } + catch(std::exception& e) + { + std::cerr << e.what() << std::endl; + } + } + } + } +} \ No newline at end of file diff --git a/src/xiopub_client.hpp b/src/xiopub_client.hpp new file mode 100644 index 0000000..f3fe4a8 --- /dev/null +++ b/src/xiopub_client.hpp @@ -0,0 +1,50 @@ +/*************************************************************************** +* Copyright (c) 2016, Johan Mabille, Sylvain Corlay, Martin Renou * +* Copyright (c) 2016, QuantStack * +* * +* Distributed under the terms of the BSD 3-Clause License. * +* * +* The full license is in the file LICENSE, distributed with this software. * +****************************************************************************/ + +#ifndef XEUS_IOPUB_CLIENT_HPP +#define XEUS_IOPUB_CLIENT_HPP + +#include "zmq.hpp" +#include "nlohmann/json.hpp" + +#include "xeus/xeus_context.hpp" +#include "xeus/xkernel_configuration.hpp" + +#include "xeus-zmq/xthread.hpp" + +namespace xeus +{ + class xclient_zmq_impl; + + class xiopub_client + { + public: + + xiopub_client(zmq::context_t& context, + const xeus::xconfiguration& config); + + ~xiopub_client(); + + std::size_t iopub_queue_size() const; + std::optional pop_iopub_message(); + + void run(); + + private: + zmq::socket_t m_iopub; + std::string m_iopub_end_point; + + std::queue m_message_queue; + mutable std::mutex m_queue_mutex; + + xclient_zmq_impl* p_client_impl; + }; +} + +#endif \ No newline at end of file diff --git a/src/xshell_client.cpp b/src/xshell_client.cpp deleted file mode 100644 index 7e7653e..0000000 --- a/src/xshell_client.cpp +++ /dev/null @@ -1,75 +0,0 @@ -/*************************************************************************** -* Copyright (c) 2016, Johan Mabille, Sylvain Corlay, Martin Renou * -* Copyright (c) 2016, QuantStack * -* * -* Distributed under the terms of the BSD 3-Clause License. * -* * -* The full license is in the file LICENSE, distributed with this software. * -****************************************************************************/ - -#include "xshell_client.hpp" - -namespace xeus -{ - - xshell_client::xshell_client(zmq::context_t& context, - const std::string& user_name, - const xeus::xconfiguration& config) - : p_authentication(xeus::make_xauthentication(config.m_signature_scheme, config.m_key)) - , m_socket(context, zmq::socket_type::dealer) - , m_end_point("") - , m_user_name(user_name) - , m_session_id(xeus::new_xguid()) - { - // TODO - m_end_point = xeus::get_end_point(config.m_transport, config.m_ip, config.m_shell_port); - - m_socket.connect(m_end_point); - } - - xshell_client::~xshell_client() - { - m_socket.disconnect(m_end_point); - } - - void xshell_client::send_message(xmessage msg) - { - send_message_impl(std::move(msg), m_socket, *p_authentication); - } - - nl::json xshell_client::receive_message() - { - return receive_message_impl(m_socket, *p_authentication); - } - - nl::json xshell_client::check_received_message(long timeout) - { - // TODO - return nl::json::object(); - } - - zmq::socket& xshell_client::get_socket() - { - return m_socket; - } - - void start() - { - // TODO - } - - void send_message_impl(xmessage msg, - zmq::socket_t& socket, - const xeus::xauthentication& auth) - { - // TODO: Implement message sending - } - - nl::json xshell_client::receive_message_impl(zmq::socket_t& socket, - const xeus::xauthentication& auth) - { - // TODO: Implement message receiving - return nl::json::object(); - } - -} \ No newline at end of file From 5fbf1d4dc479466aa6f11553c38fd2348fc7d526 Mon Sep 17 00:00:00 2001 From: anutosh491 Date: Wed, 20 Mar 2024 13:06:45 +0530 Subject: [PATCH 3/4] Adding support for client messenger --- .github/workflows/main.yml | 4 +--- include/xeus-zmq/xclient_zmq.hpp | 10 ++++++++-- src/xclient_messenger.cpp | 14 ++++++++++---- src/xclient_messenger.hpp | 7 ++++--- src/xclient_zmq.cpp | 15 +++++++++++---- src/xclient_zmq_impl.cpp | 11 +++++++---- src/xclient_zmq_impl.hpp | 15 ++++++++------- src/xdealer_channel.cpp | 8 +++++--- src/xdealer_channel.hpp | 4 +++- src/xiopub_client.cpp | 7 +++++-- src/xiopub_client.hpp | 5 +++++ test/CMakeLists.txt | 10 +++++----- 12 files changed, 72 insertions(+), 38 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 34bd6dc..8740832 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -92,9 +92,7 @@ jobs: with: environment-file: environment-dev.yml environment-name: xeus-zmq - init-shell: >- - powershell - cmd.exe + init-shell: cmd.exe - name: Make build directory run: mkdir build diff --git a/include/xeus-zmq/xclient_zmq.hpp b/include/xeus-zmq/xclient_zmq.hpp index 38ca115..898a8b9 100644 --- a/include/xeus-zmq/xclient_zmq.hpp +++ b/include/xeus-zmq/xclient_zmq.hpp @@ -10,6 +10,8 @@ #ifndef XEUS_CLIENT_ZMQ_HPP #define XEUS_CLIENT_ZMQ_HPP +#include + #include #include "xeus/xeus_context.hpp" @@ -28,8 +30,7 @@ namespace xeus using listener = std::function; - xclient_zmq::xclient_zmq(xcontext& context, - const xeus::xconfiguration& config); + explicit xclient_zmq(std::unique_ptr impl); ~xclient_zmq(); void send_on_shell(xmessage msg); @@ -52,6 +53,11 @@ namespace xeus private: std::unique_ptr p_client_impl; }; + + XEUS_ZMQ_API + std::unique_ptr make_xclient_zmq(xcontext& context, + const xconfiguration& config, + nl::json::error_handler_t eh = nl::json::error_handler_t::strict); } #endif \ No newline at end of file diff --git a/src/xclient_messenger.cpp b/src/xclient_messenger.cpp index e8749da..bb2220f 100644 --- a/src/xclient_messenger.cpp +++ b/src/xclient_messenger.cpp @@ -16,6 +16,7 @@ namespace nl = nlohmann; namespace xeus { xclient_messenger::xclient_messenger(zmq::context_t& context) + : m_iopub_controller(context, zmq::socket_type::req) { } @@ -25,12 +26,17 @@ namespace xeus void xclient_messenger::connect() { - // TODO + m_iopub_controller.set(zmq::sockopt::linger, get_socket_linger()); + m_iopub_controller.connect(get_controller_end_point("iopub")); } void xclient_messenger::stop_channels() { - // TODO - } -} + zmq::message_t stop_msg("stop", 4); + zmq::message_t response; + // Wait for iopub answer + m_iopub_controller.send(stop_msg, zmq::send_flags::none); + (void)m_iopub_controller.recv(response); + } +} \ No newline at end of file diff --git a/src/xclient_messenger.hpp b/src/xclient_messenger.hpp index 6d55c97..7975436 100644 --- a/src/xclient_messenger.hpp +++ b/src/xclient_messenger.hpp @@ -24,7 +24,8 @@ namespace xeus void stop_channels(); private: - - // Add some relevant sockets + zmq::socket_t m_iopub_controller; }; -} \ No newline at end of file +} + +#endif \ No newline at end of file diff --git a/src/xclient_zmq.cpp b/src/xclient_zmq.cpp index 65c89c7..25a8829 100644 --- a/src/xclient_zmq.cpp +++ b/src/xclient_zmq.cpp @@ -13,9 +13,8 @@ namespace xeus { - xclient_zmq::xclient_zmq(xcontext& context, - const xeus::xconfiguration& config) - : p_client_impl(std::make_unique(context.get_wrapped_context(), config)) + xclient_zmq::xclient_zmq(std::unique_ptr impl) + : p_client_impl(std::move(impl)) { } @@ -81,6 +80,14 @@ namespace xeus std::optional xclient_zmq::pop_iopub_message() { - p_client_impl->pop_iopub_message(); + return p_client_impl->pop_iopub_message(); + } + + std::unique_ptr make_xclient_zmq(xcontext& context, + const xconfiguration& config, + nl::json::error_handler_t eh) + { + auto impl = std::make_unique(context.get_wrapped_context(), config, eh); + return std::make_unique(std::move(impl)); } } diff --git a/src/xclient_zmq_impl.cpp b/src/xclient_zmq_impl.cpp index 0e93c1d..4f682cc 100644 --- a/src/xclient_zmq_impl.cpp +++ b/src/xclient_zmq_impl.cpp @@ -7,6 +7,8 @@ * The full license is in the file LICENSE, distributed with this software. * ****************************************************************************/ +#include + #include "xeus-zmq/xauthentication.hpp" #include "xclient_zmq_impl.hpp" #include "xeus-zmq/xzmq_serializer.hpp" @@ -15,13 +17,14 @@ namespace xeus { xclient_zmq_impl::xclient_zmq_impl(zmq::context_t& context, - const xeus::xconfiguration& config) + const xeus::xconfiguration& config, + nl::json::error_handler_t eh) : p_auth(make_xauthentication(config.m_signature_scheme, config.m_key)) , m_shell_client(context, config) , m_control_client(context, config) , m_iopub_client(context, config) , p_messenger(context) - , m_error_handler(nl::json::error_handler_t::strict) + , m_error_handler(eh) { } @@ -80,7 +83,7 @@ namespace xeus return m_iopub_client.iopub_queue_size(); } - std::optional xclient_zmq_impl::pop_iopub_message(); + std::optional xclient_zmq_impl::pop_iopub_message() { return m_iopub_client.pop_iopub_message(); } @@ -159,7 +162,7 @@ namespace xeus if (pending_message.has_value()) { - notify_iopub_listener(*pending_message); + notify_iopub_listener(std::move(*pending_message)); } else { poll(-1); } diff --git a/src/xclient_zmq_impl.hpp b/src/xclient_zmq_impl.hpp index 974bd1a..c85f4d2 100644 --- a/src/xclient_zmq_impl.hpp +++ b/src/xclient_zmq_impl.hpp @@ -25,6 +25,7 @@ namespace xeus { + class xauthentication; class xclient_zmq_impl { @@ -32,7 +33,11 @@ namespace xeus using iopub_client_ptr = std::unique_ptr; using listener = std::function; - virtual ~xclient_zmq_impl() = default; + xclient_zmq_impl(zmq::context_t& context, + const xconfiguration& config, + nl::json::error_handler_t eh); + + ~xclient_zmq_impl(); xclient_zmq_impl(const xclient_zmq_impl&) = delete; xclient_zmq_impl& operator=(const xclient_zmq_impl&) = delete; @@ -72,10 +77,6 @@ namespace xeus xmessage deserialize(zmq::multipart_t& wire_msg) const; - protected: - xclient_zmq_impl(zmq::context_t& context, - const xeus::xconfiguration& config); - private: void start_iopub_thread(); void poll(long timeout); @@ -83,12 +84,12 @@ namespace xeus using authentication_ptr = std::unique_ptr; authentication_ptr p_auth; - xclient_messenger p_messenger; - xdealer_channel m_shell_client; xdealer_channel m_control_client; xiopub_client m_iopub_client; + xclient_messenger p_messenger; + nl::json::error_handler_t m_error_handler; listener m_shell_listener; diff --git a/src/xdealer_channel.cpp b/src/xdealer_channel.cpp index 8a17873..a3f96e7 100644 --- a/src/xdealer_channel.cpp +++ b/src/xdealer_channel.cpp @@ -7,6 +7,8 @@ * The full license is in the file LICENSE, distributed with this software. * ****************************************************************************/ +#include "xeus-zmq/xmiddleware.hpp" + #include "xdealer_channel.hpp" namespace xeus @@ -35,8 +37,8 @@ namespace xeus std::optional xdealer_channel::receive_message(long timeout) { zmq::multipart_t wire_msg; - m_socket.setsockopt(ZMQ_RCVTIMEO, timeout); - if (wire_msg.recv(m_socket, zmq::recv_flags::none)) + m_socket.set(zmq::sockopt::linger, timeout); + if (wire_msg.recv(m_socket)) { return wire_msg; } else { @@ -44,7 +46,7 @@ namespace xeus } } - zmq::socket& xdealer_channel::get_socket() + zmq::socket_t& xdealer_channel::get_socket() { return m_socket; } diff --git a/src/xdealer_channel.hpp b/src/xdealer_channel.hpp index d3d30fc..5204870 100644 --- a/src/xdealer_channel.hpp +++ b/src/xdealer_channel.hpp @@ -11,6 +11,8 @@ #define XEUS_DEALER_CHANNEL_HPP #include "zmq.hpp" +#include "zmq_addon.hpp" + #include "nlohmann/json.hpp" #include "xeus/xkernel_configuration.hpp" @@ -29,7 +31,7 @@ namespace xeus void send_message(zmq::multipart_t& message); std::optional receive_message(long timeout); - zmq::socket& get_socket(); + zmq::socket_t& get_socket(); private: diff --git a/src/xiopub_client.cpp b/src/xiopub_client.cpp index 515dcef..fb98662 100644 --- a/src/xiopub_client.cpp +++ b/src/xiopub_client.cpp @@ -7,7 +7,10 @@ * The full license is in the file LICENSE, distributed with this software. * ****************************************************************************/ +#include + #include "xiopub_client.hpp" +#include "xclient_zmq_impl.hpp" #include "xeus-zmq/xzmq_serializer.hpp" @@ -40,7 +43,7 @@ namespace xeus std::lock_guard guard(m_queue_mutex); if (!m_message_queue.empty()) { - xmessage msg = m_message_queue.back(); + xmessage msg = std::move(m_message_queue.back()); m_message_queue.pop(); return msg; } else { @@ -67,7 +70,7 @@ namespace xeus xmessage msg = p_client_impl->deserialize(wire_msg); { std::lock_guard guard(m_queue_mutex); - m_message_queue.push(msg); + m_message_queue.push(std::move(msg)); } p_client_impl->notify_shell_listener(std::move(msg)); } diff --git a/src/xiopub_client.hpp b/src/xiopub_client.hpp index f3fe4a8..d6c8349 100644 --- a/src/xiopub_client.hpp +++ b/src/xiopub_client.hpp @@ -10,13 +10,18 @@ #ifndef XEUS_IOPUB_CLIENT_HPP #define XEUS_IOPUB_CLIENT_HPP +#include +#include + #include "zmq.hpp" #include "nlohmann/json.hpp" +#include "xeus/xmessage.hpp" #include "xeus/xeus_context.hpp" #include "xeus/xkernel_configuration.hpp" #include "xeus-zmq/xthread.hpp" +#include "xeus-zmq/xmiddleware.hpp" namespace xeus { diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 8ccb401..08e4531 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -88,7 +88,7 @@ configure_file( add_executable(test_kernel ${TEST_KERNEL_SOURCES}) target_link_libraries(test_kernel PRIVATE ${xeus-zmq_TARGET} Threads::Threads) -target_compile_features(test_kernel PRIVATE cxx_std_11) +target_compile_features(test_kernel PRIVATE cxx_std_17) set(CONNECTION_FILE ${CMAKE_CURRENT_SOURCE_DIR}/connection.json) @@ -118,7 +118,7 @@ configure_file( add_executable(test_kernel_control ${TEST_KERNEL_SPLIT_SOURCES}) target_link_libraries(test_kernel_control PRIVATE ${xeus-zmq_TARGET} Threads::Threads) -target_compile_features(test_kernel_control PRIVATE cxx_std_11) +target_compile_features(test_kernel_control PRIVATE cxx_std_17) set(CONNECTION_FILE ${CMAKE_CURRENT_SOURCE_DIR}/connection.json) @@ -148,7 +148,7 @@ configure_file( add_executable(test_kernel_shell ${TEST_KERNEL_SPLIT_SOURCES}) target_link_libraries(test_kernel_shell PRIVATE ${xeus-zmq_TARGET} Threads::Threads) -target_compile_features(test_kernel_shell PRIVATE cxx_std_11) +target_compile_features(test_kernel_shell PRIVATE cxx_std_17) set(CONNECTION_FILE ${CMAKE_CURRENT_SOURCE_DIR}/connection.json) @@ -175,7 +175,7 @@ if (UNIX) add_executable(test_kernel_ipc ${TEST_KERNEL_IPC_SOURCES}) target_link_libraries(test_kernel_ipc PRIVATE ${xeus-zmq_TARGET} Threads::Threads) - target_compile_features(test_kernel_ipc PRIVATE cxx_std_11) + target_compile_features(test_kernel_ipc PRIVATE cxx_std_17) # Test_client_ipc # =============== @@ -185,6 +185,6 @@ if (UNIX) add_executable(test_client_ipc ${TEST_CLIENT_IPC_SOURCES}) target_link_libraries(test_client_ipc PRIVATE ${xeus-zmq_TARGET} Threads::Threads) - target_compile_features(test_client_ipc PRIVATE cxx_std_11) + target_compile_features(test_client_ipc PRIVATE cxx_std_17) endif (UNIX) From 157a987625fa6721fc87ed0a319ec44ab0ee1dd6 Mon Sep 17 00:00:00 2001 From: anutosh491 Date: Thu, 21 Mar 2024 13:38:55 +0530 Subject: [PATCH 4/4] Fixing linking errors due to nlohmann_json and addressing other minor changes --- CMakeLists.txt | 12 +++++++++++- README.md | 2 +- environment-dev.yml | 2 +- include/xeus-zmq/xclient_zmq.hpp | 6 +++++- src/xclient_zmq.cpp | 24 ++++++++++++++++++++++-- src/xclient_zmq_impl.cpp | 11 +++-------- src/xclient_zmq_impl.hpp | 1 - src/xdealer_channel.cpp | 12 +++++------- src/xdealer_channel.hpp | 6 +++--- src/xiopub_client.cpp | 9 ++++----- src/xiopub_client.hpp | 2 +- xeus-zmqConfig.cmake.in | 2 +- 12 files changed, 57 insertions(+), 32 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index e93b90c..be7c81b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -80,7 +80,7 @@ message(STATUS "XEUS_ZMQ_BUILD_TESTS: ${XEUS_ZMQ_BUILD_TESTS}") # ============ set(xeus_REQUIRED_VERSION 3.2.0) -set(nlohmann_json_REQUIRED_VERSION 3.2.0) +set(nlohmann_json_REQUIRED_VERSION 3.11.2) set(cppzmq_REQUIRED_VERSION 4.8.1) set(zeromq_REQUIRED_VERSION 4.3.2) @@ -132,6 +132,7 @@ set(XEUS_ZMQ_HEADERS ${XEUS_ZMQ_INCLUDE_DIR}/xeus-zmq/xthread.hpp ${XEUS_ZMQ_INCLUDE_DIR}/xeus-zmq/xzmq_context.hpp ${XEUS_ZMQ_INCLUDE_DIR}/xeus-zmq/xzmq_serializer.hpp + ${XEUS_ZMQ_INCLUDE_DIR}/xeus-zmq/xclient_zmq.hpp ) set(XEUS_ZMQ_SOURCES @@ -164,6 +165,15 @@ set(XEUS_ZMQ_SOURCES ${XEUS_ZMQ_SOURCE_DIR}/xzmq_messenger.hpp ${XEUS_ZMQ_SOURCE_DIR}/xzmq_messenger.cpp ${XEUS_ZMQ_SOURCE_DIR}/xzmq_serializer.cpp + ${XEUS_ZMQ_SOURCE_DIR}/xclient_zmq.cpp + ${XEUS_ZMQ_SOURCE_DIR}/xclient_messenger.hpp + ${XEUS_ZMQ_SOURCE_DIR}/xclient_messenger.cpp + ${XEUS_ZMQ_SOURCE_DIR}/xclient_zmq_impl.hpp + ${XEUS_ZMQ_SOURCE_DIR}/xclient_zmq_impl.cpp + ${XEUS_ZMQ_SOURCE_DIR}/xdealer_channel.hpp + ${XEUS_ZMQ_SOURCE_DIR}/xdealer_channel.cpp + ${XEUS_ZMQ_SOURCE_DIR}/xiopub_client.hpp + ${XEUS_ZMQ_SOURCE_DIR}/xiopub_client.cpp ) # Targets and link diff --git a/README.md b/README.md index b3cf513..15ad4a5 100644 --- a/README.md +++ b/README.md @@ -71,7 +71,7 @@ See the [documentation](http://xeus.readthedocs.io/) for an exhaustive list of t We have packaged all these dependencies on conda-forge. The simplest way to install them is to run: ```bash -mamba install cmake pkg-config zeromq cppzmq OpenSSL nlohmann_json xtl xeus -c conda-forge +mamba install cmake pkg-config zeromq cppzmq OpenSSL nlohmann_json=3.11.2 xtl xeus -c conda-forge ``` Once you have installed the dependencies, you can build and install `xeus-zmq`: diff --git a/environment-dev.yml b/environment-dev.yml index 77576c1..77d5403 100644 --- a/environment-dev.yml +++ b/environment-dev.yml @@ -15,7 +15,7 @@ dependencies: - xeus>=3.2.0,<4.0 - OpenSSL=1 - libopenssl-static=1 - - nlohmann_json + - nlohmann_json=3.11.2 # Test dependencies - doctest >= 2.4.6 - pytest diff --git a/include/xeus-zmq/xclient_zmq.hpp b/include/xeus-zmq/xclient_zmq.hpp index 898a8b9..9eed125 100644 --- a/include/xeus-zmq/xclient_zmq.hpp +++ b/include/xeus-zmq/xclient_zmq.hpp @@ -47,8 +47,12 @@ namespace xeus void notify_control_listener(xmessage msg); void notify_iopub_listener(xmessage msg); - void start(); + std::size_t iopub_queue_size() const; std::optional pop_iopub_message(); + void connect(); + void stop_channels(); + void start(); + void wait_for_message(); private: std::unique_ptr p_client_impl; diff --git a/src/xclient_zmq.cpp b/src/xclient_zmq.cpp index 25a8829..ad9dc54 100644 --- a/src/xclient_zmq.cpp +++ b/src/xclient_zmq.cpp @@ -73,9 +73,9 @@ namespace xeus p_client_impl->notify_iopub_listener(std::move(msg)); } - void xclient_zmq::start() + std::size_t xclient_zmq::iopub_queue_size() const { - p_client_impl->start(); + return p_client_impl->iopub_queue_size(); } std::optional xclient_zmq::pop_iopub_message() @@ -83,6 +83,26 @@ namespace xeus return p_client_impl->pop_iopub_message(); } + void xclient_zmq::connect() + { + p_client_impl->connect(); + } + + void xclient_zmq::stop_channels() + { + p_client_impl->stop_channels(); + } + + void xclient_zmq::start() + { + p_client_impl->start(); + } + + void xclient_zmq::wait_for_message() + { + p_client_impl->wait_for_message(); + } + std::unique_ptr make_xclient_zmq(xcontext& context, const xconfiguration& config, nl::json::error_handler_t eh) diff --git a/src/xclient_zmq_impl.cpp b/src/xclient_zmq_impl.cpp index 4f682cc..699ade0 100644 --- a/src/xclient_zmq_impl.cpp +++ b/src/xclient_zmq_impl.cpp @@ -20,8 +20,8 @@ namespace xeus const xeus::xconfiguration& config, nl::json::error_handler_t eh) : p_auth(make_xauthentication(config.m_signature_scheme, config.m_key)) - , m_shell_client(context, config) - , m_control_client(context, config) + , m_shell_client(context, config.m_transport, config.m_ip, config.m_shell_port) + , m_control_client(context, config.m_transport, config.m_ip, config.m_control_port) , m_iopub_client(context, config) , p_messenger(context) , m_error_handler(eh) @@ -93,11 +93,6 @@ namespace xeus m_iopub_listener = l; } - xclient_messenger& xclient_zmq_impl::get_client_messenger() - { - return p_messenger; - } - void xclient_zmq_impl::connect() { p_messenger.connect(); @@ -171,7 +166,7 @@ namespace xeus void xclient_zmq_impl::start() { start_iopub_thread(); - // TODO + // TODO : Introduce a client, xheartbeat_client that runs on its own thread, m_heartbeat_thread. } void xclient_zmq_impl::start_iopub_thread() diff --git a/src/xclient_zmq_impl.hpp b/src/xclient_zmq_impl.hpp index c85f4d2..30cfdcb 100644 --- a/src/xclient_zmq_impl.hpp +++ b/src/xclient_zmq_impl.hpp @@ -64,7 +64,6 @@ namespace xeus // TODO // client messenger - xclient_messenger& get_client_messenger(); void connect(); void stop_channels(); diff --git a/src/xdealer_channel.cpp b/src/xdealer_channel.cpp index a3f96e7..81c986c 100644 --- a/src/xdealer_channel.cpp +++ b/src/xdealer_channel.cpp @@ -15,18 +15,16 @@ namespace xeus { xdealer_channel::xdealer_channel(zmq::context_t& context, - const xeus::xconfiguration& config) + const std::string& transport, + const std::string& ip, + const std::string& port) : m_socket(context, zmq::socket_type::dealer) - , m_end_point("") { - m_end_point = xeus::get_end_point(config.m_transport, config.m_ip, config.m_shell_port); - - m_socket.connect(m_end_point); + m_socket.connect(get_end_point(transport, ip, port)); } xdealer_channel::~xdealer_channel() { - m_socket.disconnect(m_end_point); } void xdealer_channel::send_message(zmq::multipart_t& message) @@ -37,7 +35,7 @@ namespace xeus std::optional xdealer_channel::receive_message(long timeout) { zmq::multipart_t wire_msg; - m_socket.set(zmq::sockopt::linger, timeout); + m_socket.set(zmq::sockopt::linger, static_cast(timeout)); if (wire_msg.recv(m_socket)) { return wire_msg; diff --git a/src/xdealer_channel.hpp b/src/xdealer_channel.hpp index 5204870..4893dc2 100644 --- a/src/xdealer_channel.hpp +++ b/src/xdealer_channel.hpp @@ -24,7 +24,9 @@ namespace xeus public: xdealer_channel(zmq::context_t& context, - const xeus::xconfiguration& config); + const std::string& transport, + const std::string& ip, + const std::string& port); ~xdealer_channel(); @@ -34,9 +36,7 @@ namespace xeus zmq::socket_t& get_socket(); private: - zmq::socket_t m_socket; - std::string m_end_point; }; } diff --git a/src/xiopub_client.cpp b/src/xiopub_client.cpp index fb98662..3519766 100644 --- a/src/xiopub_client.cpp +++ b/src/xiopub_client.cpp @@ -13,6 +13,7 @@ #include "xclient_zmq_impl.hpp" #include "xeus-zmq/xzmq_serializer.hpp" +#include "xeus-zmq/xmiddleware.hpp" namespace xeus { @@ -20,16 +21,14 @@ namespace xeus xiopub_client::xiopub_client(zmq::context_t& context, const xeus::xconfiguration& config) : m_iopub(context, zmq::socket_type::sub) - , m_iopub_end_point("") + , m_controller(context, zmq::socket_type::rep) { - m_iopub_end_point = xeus::get_end_point(config.m_transport, config.m_ip, config.m_iopub_port); - - m_iopub.connect(m_iopub_end_point); + m_iopub.connect(get_end_point(config.m_transport, config.m_ip, config.m_iopub_port)); + init_socket(m_controller, get_controller_end_point("iopub")); } xiopub_client::~xiopub_client() { - m_iopub.disconnect(m_iopub_end_point); } std::size_t xiopub_client::iopub_queue_size() const diff --git a/src/xiopub_client.hpp b/src/xiopub_client.hpp index d6c8349..f374e76 100644 --- a/src/xiopub_client.hpp +++ b/src/xiopub_client.hpp @@ -43,7 +43,7 @@ namespace xeus private: zmq::socket_t m_iopub; - std::string m_iopub_end_point; + zmq::socket_t m_controller; std::queue m_message_queue; mutable std::mutex m_queue_mutex; diff --git a/xeus-zmqConfig.cmake.in b/xeus-zmqConfig.cmake.in index 3f8761d..66bc431 100644 --- a/xeus-zmqConfig.cmake.in +++ b/xeus-zmqConfig.cmake.in @@ -22,7 +22,7 @@ set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR};${CMAKE_MODULE_PATH}") @XEUS_ZMQ_CONFIG_CODE@ include(CMakeFindDependencyMacro) -find_dependency(nlohmann_json @nlohmann_json_REQUIRED_VERSION@) +find_dependency(nlohmann_json @nlohmann_json_VERSION@ EXACT) find_dependency(xeus @xeus_REQUIRED_VERSION@) # On Unix platforms, ZeroMQ is built with autotools and pkg-config is