Skip to content

Commit

Permalink
Add session pool. (#106)
Browse files Browse the repository at this point in the history
* Add session pool.

* Rollback space.

* Test json.

* Add tests.

* Username and password shouldn't be empty.

* Move init to source file.

* Enable auth in CI.

* Fix typo.

* Set default min count of sessions to 1.

* Test switch space in on session.
  • Loading branch information
Shylock-Hg authored Oct 17, 2022
1 parent bdfa0d0 commit 26979b8
Show file tree
Hide file tree
Showing 9 changed files with 475 additions and 2 deletions.
1 change: 1 addition & 0 deletions .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ jobs:
set -e
;;
esac
sed -i 's/--enable_authorize=false/--enable_authorize=true/g' /usr/local/nebula/etc/nebula-graphd.conf
/usr/local/nebula/scripts/nebula.service start all
/usr/local/nebula/scripts/nebula.service status all
echo '127.0.0.1 graphd' >> /etc/hosts
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,5 @@ modules

# Coredump
core.*

.cache
1 change: 1 addition & 0 deletions include/nebula/client/Session.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class Session {
password_(password),
timezoneName_(timezoneName),
offsetSecs_(offsetSecs) {}
Session(const Session &) = delete; // no copy
Session(Session &&session)
: sessionId_(session.sessionId_),
conn_(std::move(session.conn_)),
Expand Down
84 changes: 84 additions & 0 deletions include/nebula/client/SessionPool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright (c) 2022 vesoft inc. All rights reserved.
//
// This source code is licensed under Apache 2.0 License.

#pragma once

#include <common/datatypes/DataSet.h>

#include <cassert>
#include <cstddef>
#include <mutex>

#include "nebula/client/Config.h"
#include "nebula/client/ConnectionPool.h"
#include "nebula/client/Session.h"

namespace nebula {

struct SessionPoolConfig {
std::string username_;
std::string password_;
std::vector<std::string> addrs_; // the list of graph addresses
std::string spaceName_;
// Socket timeout and Socket connection timeout, unit: seconds
std::uint32_t timeout_{0};
// The idleTime of the connection, unit: seconds
// If connection's idle time is longer than idleTime, it will be delete
// 0 value means the connection will not expire
std::uint32_t idleTime_{0};
std::uint32_t maxSize_{10}; // max size of the session pool. should be adjusted according to the
// max threads will be using.
std::uint32_t minSize_{1}; // min size of the session pool
};

class SessionPool {
public:
SessionPool() = delete;
explicit SessionPool(SessionPoolConfig config)
: config_(std::move(config)), pool_(new ConnectionPool()) {}
SessionPool(const SessionPool &) = delete; // no copy
SessionPool(SessionPool &&pool)
: config_(std::move(pool.config_)), pool_(std::move(pool.pool_)) {}

// initialize session and context
// return false when failed, otherwise return true
// When return false, the pool is in invalid state, user must destruct it
// instead of continue.
bool init();

// Session pool use fixed space, don't switch space when execute
ExecutionResponse execute(const std::string &stmt);

ExecutionResponse executeWithParameter(const std::string &stmt,
const std::unordered_map<std::string, Value> &parameters);

std::string executeJson(const std::string &stmt);

std::string executeJsonWithParameter(const std::string &stmt,
const std::unordered_map<std::string, Value> &parameters);

private:
std::pair<Session, bool> getIdleSession() {
std::lock_guard<std::mutex> l(m_);
if (idleSessions_.empty()) {
return std::make_pair(Session(), false);
}
auto session = std::move(idleSessions_.front());
idleSessions_.pop_front();
return std::make_pair(std::move(session), true);
}

void giveBack(Session &&session) {
std::lock_guard<std::mutex> l(m_);
idleSessions_.emplace_back(std::move(session));
}

SessionPoolConfig config_;
std::unique_ptr<ConnectionPool> pool_;
// destruct session before pool
std::mutex m_;
std::list<Session> idleSessions_;
};

} // namespace nebula
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ set(NEBULA_CLIENT_SOURCES
client/Connection.cpp
client/ConnectionPool.cpp
client/Session.cpp
client/SessionPool.cpp
)

set(NEBULA_MCLIENT_SOURCES
Expand Down
115 changes: 115 additions & 0 deletions src/client/SessionPool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright (c) 2022 vesoft inc. All rights reserved.
//
// This source code is licensed under Apache 2.0 License.

#include "nebula/client/SessionPool.h"

#include <folly/json.h>

#include "common/time/TimeConversion.h"
#include "nebula/client/ConnectionPool.h"

namespace nebula {

bool SessionPool::init() {
Config conf;
conf.maxConnectionPoolSize_ = config_.maxSize_;
conf.minConnectionPoolSize_ = config_.minSize_;
conf.idleTime_ = config_.idleTime_;
conf.timeout_ = config_.timeout_;
pool_->init(config_.addrs_, conf);
if (config_.spaceName_.empty()) {
return false;
}
if (config_.username_.empty() || config_.password_.empty()) {
return false;
}
std::string useSpace = "USE " + config_.spaceName_;
for (std::size_t i = 0; i < config_.maxSize_; ++i) {
// use space
auto session = pool_->getSession(config_.username_, config_.password_);
auto resp = session.execute(useSpace);
if (resp.errorCode == ErrorCode::SUCCEEDED) {
idleSessions_.emplace_back(std::move(session));
} else {
return false;
}
}
return true;
}

ExecutionResponse SessionPool::execute(const std::string &stmt) {
auto result = getIdleSession();
if (result.second) {
auto resp = result.first.execute(stmt);
if (*resp.spaceName != config_.spaceName_) {
// switch to origin space
result.first.execute("USE " + config_.spaceName_);
}
giveBack(std::move(result.first));
return resp;
} else {
return ExecutionResponse{ErrorCode::E_DISCONNECTED,
0,
nullptr,
nullptr,
std::make_unique<std::string>("No idle session.")};
}
}

ExecutionResponse SessionPool::executeWithParameter(
const std::string &stmt, const std::unordered_map<std::string, Value> &parameters) {
auto result = getIdleSession();
if (result.second) {
auto resp = result.first.executeWithParameter(stmt, parameters);
if (*resp.spaceName != config_.spaceName_) {
// switch to origin space
result.first.execute("USE " + config_.spaceName_);
}
giveBack(std::move(result.first));
return resp;
} else {
return ExecutionResponse{ErrorCode::E_DISCONNECTED,
0,
nullptr,
nullptr,
std::make_unique<std::string>("No idle session.")};
}
}

std::string SessionPool::executeJson(const std::string &stmt) {
auto result = getIdleSession();
if (result.second) {
auto resp = result.first.executeJson(stmt);
auto obj = folly::parseJson(resp);
if (obj["results"][0]["spaceName"].asString() != config_.spaceName_) {
// switch to origin space
result.first.execute("USE " + config_.spaceName_);
}
giveBack(std::move(result.first));
return resp;
} else {
// TODO handle error
return "";
}
}

std::string SessionPool::executeJsonWithParameter(
const std::string &stmt, const std::unordered_map<std::string, Value> &parameters) {
auto result = getIdleSession();
if (result.second) {
auto resp = result.first.executeJsonWithParameter(stmt, parameters);
auto obj = folly::parseJson(resp);
if (obj["results"][0]["spaceName"].asString() != config_.spaceName_) {
// switch to origin space
result.first.execute("USE " + config_.spaceName_);
}
giveBack(std::move(result.first));
return resp;
} else {
// TODO handle error
return "";
}
}

} // namespace nebula
14 changes: 14 additions & 0 deletions src/client/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,17 @@ nebula_add_test(
gtest
${NEBULA_THIRD_PARTY_LIBRARIES}
)

nebula_add_test(
NAME
session_pool_test
SOURCES
SessionPoolTest.cpp
OBJECTS
${NEBULA_CLIENT_OBJS}
$<TARGET_OBJECTS:nebula_common_obj>
$<TARGET_OBJECTS:nebula_graph_client_obj>
LIBRARIES
gtest
${NEBULA_THIRD_PARTY_LIBRARIES}
)
3 changes: 1 addition & 2 deletions src/client/tests/RegistHost.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ int main(int argc, char** argv) {
auto session = pool.getSession("root", "nebula");
CHECK(session.valid());

auto resp =
session.execute(folly::format("ADD HOSTS {}", FLAGS_host).str());
auto resp = session.execute(folly::format("ADD HOSTS {}", FLAGS_host).str());
CHECK_EQ(resp.errorCode, nebula::ErrorCode::SUCCEEDED) << *resp.errorMsg;
return 0;
}
Loading

0 comments on commit 26979b8

Please sign in to comment.