Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

comment the graph::session module (Part 1) #3878

Merged
merged 9 commits into from
Feb 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/graph/service/GraphService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ folly::Future<AuthResponse> GraphService::future_authenticate(const std::string&
return future;
}

if (!sessionManager_->isOutOfConnections()) {
if (sessionManager_->isOutOfConnections()) {
ctx->resp().errorCode = ErrorCode::E_TOO_MANY_CONNECTIONS;
ctx->resp().errorMsg.reset(new std::string("Too many connections in the cluster"));
ctx->finish();
Expand Down
7 changes: 3 additions & 4 deletions src/graph/session/ClientSession.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
// Copyright (c) 2020 vesoft inc. All rights reserved.
//
// This source code is licensed under Apache 2.0 License.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why change the comment format? This changes maybe invalidate the license checker.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the new comment requirement:
https://confluence.nebula-graph.io/pages/viewpage.action?pageId=35293986
The license check looks fine, it can pass.


#include "graph/session/ClientSession.h"

Expand Down
48 changes: 36 additions & 12 deletions src/graph/session/ClientSession.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
// Copyright (c) 2020 vesoft inc. All rights reserved.
//
// This source code is licensed under Apache 2.0 License.

#ifndef GRAPH_SESSION_CLIENTSESSION_H_
#define GRAPH_SESSION_CLIENTSESSION_H_

Expand All @@ -23,8 +23,15 @@ struct SpaceInfo {
meta::cpp2::SpaceDesc spaceDesc;
};

// ClientSession saves those information, including who created it, executed queries,
// space role, etc. The information of session will be persisted in meta server.
// One user corresponds to one ClientSession.
class ClientSession final {
public:
// Creates a new ClientSession.
// session: session obj used in RPC.
// metaClient: used to communicate with meta server.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add The information of session will be persisted in meta server.

// return: ClientSession which will be created.
static std::shared_ptr<ClientSession> create(meta::cpp2::Session&& session,
meta::MetaClient* metaClient);

Expand Down Expand Up @@ -61,6 +68,7 @@ class ClientSession final {
return roles_;
}

// Gets the role information of the user on a specific space.
StatusOr<meta::cpp2::RoleType> roleWithSpace(GraphSpaceID space) const {
folly::RWSpinLock::ReadHolder rHolder(rwSpinLock_);
auto ret = roles_.find(space);
Expand All @@ -70,6 +78,8 @@ class ClientSession final {
return ret->second;
}

// Whether the user corresponding to the session is a GOD user.
// As long as a user has the GOD role in any space, the user must be a GOD user.
bool isGod() const {
folly::RWSpinLock::ReadHolder rHolder(rwSpinLock_);
// Cloud may have multiple God accounts
Expand All @@ -86,8 +96,10 @@ class ClientSession final {
roles_.emplace(space, role);
}

// The idle time of the session. Unit: second.
uint64_t idleSeconds();

// Resets the idle time of the session.
void charge();

int32_t getTimezone() const {
Expand Down Expand Up @@ -129,14 +141,23 @@ class ClientSession final {
session_.space_name_ref() = spaceName;
}

// Binds a query to the session.
// qctx: represents a query.
void addQuery(QueryContext* qctx);

// Deletes a query from the session.
// qctx: represents a query.
void deleteQuery(QueryContext* qctx);

// Finds a query within the session.
// epId: represents a query.
bool findQuery(nebula::ExecutionPlanID epId) const;

// Marks a query as killed.
// epId: represents a query.
void markQueryKilled(nebula::ExecutionPlanID epId);

// Marks all queries as killed.
void markAllQueryKilled();

private:
Expand All @@ -145,17 +166,20 @@ class ClientSession final {
explicit ClientSession(meta::cpp2::Session&& session, meta::MetaClient* metaClient);

private:
SpaceInfo space_;
SpaceInfo space_; // The space that the session is using.
// When the idle time exceeds FLAGS_session_idle_timeout_secs,
// the session will expire and then be reclaimed.
time::Duration idleDuration_;
meta::cpp2::Session session_;
meta::MetaClient* metaClient_{nullptr};
meta::cpp2::Session session_; // The session object used in RPC.
meta::MetaClient* metaClient_{nullptr}; // The client of the meta server.
mutable folly::RWSpinLock rwSpinLock_;
/*
* map<spaceId, role>
* One user can have roles in multiple spaces
* But a user has only one role in one space
*/

// map<spaceId, role>
// One user can have roles in multiple spaces
// But a user has only one role in one space
std::unordered_map<GraphSpaceID, meta::cpp2::RoleType> roles_;
// An ExecutionPlanID represents a query.
// A QueryContext also represents a query.
std::unordered_map<ExecutionPlanID, QueryContext*> contexts_;
};

Expand Down
13 changes: 9 additions & 4 deletions src/graph/session/GraphSessionManager.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
/* Copyright (c) 2018 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
// Copyright (c) 2018 vesoft inc. All rights reserved.
//
// This source code is licensed under Apache 2.0 License.

#include "graph/session/GraphSessionManager.h"

Expand All @@ -14,6 +13,8 @@
namespace nebula {
namespace graph {

// During construction, GraphSessionManager will start a background thread to periodically
// reclaim expired sessions and update session information to the meta server.
GraphSessionManager::GraphSessionManager(meta::MetaClient* metaClient, const HostAddr& hostAddr)
: SessionManager<ClientSession>(metaClient, hostAddr) {
scavenger_->addDelayTask(
Expand Down Expand Up @@ -143,6 +144,8 @@ void GraphSessionManager::removeSession(SessionID id) {
return;
}

// Before removing the session, all queries on the session
// need to be marked as killed.
iter->second->markAllQueryKilled();
auto resp = metaClient_->removeSession(id).get();
if (!resp.ok()) {
Expand Down Expand Up @@ -217,6 +220,8 @@ void GraphSessionManager::updateSessionsToMeta() {
}
}

// There may be expired queries, and the
// expired queries will be killed here.
auto handleKilledQueries = [this](auto&& resp) {
if (!resp.ok()) {
LOG(ERROR) << "Update sessions failed: " << resp.status();
Expand Down
68 changes: 47 additions & 21 deletions src/graph/session/GraphSessionManager.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
/* Copyright (c) 2018 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
// Copyright (c) 2018 vesoft inc. All rights reserved.
//
// This source code is licensed under Apache 2.0 License.

#ifndef GRAPH_SESSION_GRAPHSESSIONMANAGER_H_
#define GRAPH_SESSION_GRAPHSESSIONMANAGER_H_
Expand All @@ -16,64 +15,91 @@
#include "interface/gen-cpp2/GraphService.h"
#include "interface/gen-cpp2/meta_types.h"

/**
* GraphSessionManager manages the client sessions, e.g. create new, find
* existing and drop expired.
*/

DECLARE_int64(max_allowed_connections);

namespace nebula {
namespace graph {

// GraphSessionManager manages the client sessions, e.g. create new, find existing and drop expired.
// Nebula's session management strategy:
// When a user requests the graph server to create a session, the graph server forwards the request
// to the meta server, which allocates the session and returns it to the graph server.
// The meta server manages all the sessions from all graph servers. One graph server only manages
// its own sessions, including periodically reclaiming expired sessions, and updating sessions
// information to the meta server in time. When the graph server restarts, it will pull all the
// sessions from the meta server and choose its own sessions for management.
class GraphSessionManager final : public SessionManager<ClientSession> {
public:
// Periodically reclaims expired sessions and updates information to the meta server.
// in the background.
// metaClient: The client of the meta server.
// hostAddr: The address of the current graph server.
GraphSessionManager(meta::MetaClient* metaClient, const HostAddr& hostAddr);
~GraphSessionManager() {}

// Pulls sessions from the meta server and chooses its own sessions for management.
Status init();

/**
* Create a new session
*/
// Creates a new session.
// userName: The name of the user who requesting to create the session.
// clientIp: The address of the client which sends the request.
// runner: Ensure that the corresponding callback function is executed on the runner.
// return: ClientSession which will be created.
folly::Future<StatusOr<std::shared_ptr<ClientSession>>> createSession(
const std::string userName, const std::string clientIp, folly::Executor* runner) override;

// Whether exceeds the max allowed connections.
bool isOutOfConnections() {
if (activeSessions_.size() >= static_cast<uint64_t>(FLAGS_max_allowed_connections)) {
LOG(INFO) << "The sessions of the cluster has more than "
"max_allowed_connections: "
<< FLAGS_max_allowed_connections;
return false;
return true;
}
return true;
return false;
}

/**
* Remove a session
*/
// Removes a session from both local and meta server.
// id: The id of the session which will be removed.
void removeSession(SessionID id) override;

// Finds an existing session. If it is not found locally, it will be searched from the meta
// server. id: The id of the session which will be found.
// runner: Ensure that the corresponding callback function is executed on the runner.
// return: ClientSession which will be found.
folly::Future<StatusOr<std::shared_ptr<ClientSession>>> findSession(
SessionID id, folly::Executor* runner) override;

/**
* Find an existing session
*/
// Finds an existing session only from local cache.
// id: The id of the session which will be found.
// return: ClientSession which will be found.
std::shared_ptr<ClientSession> findSessionFromCache(SessionID id);

// get all seesions in the local cache
// Gets all seesions from the local cache.
// return: All sessions of the local cache.
std::vector<meta::cpp2::Session> getSessionFromLocalCache() const;

private:
// Finds an existing session only from the meta server.
// id: The id of the session which will be found.
// runner: Ensure that the corresponding callback function is executed on the runner.
// return: ClientSession which will be found.
folly::Future<StatusOr<std::shared_ptr<ClientSession>>> findSessionFromMetad(
SessionID id, folly::Executor* runner);

// Entry function of the background thread.
// It will reclaim expired sessions and update sessions info to meta.
void threadFunc();

// Reclaims expired sessions.
// All queries within the expired session will be marked as killed.
void reclaimExpiredSessions();

// Updates sessions into to meta server.
void updateSessionsToMeta();

// Updates session info locally.
// session: ClientSession which will be updated.
void updateSessionInfo(ClientSession* session);
};

Expand Down