Skip to content

Commit

Permalink
Fixed stop handling
Browse files Browse the repository at this point in the history
  • Loading branch information
JohanMabille committed Jun 29, 2024
1 parent 3142b07 commit 7a30042
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/client/xclient_messenger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,4 @@ namespace xeus
m_heartbeat_controller.send(stop_msg, zmq::send_flags::none);
(void)m_heartbeat_controller.recv(response);
}
}
}
6 changes: 5 additions & 1 deletion src/client/xclient_zmq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ namespace xeus

// Has to be in the cpp because incomplete
// types are used in unique_ptr in the header
xclient_zmq_impl::~xclient_zmq_impl() = default;
xclient_zmq_impl::~xclient_zmq_impl()
{
m_iopub_thread.join();
m_heartbeat_thread.join();
}

void xclient_zmq_impl::send_on_shell(xmessage msg)
{
Expand Down
40 changes: 34 additions & 6 deletions src/client/xheartbeat_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
* The full license is in the file LICENSE, distributed with this software. *
****************************************************************************/

#include <iostream>

#include "xheartbeat_client.hpp"
#include "xclient_zmq_impl.hpp"
#include "../common/xmiddleware_impl.hpp"
Expand All @@ -23,6 +25,7 @@ namespace xeus
, m_max_retry(max_retry)
, m_heartbeat_timeout(timeout)
, m_heartbeat_end_point("")
, m_request_stop(false)
{
m_heartbeat_end_point = get_end_point(config.m_transport, config.m_ip, config.m_hb_port);
m_heartbeat.connect(m_heartbeat_end_point);
Expand All @@ -42,9 +45,35 @@ namespace xeus

bool xheartbeat_client::wait_for_answer(long timeout)
{
m_heartbeat.set(zmq::sockopt::linger, static_cast<int>(timeout));
zmq::message_t response;
return m_heartbeat.recv(response).has_value();
zmq::pollitem_t items[] = {
{ m_heartbeat, 0, ZMQ_POLLIN, 0 }, { m_controller, 0, ZMQ_POLLIN, 0 }
};

zmq::poll(&items[0], 2, std::chrono::milliseconds(timeout));
try
{
if (items[0].revents & ZMQ_POLLIN)
{
zmq::multipart_t wire_msg;
wire_msg.recv(m_heartbeat);
}

if (items[1].revents & ZMQ_POLLIN)
{
// stop message
zmq::multipart_t wire_msg;
wire_msg.recv(m_controller);
wire_msg.send(m_controller);
m_request_stop = true;
}

return true;
}
catch (std::exception& e)
{
std::cerr << e.what() << std::endl;
}
return false;
}

void xheartbeat_client::register_kernel_status_listener(const kernel_status_listener& l)
Expand All @@ -59,10 +88,9 @@ namespace xeus

void xheartbeat_client::run()
{
bool stop = false;
std::size_t retry_count = 0;

while(!stop)
while(!m_request_stop)
{
send_heartbeat_message();
if(!wait_for_answer(m_heartbeat_timeout))
Expand All @@ -74,7 +102,7 @@ namespace xeus
else
{
notify_kernel_dead(true);
stop = true;
break;
}
}
else
Expand Down
3 changes: 2 additions & 1 deletion src/client/xheartbeat_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ namespace xeus
const long m_heartbeat_timeout;

std::string m_heartbeat_end_point;
bool m_request_stop;
};
}

#endif
#endif
14 changes: 5 additions & 9 deletions src/client/xiopub_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ namespace xeus

void xiopub_client::run()
{
zmq::multipart_t wire_msg;
zmq::pollitem_t items[] = {
{ m_iopub, 0, ZMQ_POLLIN, 0 }, { m_controller, 0, ZMQ_POLLIN, 0 }
};
Expand All @@ -71,6 +70,7 @@ namespace xeus
{
if (items[0].revents & ZMQ_POLLIN)
{
zmq::multipart_t wire_msg;
wire_msg.recv(m_iopub);
xpub_message msg = p_client_impl->deserialize_iopub(wire_msg);
{
Expand All @@ -80,15 +80,11 @@ namespace xeus
}
if (items[1].revents & ZMQ_POLLIN)
{
// stop message
zmq::multipart_t wire_msg;
wire_msg.recv(m_controller);
if (wire_msg.size() > 0)
{
std::string received_msg = wire_msg.at(0).to_string();
if (received_msg == "stop")
{
break;
}
}
wire_msg.send(m_controller);
break;
}
}
catch (std::exception& e)
Expand Down

0 comments on commit 7a30042

Please sign in to comment.