Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

redis: command splitting #616

Merged
merged 4 commits into from
Mar 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions include/envoy/redis/codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ class RespValue {
RespValue() : type_(RespType::Null) {}
~RespValue() { cleanup(); }

/**
* Convert a RESP value to a string for debugging purposes.
*/
std::string toString() const;

/**
* The following are getters and setters for the internal value. A RespValue start as null,
* and much change type via type() before the following methods can be used.
Expand Down
58 changes: 58 additions & 0 deletions include/envoy/redis/command_splitter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#pragma once

#include "envoy/common/pure.h"
#include "envoy/redis/codec.h"

namespace Redis {
namespace CommandSplitter {

/**
* A handle to a split request.
*/
class SplitRequest {
public:
virtual ~SplitRequest() {}

/**
* Cancel the request. No further request callbacks will be called.
*/
virtual void cancel() PURE;
};

typedef std::unique_ptr<SplitRequest> SplitRequestPtr;

/**
* Split request callbacks.
*/
class SplitCallbacks {
public:
virtual ~SplitCallbacks() {}

/**
* Called when the response is ready.
* @param value supplies the response which is now owned by the callee.
*/
virtual void onResponse(RespValuePtr&& value) PURE;
};

/**
* A command splitter that takes incoming redis commands and splits them as appropriate to a
* backend connection pool.
*/
class Instance {
public:
virtual ~Instance() {}

/**
* Make a split redis request.
* @param request supplies the split request to make.
* @param callbacks supplies the split request completion callbacks.
* @return SplitRequestPtr a handle to the active request or nullptr if the request has already
* been satisfied (via onResponse() being called). The splitter ALWAYS calls
* onResponse() for a given request.
*/
virtual SplitRequestPtr makeRequest(const RespValue& request, SplitCallbacks& callbacks) PURE;
};

} // CommandSplitter
} // Redis
24 changes: 13 additions & 11 deletions include/envoy/redis/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ namespace ConnPool {
/**
* A handle to an outbound request.
*/
class ActiveRequest {
class PoolRequest {
public:
virtual ~ActiveRequest() {}
virtual ~PoolRequest() {}

/**
* Cancel the request. No further request callbacks will be called.
Expand All @@ -22,9 +22,9 @@ class ActiveRequest {
/**
* Outbound request callbacks.
*/
class ActiveRequestCallbacks {
class PoolCallbacks {
public:
virtual ~ActiveRequestCallbacks() {}
virtual ~PoolCallbacks() {}

/**
* Called when a pipelined response is received.
Expand Down Expand Up @@ -59,10 +59,10 @@ class Client {
* Make a pipelined request to the remote redis server.
* @param request supplies the RESP request to make.
* @param callbacks supplies the request callbacks.
* @return ActiveRequest* a handle to the active request.
* @return PoolRequest* a handle to the active request or nullptr if the request could not be made
* for some reason.
*/
virtual ActiveRequest* makeRequest(const RespValue& request,
ActiveRequestCallbacks& callbacks) PURE;
virtual PoolRequest* makeRequest(const RespValue& request, PoolCallbacks& callbacks) PURE;
};

typedef std::unique_ptr<Client> ClientPtr;
Expand Down Expand Up @@ -93,12 +93,14 @@ class Instance {
* @param hash_key supplies the key to use for consistent hashing.
* @param request supplies the request to make.
* @param callbacks supplies the request completion callbacks.
* @return ActiveRequest* a handle to the active request or nullptr if the request could not
* be made for some reason.
* @return PoolRequest* a handle to the active request or nullptr if the request could not be made
* for some reason.
*/
virtual ActiveRequest* makeRequest(const std::string& hash_key, const RespValue& request,
ActiveRequestCallbacks& callbacks) PURE;
virtual PoolRequest* makeRequest(const std::string& hash_key, const RespValue& request,
PoolCallbacks& callbacks) PURE;
};

typedef std::unique_ptr<Instance> InstancePtr;

} // ConnPool
} // Redis
1 change: 1 addition & 0 deletions source/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ add_library(
profiler/profiler.cc
ratelimit/ratelimit_impl.cc
redis/codec_impl.cc
redis/command_splitter_impl.cc
redis/conn_pool_impl.cc
redis/proxy_filter.cc
router/config_impl.cc
Expand Down
4 changes: 2 additions & 2 deletions source/common/http/filter/ratelimit.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ enum class FilterRequestType { Internal, External, Both };
/**
* Global configuration for the HTTP rate limit filter.
*/
class FilterConfig : Json::JsonValidator {
class FilterConfig : Json::Validator {
public:
FilterConfig(const Json::Object& config, const LocalInfo::LocalInfo& local_info,
Stats::Store& global_store, Runtime::Loader& runtime, Upstream::ClusterManager& cm)
: Json::JsonValidator(config, Json::Schema::RATE_LIMIT_HTTP_FILTER_SCHEMA),
: Json::Validator(config, Json::Schema::RATE_LIMIT_HTTP_FILTER_SCHEMA),
domain_(config.getString("domain")),
stage_(static_cast<uint64_t>(config.getInteger("stage", 0))),
request_type_(stringToType(config.getString("request_type", "both"))),
Expand Down
4 changes: 2 additions & 2 deletions source/common/json/json_validator.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ namespace Json {
/**
* Base class to inherit from to validate config schema before initializing member variables.
*/
class JsonValidator {
class Validator {
public:
JsonValidator(const Json::Object& config, const std::string& schema) {
Validator(const Json::Object& config, const std::string& schema) {
config.validateSchema(schema);
}
};
Expand Down
25 changes: 25 additions & 0 deletions source/common/redis/codec_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,31 @@

namespace Redis {

std::string RespValue::toString() const {
switch (type_) {
case RespType::Array: {
std::string ret = "[";
for (uint64_t i = 0; i < asArray().size(); i++) {
ret += asArray()[i].toString();
if (i != asArray().size() - 1) {
ret += ", ";
}
}
return ret + "]";
}
case RespType::SimpleString:
case RespType::BulkString:
case RespType::Error:
return fmt::format("\"{}\"", asString());
case RespType::Null:
return "null";
case RespType::Integer:
return std::to_string(asInteger());
}

NOT_REACHED;
}

std::vector<RespValue>& RespValue::asArray() {
ASSERT(type_ == RespType::Array);
return array_;
Expand Down
170 changes: 170 additions & 0 deletions source/common/redis/command_splitter_impl.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
#include "command_splitter_impl.h"

#include "common/common/assert.h"

namespace Redis {
namespace CommandSplitter {

RespValuePtr Utility::makeError(const std::string& error) {
RespValuePtr response(new RespValue());
response->type(RespType::Error);
response->asString() = error;
return response;
}

SplitRequestPtr AllParamsToOneServerCommandHandler::startRequest(const RespValue& request,
SplitCallbacks& callbacks) {
std::unique_ptr<SplitRequestImpl> request_handle(new SplitRequestImpl(callbacks));
request_handle->handle_ =
conn_pool_.makeRequest(request.asArray()[1].asString(), request, *request_handle);
if (!request_handle->handle_) {
callbacks.onResponse(Utility::makeError("no upstream host"));
return nullptr;
}

return std::move(request_handle);
}

AllParamsToOneServerCommandHandler::SplitRequestImpl::~SplitRequestImpl() { ASSERT(!handle_); }

void AllParamsToOneServerCommandHandler::SplitRequestImpl::cancel() {
handle_->cancel();
handle_ = nullptr;
}

void AllParamsToOneServerCommandHandler::SplitRequestImpl::onResponse(RespValuePtr&& response) {
handle_ = nullptr;
log_debug("redis: response: '{}'", response->toString());
callbacks_.onResponse(std::move(response));
}

void AllParamsToOneServerCommandHandler::SplitRequestImpl::onFailure() {
handle_ = nullptr;
callbacks_.onResponse(Utility::makeError("upstream failure"));
}

SplitRequestPtr MGETCommandHandler::startRequest(const RespValue& request,
SplitCallbacks& callbacks) {
std::unique_ptr<SplitRequestImpl> request_handle(
new SplitRequestImpl(callbacks, request.asArray().size() - 1));

// Create the get request that we will use for each split get below.
std::vector<RespValue> values(2);
values[0].type(RespType::BulkString);
values[0].asString() = "get";
values[1].type(RespType::BulkString);
RespValue single_mget;
single_mget.type(RespType::Array);
single_mget.asArray().swap(values);

for (uint64_t i = 1; i < request.asArray().size(); i++) {
request_handle->pending_requests_.emplace_back(*request_handle, i - 1);
SplitRequestImpl::PendingRequest& pending_request = request_handle->pending_requests_.back();

single_mget.asArray()[1].asString() = request.asArray()[i].asString();
log_debug("redis: parallel get: '{}'", single_mget.toString());
pending_request.handle_ =
conn_pool_.makeRequest(request.asArray()[i].asString(), single_mget, pending_request);
if (!pending_request.handle_) {
pending_request.onResponse(Utility::makeError("no upstream host"));
}
}

return request_handle->pending_responses_ > 0 ? std::move(request_handle) : nullptr;
}

MGETCommandHandler::SplitRequestImpl::SplitRequestImpl(SplitCallbacks& callbacks,
uint32_t num_responses)
: callbacks_(callbacks), pending_responses_(num_responses) {
pending_response_.reset(new RespValue());
pending_response_->type(RespType::Array);
std::vector<RespValue> responses(num_responses);
pending_response_->asArray().swap(responses);
pending_requests_.reserve(num_responses);
}

MGETCommandHandler::SplitRequestImpl::~SplitRequestImpl() {
#ifndef NDEBUG
for (const PendingRequest& request : pending_requests_) {
ASSERT(!request.handle_);
}
#endif
}

void MGETCommandHandler::SplitRequestImpl::cancel() {
for (PendingRequest& request : pending_requests_) {
if (request.handle_) {
request.handle_->cancel();
request.handle_ = nullptr;
}
}
}

void MGETCommandHandler::SplitRequestImpl::onResponse(RespValuePtr&& value, uint32_t index) {
pending_requests_[index].handle_ = nullptr;

pending_response_->asArray()[index].type(value->type());
switch (value->type()) {
case RespType::Array:
case RespType::Integer: {
pending_response_->asArray()[index].type(RespType::Error);
pending_response_->asArray()[index].asString() = "upstream protocol error";
break;
}
case RespType::SimpleString:
case RespType::BulkString:
case RespType::Error: {
pending_response_->asArray()[index].asString().swap(value->asString());
break;
}
case RespType::Null:
break;
}

ASSERT(pending_responses_ > 0);
if (--pending_responses_ == 0) {
log_debug("redis: response: '{}'", pending_response_->toString());
callbacks_.onResponse(std::move(pending_response_));
}
}

void MGETCommandHandler::SplitRequestImpl::onFailure(uint32_t index) {
onResponse(Utility::makeError("upstream failure"), index);
}

InstanceImpl::InstanceImpl(ConnPool::InstancePtr&& conn_pool)
: conn_pool_(std::move(conn_pool)), all_to_one_handler_(*conn_pool_),
mget_handler_(*conn_pool_) {
// TODO(mattklein123) PERF: Make this a trie (like in header_map_impl).
// TODO(mattklein123): Make not case sensitive (like in header_map_impl).
command_map_.emplace("incr", all_to_one_handler_);
command_map_.emplace("incrby", all_to_one_handler_);
command_map_.emplace("mget", mget_handler_);
}

SplitRequestPtr InstanceImpl::makeRequest(const RespValue& request, SplitCallbacks& callbacks) {
if (request.type() != RespType::Array || request.asArray().size() < 2) {
callbacks.onResponse(Utility::makeError("invalid request"));
return nullptr;
}

for (const RespValue& value : request.asArray()) {
if (value.type() != RespType::BulkString) {
callbacks.onResponse(Utility::makeError("invalid request"));
return nullptr;
}
}

auto handler = command_map_.find(request.asArray()[0].asString());
if (handler == command_map_.end()) {
callbacks.onResponse(Utility::makeError(
fmt::format("unsupported command '{}'", request.asArray()[0].asString())));
return nullptr;
}

log_debug("redis: splitting '{}'", request.toString());
return handler->second.get().startRequest(request, callbacks);
}

} // CommandSplitter
} // Redis
Loading