Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Architectural changes to use boost thread_pool, boost::future, std::promise, std::nested_exception, connection local invocation and event handler maps #584

Merged
merged 18 commits into from
Apr 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 14 additions & 49 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,50 +25,13 @@

cmake_minimum_required (VERSION 3.12.4)

function(downloadboost BINARY-DIR)
set(ZIPFN ${CMAKE_CURRENT_BINARY_DIR}/boost_1_72_0.zip)
set(OUTFN ${CMAKE_CURRENT_BINARY_DIR}/boost_1_72_0)

if(NOT EXISTS ${OUTFN})
if(NOT EXISTS ${ZIPFN})
if(${CMAKE_SYSTEM_NAME} MATCHES "Windows")
set(BOOST_DOWNLOAD_URL https://dl.bintray.com/boostorg/release/1.72.0/source/boost_1_72_0.7z)
set(BOOST_CHECKSUM 247a91dd7e4d9dd3c4b954b532fbc167ba62dc15ab834e5ad893d7c3f9eb5f0f)
else()
set(BOOST_DOWNLOAD_URL https://dl.bintray.com/boostorg/release/1.72.0/source/boost_1_72_0.tar.bz2)
set(BOOST_CHECKSUM 59c9b274bc451cf91a9ba1dd2c7fdcaf5d60b1b3aa83f2c9fa143417cc660722)
endif()
file(DOWNLOAD ${BOOST_DOWNLOAD_URL} ${ZIPFN}
SHOW_PROGRESS
EXPECTED_HASH SHA256=${BOOST_CHECKSUM})
endif()

execute_process(COMMAND ${CMAKE_COMMAND} -E tar -xf ${ZIPFN}
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
endif()
endfunction()

function (find_boost BINARY_DIR)
# try finding the boost installation
find_package(Boost 1.69 QUIET)
if (NOT Boost_FOUND)
message(STATUS "Could not find an existing Boost installation (If you have a boost installation and you want to use it, please make sure that cmake find_package[https://cmake.org/cmake/help/latest/command/find_package.html] can find it. Also see [https://cmake.org/cmake/help/latest/module/FindBoost.html]). I will download the needed boost components now into ${BINARY_DIR}.")
downloadboost(${BINARY_DIR})
set(Boost_INCLUDE_DIRS "${BINARY_DIR}/boost_1_72_0" PARENT_SCOPE)
endif()

if (Boost_FOUND)
message(STATUS "Found an existing Boost installation.")
endif()
endfunction()

project (HazelcastClient
VERSION 4.0
DESCRIPTION "Hazelcast C++ Client"
HOMEPAGE_URL https://hazelcast.org/clients/cplusplus/
LANGUAGES CXX)

set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD_REQUIRED on)

include(TestBigEndian)
Expand Down Expand Up @@ -113,9 +76,6 @@ endif ()
message(STATUS "HAZELCAST_GIT_COMMIT_ID = ${HAZELCAST_GIT_COMMIT_ID}" )
add_definitions(-DHAZELCAST_GIT_COMMIT_ID=${HAZELCAST_GIT_COMMIT_ID})

# Disable any boost lib linking
add_definitions("-DBOOST_ALL_NO_LIB -DBOOST_ERROR_CODE_HEADER_ONLY")

message(STATUS "Preparing hazelcast client ..... ")

#detect endianness
Expand All @@ -137,17 +97,22 @@ FILE(GLOB_RECURSE HZ_GENERATED_HEADERS "./hazelcast/generated-sources/include/*h

include_directories(${PROJECT_SOURCE_DIR}/hazelcast/include ${PROJECT_SOURCE_DIR}/hazelcast/generated-sources/include ${PROJECT_SOURCE_DIR}/external/release_include)

find_boost(${CMAKE_CURRENT_BINARY_DIR})
if (${CMAKE_SYSTEM_NAME} MATCHES "Windows")
set(BOOST_HINTS "C:\\\\Boost")
endif ()
find_package(Boost 1.72 REQUIRED COMPONENTS thread chrono HINTS ${BOOST_HINTS})
include_directories(${Boost_INCLUDE_DIRS})
link_libraries(Boost::thread Boost::chrono)
add_definitions("-DBOOST_THREAD_VERSION=5")
message(STATUS "Using Boost_INCLUDE_DIRS: ${Boost_INCLUDE_DIRS}")

IF(NOT (${HZ_BIT} MATCHES "32") AND NOT (${HZ_BIT} MATCHES "64") )
message( STATUS "Build needs HZ_BIT. Setting default as -DHZ_BIT=64 (other option -DHZ_BIT=32)" )
set(HZ_BIT 64)
ENDIF(NOT (${HZ_BIT} MATCHES "32") AND NOT (${HZ_BIT} MATCHES "64"))
IF (NOT (${HZ_BIT} MATCHES "32") AND NOT (${HZ_BIT} MATCHES "64"))
message(STATUS "Build needs HZ_BIT. Setting default as -DHZ_BIT=64 (other option -DHZ_BIT=32)")
set(HZ_BIT 64)
ENDIF (NOT (${HZ_BIT} MATCHES "32") AND NOT (${HZ_BIT} MATCHES "64"))

IF(NOT (${HZ_LIB_TYPE} MATCHES "STATIC") AND NOT (${HZ_LIB_TYPE} MATCHES "SHARED") )
message( STATUS "Build needs HZ_LIB_TYPE. Setting default as -DHZ_LIB_TYPE=STATIC (other option -DHZ_LIB_TYPE=SHARED)" )
IF (NOT (${HZ_LIB_TYPE} MATCHES "STATIC") AND NOT (${HZ_LIB_TYPE} MATCHES "SHARED"))
message(STATUS "Build needs HZ_LIB_TYPE. Setting default as -DHZ_LIB_TYPE=STATIC (other option -DHZ_LIB_TYPE=SHARED)")
set(HZ_LIB_TYPE STATIC)
ENDIF(NOT (${HZ_LIB_TYPE} MATCHES "STATIC") AND NOT (${HZ_LIB_TYPE} MATCHES "SHARED") )

Expand Down Expand Up @@ -235,7 +200,7 @@ IF(${CMAKE_SYSTEM_NAME} MATCHES "Windows")

ENDIF(${CMAKE_SYSTEM_NAME} MATCHES "Windows")

add_definitions(-DBOOST_NO_AUTO_PTR -DELPP_DISABLE_ASSERT)
add_definitions(-DBOOST_NO_AUTO_PTR -DASIO_STANDALONE -DELPP_DISABLE_ASSERT -DELPP_WINSOCK2 -DWIN32_LEAN_AND_MEAN)

IF(HZ_BUILD_TESTS)
option(INSTALL_GMOCK "Install Googletest's GMock?" OFF)
Expand Down
3 changes: 3 additions & 0 deletions docker/hazelcast-fedora-i386.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,7 @@ RUN dnf install -y maven net-tools gcovr
RUN dnf install -y fedora-repos-rawhide
RUN dnf --disablerepo=* --enablerepo=rawhide --nogpg install -y thrift-devel.i686

RUN dnf install -y wget bzip2
RUN dnf install -y wget bzip2 && wget https://dl.bintray.com/boostorg/release/1.72.0/source/boost_1_72_0.tar.bz2 && tar xjf boost_1_72_0.tar.bz2 && rm boost_1_72_0.tar.bz2 && cd boost_1_72_0 && ./bootstrap.sh && ./b2 --with-thread --with-chrono install && cd .. && rm -rf boost_1_72_0


3 changes: 3 additions & 0 deletions docker/hazelcast-fedora-x86_64.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ RUN dnf install -y maven net-tools gcovr
RUN dnf install -y fedora-repos-rawhide
RUN dnf --disablerepo=* --enablerepo=rawhide --nogpg install -y thrift-devel

# install boost 1.72.0
RUN dnf install -y wget bzip2 && wget https://dl.bintray.com/boostorg/release/1.72.0/source/boost_1_72_0.tar.bz2 && tar xjf boost_1_72_0.tar.bz2 && rm boost_1_72_0.tar.bz2 && cd boost_1_72_0 && ./bootstrap.sh && ./b2 --with-thread --with-chrono install && cd .. && rm -rf boost_1_72_0

16 changes: 9 additions & 7 deletions examples/Org.Website.Samples/ExecutorSample.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ using namespace hazelcast::client;
// }
//
// @Override
// public String call()
// throws Exception {
// public String call() throws Exception {
// System.out.println(message);
// return message;
// }
Expand Down Expand Up @@ -91,8 +90,12 @@ class PrinterCallback : public ExecutionCallback<std::string> {
<< std::endl;
}

virtual void onFailure(const std::shared_ptr<exception::IException> &e) {
std::cout << "The execution of the task failed with exception:" << e << std::endl;
virtual void onFailure(std::exception_ptr e) {
try {
std::rethrow_exception(e);
} catch (hazelcast::client::exception::IException &e) {
std::cout << "The execution of the task failed with exception:" << e << std::endl;
}
}
};

Expand All @@ -118,10 +121,9 @@ int main() {
// Get the Distributed Executor Service
std::shared_ptr<IExecutorService> ex = hz.getExecutorService("my-distributed-executor");
// Submit the MessagePrinter Runnable to a random Hazelcast Cluster Member
std::shared_ptr<ICompletableFuture<std::string> > future = ex->submit<MessagePrinter, std::string>(
MessagePrinter("message to any node"));
auto future = ex->submit<MessagePrinter, std::string>(MessagePrinter("message to any node"));
// Wait for the result of the submitted task and print the result
std::shared_ptr<std::string> result = future->get();
std::shared_ptr<std::string> result = future.get_future().get();
std::cout << "Server result: " << *result << std::endl;
// Get the first Hazelcast Cluster Member
Member firstMember = hz.getCluster().getMembers()[0];
Expand Down
2 changes: 1 addition & 1 deletion examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ This folder contains an extensive collection of Hazelcast C++ Client code sample

**distributed primitives** — Examples for basic distributed primitive structures.
* **atomiclong**: Illustrates simple IAtomicLong usage.
* **countdownlatch**: Illustrates simple ICountdown latch usage in the cluster.
* **countdownlatch**: Illustrates simple ICountdown boost::latch usage in the cluster.
* **crdt-pncounter**: Illustrates simple conflict free pozitive negtative counter usage.
* **idgenerator**: Illustrates IDGenerator usages. Example for Flake Id Generator and deprecated older implementations exist.
* **lock**: Illustrates simple ILock usage for eliminating races.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,22 @@

#include <hazelcast/client/HazelcastClient.h>

class ItemPrinter : public ExecutionCallback<int64_t> {
public:
virtual void onResponse(const std::shared_ptr<int64_t> &response) {
std::cout << "The sequence id of the added item is " << *response << std::endl;
}

virtual void onFailure(const std::shared_ptr<exception::IException> &e) {
std::cerr << "The response is a failure with exception:" << e << std::endl;
}
};

int main() {
hazelcast::client::HazelcastClient hz;

std::shared_ptr<hazelcast::client::Ringbuffer<std::string> > rb = hz.getRingbuffer<std::string>("myringbuffer");

// add an item in an unblocking way
std::shared_ptr<ICompletableFuture<int64_t> > future = rb->addAsync("new item",
hazelcast::client::Ringbuffer<std::string>::OVERWRITE);
auto future = rb->addAsync("new item", hazelcast::client::Ringbuffer<std::string>::OVERWRITE);

// let the result processed by a callback
std::shared_ptr<ExecutionCallback<int64_t> > callback(new ItemPrinter);
future->andThen(callback);
future.then([=](boost::future<std::shared_ptr<int64_t>> f) {
try {
std::cout << "The sequence id of the added item is " << *f.get() << std::endl;
} catch (hazelcast::client::exception::IException &e) {
std::cerr << "The response is a failure with exception:" << e << std::endl;
}
});

std::vector<std::string> items;
items.push_back("item2");
Expand All @@ -49,15 +42,14 @@ int main() {
// do some other work

// wait for the addAllAsync to complete and print the sequenceId of the last written item.
std::cout << "Sequence id of the last written item is :" << *future->get() << std::endl;
std::cout << "Sequence id of the last written item is :" << *future.get() << std::endl;

std::shared_ptr<ICompletableFuture<hazelcast::client::ringbuffer::ReadResultSet<std::string> > > resultSetFuture = rb->readManyAsync<void>(
0, 2, 3, NULL);
auto resultSetFuture = rb->readManyAsync<void>(0, 2, 3, NULL);

// do some other work

// get the result set
std::shared_ptr<ringbuffer::ReadResultSet<std::string> > readItems = resultSetFuture->get();
auto readItems = resultSetFuture.get();
std::cout << "Read " << readItems->readCount() << " items." << std::endl;

std::cout << "Finished" << std::endl;
Expand Down
38 changes: 15 additions & 23 deletions examples/distributed-map/async-api/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,17 @@

#include <hazelcast/client/HazelcastClient.h>

/**
* This class prints message on receiving the response or prints the exception if exception occurs
*/
class PrinterCallback : public hazelcast::client::ExecutionCallback<std::string> {
public:
virtual void onResponse(const std::shared_ptr<std::string> &response) {
std::cout << "Response was received. ";
if (response.get()) {
std::cout << "Received response is : " << *response << std::endl;
} else {
std::cout << "Received null response" << std::endl;
}
}

virtual void onFailure(const std::shared_ptr<exception::IException> &e) {
std::cerr << "A failure occured. The exception is:" << e << std::endl;
}
};

int main() {
hazelcast::client::HazelcastClient hz;

hazelcast::client::IMap<std::string, std::string> map =
hz.getMap<std::string, std::string>("themap");

// initiate map put in an unblocking way
std::shared_ptr<ICompletableFuture<std::string> > future = map.putAsync("key", "value");
auto future = map.putAsync("key", "value");

// later on get the result of the put operation
std::shared_ptr<std::string> result = future->get();
std::shared_ptr<std::string> result = future.get();
if (result.get()) {
std::cout << "There was a previous value for key. The value was:" << *result << std::endl;
} else {
Expand All @@ -57,8 +38,19 @@ int main() {

// Let the callback handle the response when received and print the appropriate message
// The callback will be called using the user executor thread.
std::shared_ptr<hazelcast::client::ExecutionCallback<std::string> > callback(new PrinterCallback);
future->andThen(callback);
future.then([=](boost::future<std::shared_ptr<std::string>> f) {
try {
std::cout << "Response was received. ";
auto result = f.get();
if (result) {
std::cout << "Received response is : " << *result << std::endl;
} else {
std::cout << "Received null response" << std::endl;
}
} catch (hazelcast::client::exception::IException &e) {

}
});

// Set the value to a new value in an unblocking manner
map.setAsync("key", "value2", 5, hazelcast::util::concurrent::TimeUnit::SECONDS());
Expand Down
27 changes: 14 additions & 13 deletions examples/distributed-map/entry-processor/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,38 +66,39 @@ int main() {
for (std::map<std::string, std::shared_ptr<int> >::const_iterator it = result.begin(); it != result.end(); ++it) {
std::cout << it->first << " salary: " << *it->second << std::endl;
}

// use submitToKey api
hazelcast::client::Future<int> future = employees.submitToKey<int, EmployeeRaiseEntryProcessor>("Mark", processor);
boost::future<std::shared_ptr<int>> future = employees.submitToKey<int, EmployeeRaiseEntryProcessor>("Mark",
processor);
// wait for 1 second
if (future.wait_for(1000) == hazelcast::client::future_status::ready) {
std::unique_ptr<int> result = future.get();
std::cout << "Got the result of submitToKey in 1 second for Mark" << " new salary: " << *result << std::endl;
if (future.wait_for(boost::chrono::seconds(1)) == boost::future_status::ready) {
auto r = future.get();
std::cout << "Got the result of submitToKey in 1 second for Mark" << " new salary: " << *r << std::endl;
} else {
std::cout << "Could not get the result of submitToKey in 1 second for Mark" << std::endl;
}

// multiple futures
std::vector<hazelcast::client::Future<int> > allFutures;
std::vector<boost::future<std::shared_ptr<int>>> allFutures;

// test putting into a vector of futures
future = employees.submitToKey<int, EmployeeRaiseEntryProcessor>(
"Mark", processor);
allFutures.push_back(future);
allFutures.push_back(std::move(future));

allFutures.push_back(employees.submitToKey<int, EmployeeRaiseEntryProcessor>(
"John", processor));

for (std::vector<hazelcast::client::Future<int> >::const_iterator it = allFutures.begin();it != allFutures.end();++it) {
hazelcast::client::future_status status = (*it).wait_for(1000);
if (status == hazelcast::client::future_status::ready) {
for (auto &f : allFutures) {
boost::future_status status = f.wait_for(boost::chrono::seconds(1));
if (status == boost::future_status::ready) {
std::cout << "Got ready for the future" << std::endl;
}
}

for (std::vector<hazelcast::client::Future<int> >::iterator it = allFutures.begin();it != allFutures.end();++it) {
std::unique_ptr<int> result = (*it).get();
std::cout << "Result:" << *result << std::endl;
for (auto &f : allFutures) {
auto r = f.get();
std::cout << "Result:" << *r << std::endl;
}

std::cout << "Finished" << std::endl;
Expand Down
Loading