Registering scheduled timers for the retried invocations so that they can be cancelled upon shutdown.

Also added exception handling to event handling so that a thrown exception does not kill the io thread.
ihsandemir committed Apr 17, 2020
1 parent 83daea0 commit 55f9a03
Showing 2 changed files with 83 additions and 53 deletions.
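
The first file wires repeating timers into a registry: scheduleWithRepetition assigns each timer an id from an atomic counter, stores it in a util::SynchronizedMap, and shutdown() cancels whatever is still registered. Below is the same pattern reduced to a self-contained sketch using plain Boost.Asio and a mutex-guarded std::map in place of the client's own types; TimerRegistry and its members are illustrative names, not client API.

#include <atomic>
#include <chrono>
#include <cstdint>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <thread>

#include <boost/asio/steady_timer.hpp>
#include <boost/asio/thread_pool.hpp>

// Illustrative stand-in for the client's executor/timer bookkeeping, not client API.
class TimerRegistry {
public:
    explicit TimerRegistry(boost::asio::thread_pool &pool) : pool_(pool), counter_(0), stopped_(false) {}

    // Run task every `period` until the timer is cancelled or shutdown() is called.
    std::shared_ptr<boost::asio::steady_timer>
    scheduleWithRepetition(std::function<void()> task, std::chrono::steady_clock::duration period) {
        auto timer = std::make_shared<boost::asio::steady_timer>(pool_);
        auto id = counter_++;
        {
            std::lock_guard<std::mutex> guard(mutex_);
            timers_[id] = timer;                    // registered so shutdown() can find it
        }
        arm(timer, id, std::move(task), period);
        return timer;
    }

    // Cancel every registered timer; each pending wait completes with operation_aborted.
    void shutdown() {
        stopped_ = true;                            // plays the role of lifecycleService.isRunning()
        std::lock_guard<std::mutex> guard(mutex_);
        for (auto &entry : timers_) {
            entry.second->cancel();
        }
        timers_.clear();
    }

private:
    void arm(std::shared_ptr<boost::asio::steady_timer> timer, std::intmax_t id,
             std::function<void()> task, std::chrono::steady_clock::duration period) {
        timer->expires_after(period);
        timer->async_wait([this, timer, id, task, period](boost::system::error_code ec) {
            if (ec || stopped_) {                   // cancelled or shutting down: unregister and stop
                std::lock_guard<std::mutex> guard(mutex_);
                timers_.erase(id);
                return;
            }
            try {
                task();                             // a throwing task must not escape the io thread
            } catch (...) {
            }
            arm(timer, id, task, period);           // re-arm for the next period; registration stays
        });
    }

    boost::asio::thread_pool &pool_;
    std::atomic<std::intmax_t> counter_;
    std::atomic<bool> stopped_;
    std::mutex mutex_;
    std::map<std::intmax_t, std::shared_ptr<boost::asio::steady_timer>> timers_;
};

int main() {
    boost::asio::thread_pool pool(2);
    TimerRegistry registry(pool);
    registry.scheduleWithRepetition([] { std::cout << "refresh partitions\n"; },
                                    std::chrono::milliseconds(100));
    std::this_thread::sleep_for(std::chrono::milliseconds(350));
    registry.shutdown();                            // no timer handler outlives the pool
    pool.join();
    return 0;
}

Cancelling from shutdown() makes every pending async_wait complete immediately with operation_aborted, so the thread pool can be joined without waiting out the remaining delays.
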
@@ -23,6 +23,7 @@

#include "hazelcast/client/spi/LifecycleService.h"
#include "hazelcast/client/exception/ProtocolExceptions.h"
#include "hazelcast/util/SynchronizedMap.h"

#if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
#pragma warning(push)
@@ -64,46 +65,55 @@ namespace hazelcast {
std::shared_ptr<boost::asio::steady_timer> scheduleWithRepetition(CompletionToken token,
const std::chrono::steady_clock::duration &delay,
const std::chrono::steady_clock::duration &period) {
// TODO: Look at boost thread scheduler for this implementation
auto timer = std::make_shared<boost::asio::steady_timer>(*internalExecutor);
return scheduleWithRepetitionInternal(token, delay, period, timer);
auto timerId = timerCounter++;
auto scheduledTimer = scheduleWithRepetitionInternal(token, delay, period, timer, timerId);
timersMap.put(timerId, scheduledTimer);
return scheduledTimer;
}

const boost::asio::thread_pool &getUserExecutor() const;

static void shutdownThreadPool(boost::asio::thread_pool *pool);

private:
std::unique_ptr<boost::asio::thread_pool> internalExecutor;
std::unique_ptr<boost::asio::thread_pool> userExecutor;
spi::LifecycleService &lifecycleService;
const ClientProperties &clientProperties;
int userExecutorPoolSize;
util::SynchronizedMap<std::intmax_t, boost::asio::steady_timer> timersMap;
std::atomic_intmax_t timerCounter;

template<typename CompletionToken>
std::shared_ptr<boost::asio::steady_timer> scheduleWithRepetitionInternal(CompletionToken token,
const std::chrono::steady_clock::duration &delay,
const std::chrono::steady_clock::duration &period,
std::shared_ptr<boost::asio::steady_timer> timer) {
// TODO: Look at boost thread scheduler for this implementation
std::shared_ptr<boost::asio::steady_timer> timer,
std::intmax_t timerId) {
if (delay.count() > 0) {
timer->expires_from_now(delay);
} else {
timer->expires_from_now(period);
}
timer->async_wait([this, token, period, timer](boost::system::error_code ec) {
if (ec) {
return;
}
timer->async_wait([=](boost::system::error_code ec) {
timersMap.remove(timerId);
try {
token();
} catch (std::exception &) {
assert(false);
}

if (ec) {
return;
}

if (period.count()) {
if (!lifecycleService.isRunning()) {
return;
}
scheduleWithRepetitionInternal(token, std::chrono::seconds(-1), period, timer);
scheduleWithRepetitionInternal(token, std::chrono::seconds(-1), period, timer,
timerCounter++);
}
});
return timer;
108 changes: 64 additions & 44 deletions hazelcast/src/hazelcast/client/spi.cpp
@@ -1220,7 +1220,8 @@ namespace hazelcast {
const ClientProperties &properties,
int32_t poolSize,
spi::LifecycleService &service)
: lifecycleService(service), clientProperties(properties), userExecutorPoolSize(poolSize) {}
: lifecycleService(service), clientProperties(properties), userExecutorPoolSize(poolSize),
timerCounter(0) {}

void ClientExecutionServiceImpl::start() {
int internalPoolSize = clientProperties.getInteger(clientProperties.getInternalExecutorPoolSize());
@@ -1241,6 +1242,10 @@
}

void ClientExecutionServiceImpl::shutdown() {
for (auto &timer : timersMap.values()) {
boost::system::error_code ec;
timer->cancel(ec);
}
shutdownThreadPool(userExecutor.get());
shutdownThreadPool(internalExecutor.get());
}
@@ -1255,12 +1260,12 @@
}

pool->stop();

#if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
// needed due to bug https://github.com/chriskohlhoff/asio/issues/431
boost::asio::use_service<boost::asio::detail::win_iocp_io_context>(*pool).stop();
#else
boost::asio::use_service<boost::asio::detail::io_context_impl>(*pool).stop();
#endif

pool->join();
}

@@ -1847,45 +1852,51 @@
}

void ClientPartitionServiceImpl::refreshPartitions() {
clientExecutionService.execute([=]() {
if (!client.getLifecycleService().isRunning()) {
return;
}


try {
connection::ClientConnectionManagerImpl &connectionManager = client.getConnectionManager();
std::shared_ptr<connection::Connection> connection = connectionManager.getOwnerConnection();
if (!connection.get()) {
try {
clientExecutionService.execute([=]() {
if (!client.getLifecycleService().isRunning()) {
return;
}
std::unique_ptr<protocol::ClientMessage> requestMessage = protocol::codec::ClientGetPartitionsCodec::encodeRequest();
std::shared_ptr<ClientInvocation> invocation = ClientInvocation::create(client,
requestMessage, "");
auto future = invocation->invokeUrgent();
future.then(boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
try {
auto responseMessage = f.get();
protocol::codec::ClientGetPartitionsCodec::ResponseParameters response =
protocol::codec::ClientGetPartitionsCodec::ResponseParameters::decode(
responseMessage);
processPartitionResponse(response.partitions, response.partitionStateVersion,
response.partitionStateVersionExist);
} catch (std::exception &e) {
if (client.getLifecycleService().isRunning()) {
logger->warning("Error while fetching cluster partition table! Cause:",
e.what());
}

try {
connection::ClientConnectionManagerImpl &connectionManager = client.getConnectionManager();
std::shared_ptr<connection::Connection> connection = connectionManager.getOwnerConnection();
if (!connection.get()) {
return;
}
std::unique_ptr<protocol::ClientMessage> requestMessage = protocol::codec::ClientGetPartitionsCodec::encodeRequest();
std::shared_ptr<ClientInvocation> invocation = ClientInvocation::create(client,
requestMessage, "");
auto future = invocation->invokeUrgent();
future.then(boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
try {
auto responseMessage = f.get();
protocol::codec::ClientGetPartitionsCodec::ResponseParameters response =
protocol::codec::ClientGetPartitionsCodec::ResponseParameters::decode(
responseMessage);
processPartitionResponse(response.partitions, response.partitionStateVersion,
response.partitionStateVersionExist);
} catch (std::exception &e) {
if (client.getLifecycleService().isRunning()) {
logger->warning("Error while fetching cluster partition table! Cause:",
e.what());
}
}

});
} catch (exception::IException &e) {
if (client.getLifecycleService().isRunning()) {
logger->warning(
std::string("Error while fetching cluster partition table! ") + e.what());
});
} catch (exception::IException &e) {
if (client.getLifecycleService().isRunning()) {
logger->warning(
std::string("Error while fetching cluster partition table! ") + e.what());
}
}
});
} catch (exception::IException &e) {
if (client.getLifecycleService().isRunning()) {
logger->warning(
std::string("Could not scheduke cluster partition table fetching! ") + e.what());
}
});
}
}

void ClientPartitionServiceImpl::handlePartitionsEventV15(
@@ -2191,7 +2202,10 @@ namespace hazelcast {
[=]() { processEventMessage(invocation, response); });

} catch (const std::exception &e) {
logger.warning("Event clientMessage could not be handled. ", e.what());
if (clientContext.getLifecycleService().isRunning()) {
logger.warning("Delivery of event message to event handler failed. ", e.what(),
", *response, "", ", *invocation);
}
}
}

@@ -2316,10 +2330,6 @@
AbstractClientListenerService::invokeFromInternalThread(
const ClientRegistrationKey &registrationKey,
const std::shared_ptr<connection::Connection> &connection) {
//This method should only be called from registrationExecutor
/* TODO
assert (Thread.currentThread().getName().contains("eventRegistration"));
*/
try {
invoke(registrationKey, connection);
} catch (exception::IException &e) {
@@ -2364,12 +2374,22 @@
const std::shared_ptr<protocol::ClientMessage> response) {
auto eventHandler = invocation->getEventHandler();
if (eventHandler.get() == NULL) {
logger.warning("No eventHandler for invocation. Ignoring this invocation response. ",
*invocation);
if (clientContext.getLifecycleService().isRunning()) {
logger.warning("No eventHandler for invocation. Ignoring this invocation response. ",
*invocation);
}

return;
}

eventHandler->handle(response);
try {
eventHandler->handle(response);
} catch (std::exception &e) {
if (clientContext.getLifecycleService().isRunning()) {
logger.warning("Delivery of event message to event handler failed. ", e.what(),
", *response, "", ", *invocation);
}
}
}

bool AbstractClientListenerService::ConnectionPointerLessComparator::operator()(
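
The second change from the commit message is the try/catch around event delivery (the dispatch path and processEventMessage above). In Boost.Asio an exception that escapes a completion handler propagates out of the run loop of the thread executing it, so a single throwing event handler can kill the io thread, and with asio's thread_pool workers it typically terminates the whole process. A minimal sketch of that guard, assuming Boost.Asio; deliver_event is an illustrative helper, not client API.

#include <exception>
#include <functional>
#include <iostream>
#include <stdexcept>

#include <boost/asio/post.hpp>
#include <boost/asio/thread_pool.hpp>

// Illustrative helper, not client API: dispatch an event handler on the io pool
// without letting its exceptions escape the completion handler.
void deliver_event(boost::asio::thread_pool &io_pool, std::function<void()> event_handler) {
    boost::asio::post(io_pool, [event_handler]() {
        try {
            event_handler();                                   // user-supplied listener code
        } catch (const std::exception &e) {
            std::cerr << "event handler failed: " << e.what() << '\n';  // log, keep the thread alive
        }
    });
}

int main() {
    boost::asio::thread_pool io_pool(1);
    deliver_event(io_pool, [] { throw std::runtime_error("bad listener"); });  // does not kill the pool
    deliver_event(io_pool, [] { std::cout << "later events still delivered\n"; });
    io_pool.join();
    return 0;
}

The guard around the handler is what keeps one faulty listener from stopping delivery to every other listener served by the same io thread.
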
