Skip to content

Commit

Permalink
Adding support for client messenger
Browse files Browse the repository at this point in the history
  • Loading branch information
anutosh491 committed Mar 26, 2024
1 parent 856a5da commit 5fbf1d4
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 38 deletions.
4 changes: 1 addition & 3 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,7 @@ jobs:
with:
environment-file: environment-dev.yml
environment-name: xeus-zmq
init-shell: >-
powershell
cmd.exe
init-shell: cmd.exe

- name: Make build directory
run: mkdir build
Expand Down
10 changes: 8 additions & 2 deletions include/xeus-zmq/xclient_zmq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#ifndef XEUS_CLIENT_ZMQ_HPP
#define XEUS_CLIENT_ZMQ_HPP

#include <optional>

#include <nlohmann/json.hpp>

#include "xeus/xeus_context.hpp"
Expand All @@ -28,8 +30,7 @@ namespace xeus

using listener = std::function<void(xmessage)>;

xclient_zmq::xclient_zmq(xcontext& context,
const xeus::xconfiguration& config);
explicit xclient_zmq(std::unique_ptr<xclient_zmq_impl> impl);
~xclient_zmq();

void send_on_shell(xmessage msg);
Expand All @@ -52,6 +53,11 @@ namespace xeus
private:
std::unique_ptr<xclient_zmq_impl> p_client_impl;
};

XEUS_ZMQ_API
std::unique_ptr<xclient_zmq> make_xclient_zmq(xcontext& context,
const xconfiguration& config,
nl::json::error_handler_t eh = nl::json::error_handler_t::strict);
}

#endif
14 changes: 10 additions & 4 deletions src/xclient_messenger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ namespace nl = nlohmann;
namespace xeus
{
xclient_messenger::xclient_messenger(zmq::context_t& context)
: m_iopub_controller(context, zmq::socket_type::req)
{
}

Expand All @@ -25,12 +26,17 @@ namespace xeus

void xclient_messenger::connect()
{
// TODO
m_iopub_controller.set(zmq::sockopt::linger, get_socket_linger());
m_iopub_controller.connect(get_controller_end_point("iopub"));
}

void xclient_messenger::stop_channels()
{
// TODO
}
}
zmq::message_t stop_msg("stop", 4);
zmq::message_t response;

// Wait for iopub answer
m_iopub_controller.send(stop_msg, zmq::send_flags::none);
(void)m_iopub_controller.recv(response);
}
}
7 changes: 4 additions & 3 deletions src/xclient_messenger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ namespace xeus
void stop_channels();

private:

// Add some relevant sockets
zmq::socket_t m_iopub_controller;
};
}
}

#endif
15 changes: 11 additions & 4 deletions src/xclient_zmq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@
namespace xeus
{

xclient_zmq::xclient_zmq(xcontext& context,
const xeus::xconfiguration& config)
: p_client_impl(std::make_unique<xclient_zmq_impl>(context.get_wrapped_context<zmq::context_t>(), config))
xclient_zmq::xclient_zmq(std::unique_ptr<xclient_zmq_impl> impl)
: p_client_impl(std::move(impl))
{
}

Expand Down Expand Up @@ -81,6 +80,14 @@ namespace xeus

std::optional<xmessage> xclient_zmq::pop_iopub_message()
{
p_client_impl->pop_iopub_message();
return p_client_impl->pop_iopub_message();
}

std::unique_ptr<xclient_zmq> make_xclient_zmq(xcontext& context,
const xconfiguration& config,
nl::json::error_handler_t eh)
{
auto impl = std::make_unique<xclient_zmq_impl>(context.get_wrapped_context<zmq::context_t>(), config, eh);
return std::make_unique<xclient_zmq>(std::move(impl));
}
}
11 changes: 7 additions & 4 deletions src/xclient_zmq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
* The full license is in the file LICENSE, distributed with this software. *
****************************************************************************/

#include <iostream>

#include "xeus-zmq/xauthentication.hpp"
#include "xclient_zmq_impl.hpp"
#include "xeus-zmq/xzmq_serializer.hpp"
Expand All @@ -15,13 +17,14 @@ namespace xeus
{

xclient_zmq_impl::xclient_zmq_impl(zmq::context_t& context,
const xeus::xconfiguration& config)
const xeus::xconfiguration& config,
nl::json::error_handler_t eh)
: p_auth(make_xauthentication(config.m_signature_scheme, config.m_key))
, m_shell_client(context, config)
, m_control_client(context, config)
, m_iopub_client(context, config)
, p_messenger(context)
, m_error_handler(nl::json::error_handler_t::strict)
, m_error_handler(eh)
{
}

Expand Down Expand Up @@ -80,7 +83,7 @@ namespace xeus
return m_iopub_client.iopub_queue_size();
}

std::optional<xmessage> xclient_zmq_impl::pop_iopub_message();
std::optional<xmessage> xclient_zmq_impl::pop_iopub_message()
{
return m_iopub_client.pop_iopub_message();
}
Expand Down Expand Up @@ -159,7 +162,7 @@ namespace xeus

if (pending_message.has_value())
{
notify_iopub_listener(*pending_message);
notify_iopub_listener(std::move(*pending_message));
} else {
poll(-1);
}
Expand Down
15 changes: 8 additions & 7 deletions src/xclient_zmq_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,19 @@

namespace xeus
{
class xauthentication;

class xclient_zmq_impl
{
public:
using iopub_client_ptr = std::unique_ptr<xiopub_client>;
using listener = std::function<void(xmessage)>;

virtual ~xclient_zmq_impl() = default;
xclient_zmq_impl(zmq::context_t& context,
const xconfiguration& config,
nl::json::error_handler_t eh);

~xclient_zmq_impl();

xclient_zmq_impl(const xclient_zmq_impl&) = delete;
xclient_zmq_impl& operator=(const xclient_zmq_impl&) = delete;
Expand Down Expand Up @@ -72,23 +77,19 @@ namespace xeus

xmessage deserialize(zmq::multipart_t& wire_msg) const;

protected:
xclient_zmq_impl(zmq::context_t& context,
const xeus::xconfiguration& config);

private:
void start_iopub_thread();
void poll(long timeout);

using authentication_ptr = std::unique_ptr<xauthentication>;
authentication_ptr p_auth;

xclient_messenger p_messenger;

xdealer_channel m_shell_client;
xdealer_channel m_control_client;
xiopub_client m_iopub_client;

xclient_messenger p_messenger;

nl::json::error_handler_t m_error_handler;

listener m_shell_listener;
Expand Down
8 changes: 5 additions & 3 deletions src/xdealer_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
* The full license is in the file LICENSE, distributed with this software. *
****************************************************************************/

#include "xeus-zmq/xmiddleware.hpp"

#include "xdealer_channel.hpp"

namespace xeus
Expand Down Expand Up @@ -35,16 +37,16 @@ namespace xeus
std::optional<zmq::multipart_t> xdealer_channel::receive_message(long timeout)
{
zmq::multipart_t wire_msg;
m_socket.setsockopt(ZMQ_RCVTIMEO, timeout);
if (wire_msg.recv(m_socket, zmq::recv_flags::none))
m_socket.set(zmq::sockopt::linger, timeout);
if (wire_msg.recv(m_socket))
{
return wire_msg;
} else {
return std::nullopt;
}
}

zmq::socket& xdealer_channel::get_socket()
zmq::socket_t& xdealer_channel::get_socket()
{
return m_socket;
}
Expand Down
4 changes: 3 additions & 1 deletion src/xdealer_channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#define XEUS_DEALER_CHANNEL_HPP

#include "zmq.hpp"
#include "zmq_addon.hpp"

#include "nlohmann/json.hpp"
#include "xeus/xkernel_configuration.hpp"

Expand All @@ -29,7 +31,7 @@ namespace xeus
void send_message(zmq::multipart_t& message);
std::optional<zmq::multipart_t> receive_message(long timeout);

zmq::socket& get_socket();
zmq::socket_t& get_socket();

private:

Expand Down
7 changes: 5 additions & 2 deletions src/xiopub_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
* The full license is in the file LICENSE, distributed with this software. *
****************************************************************************/

#include <iostream>

#include "xiopub_client.hpp"
#include "xclient_zmq_impl.hpp"

#include "xeus-zmq/xzmq_serializer.hpp"

Expand Down Expand Up @@ -40,7 +43,7 @@ namespace xeus
std::lock_guard<std::mutex> guard(m_queue_mutex);
if (!m_message_queue.empty())
{
xmessage msg = m_message_queue.back();
xmessage msg = std::move(m_message_queue.back());
m_message_queue.pop();
return msg;
} else {
Expand All @@ -67,7 +70,7 @@ namespace xeus
xmessage msg = p_client_impl->deserialize(wire_msg);
{
std::lock_guard<std::mutex> guard(m_queue_mutex);
m_message_queue.push(msg);
m_message_queue.push(std::move(msg));
}
p_client_impl->notify_shell_listener(std::move(msg));
}
Expand Down
5 changes: 5 additions & 0 deletions src/xiopub_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,18 @@
#ifndef XEUS_IOPUB_CLIENT_HPP
#define XEUS_IOPUB_CLIENT_HPP

#include <queue>
#include <mutex>

#include "zmq.hpp"
#include "nlohmann/json.hpp"

#include "xeus/xmessage.hpp"
#include "xeus/xeus_context.hpp"
#include "xeus/xkernel_configuration.hpp"

#include "xeus-zmq/xthread.hpp"
#include "xeus-zmq/xmiddleware.hpp"

namespace xeus
{
Expand Down
10 changes: 5 additions & 5 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ configure_file(

add_executable(test_kernel ${TEST_KERNEL_SOURCES})
target_link_libraries(test_kernel PRIVATE ${xeus-zmq_TARGET} Threads::Threads)
target_compile_features(test_kernel PRIVATE cxx_std_11)
target_compile_features(test_kernel PRIVATE cxx_std_17)

set(CONNECTION_FILE ${CMAKE_CURRENT_SOURCE_DIR}/connection.json)

Expand Down Expand Up @@ -118,7 +118,7 @@ configure_file(

add_executable(test_kernel_control ${TEST_KERNEL_SPLIT_SOURCES})
target_link_libraries(test_kernel_control PRIVATE ${xeus-zmq_TARGET} Threads::Threads)
target_compile_features(test_kernel_control PRIVATE cxx_std_11)
target_compile_features(test_kernel_control PRIVATE cxx_std_17)

set(CONNECTION_FILE ${CMAKE_CURRENT_SOURCE_DIR}/connection.json)

Expand Down Expand Up @@ -148,7 +148,7 @@ configure_file(

add_executable(test_kernel_shell ${TEST_KERNEL_SPLIT_SOURCES})
target_link_libraries(test_kernel_shell PRIVATE ${xeus-zmq_TARGET} Threads::Threads)
target_compile_features(test_kernel_shell PRIVATE cxx_std_11)
target_compile_features(test_kernel_shell PRIVATE cxx_std_17)

set(CONNECTION_FILE ${CMAKE_CURRENT_SOURCE_DIR}/connection.json)

Expand All @@ -175,7 +175,7 @@ if (UNIX)

add_executable(test_kernel_ipc ${TEST_KERNEL_IPC_SOURCES})
target_link_libraries(test_kernel_ipc PRIVATE ${xeus-zmq_TARGET} Threads::Threads)
target_compile_features(test_kernel_ipc PRIVATE cxx_std_11)
target_compile_features(test_kernel_ipc PRIVATE cxx_std_17)

# Test_client_ipc
# ===============
Expand All @@ -185,6 +185,6 @@ if (UNIX)

add_executable(test_client_ipc ${TEST_CLIENT_IPC_SOURCES})
target_link_libraries(test_client_ipc PRIVATE ${xeus-zmq_TARGET} Threads::Threads)
target_compile_features(test_client_ipc PRIVATE cxx_std_11)
target_compile_features(test_client_ipc PRIVATE cxx_std_17)

endif (UNIX)

0 comments on commit 5fbf1d4

Please sign in to comment.