Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure Redis Proxy can follow redirection when doing request mirroring #8606

Merged
merged 34 commits into from
Dec 6, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
293d852
Refactor the redirection logic into the RedisConnPool.
HenryYYang Oct 14, 2019
36ebc52
Fix format
HenryYYang Oct 14, 2019
882e42b
Fix tests
HenryYYang Oct 14, 2019
031ff92
Merge branch 'master' into redirect_mirror
HenryYYang Oct 14, 2019
d151ae2
Fix move constructor
HenryYYang Oct 15, 2019
7536789
Fix clang errors
HenryYYang Oct 15, 2019
a9cfd91
Fix clang errors
HenryYYang Oct 17, 2019
5793b6b
Format code
HenryYYang Oct 17, 2019
fac3ffb
Use absl::variant instead of shared_ptr to improve performance.
HenryYYang Oct 17, 2019
b940d04
Fix format
HenryYYang Oct 17, 2019
e63f38e
Fix clang
HenryYYang Oct 17, 2019
dd78fb0
Fix clang
HenryYYang Oct 18, 2019
a1b0741
Kick CI
HenryYYang Oct 18, 2019
6c02aeb
Merge branch 'master' into redirect_mirror
HenryYYang Oct 19, 2019
cf4c3f6
fix merge errors
HenryYYang Oct 19, 2019
deab4b6
Fix format
HenryYYang Oct 20, 2019
7a80d45
Fix tests
HenryYYang Oct 23, 2019
35142f2
Merge branch 'master' into redirect_mirror
HenryYYang Nov 6, 2019
aa89024
Fix merge errors
HenryYYang Nov 6, 2019
45e3cb1
Merge branch 'master' into redirect_mirror
HenryYYang Nov 8, 2019
f8f7041
Add speed test and make respvalue const
HenryYYang Nov 12, 2019
36d7190
fix format
HenryYYang Nov 12, 2019
2e293a5
Merge branch 'master' into redirect_mirror
HenryYYang Nov 12, 2019
ac9ba4e
update command splitter test
HenryYYang Nov 13, 2019
47fd4d9
Fix failing test
HenryYYang Nov 13, 2019
b6f57cc
Add failure test for ConnPoolImpl
HenryYYang Nov 14, 2019
089f74b
Merge branch 'master' into redirect_mirror
HenryYYang Nov 14, 2019
98ab0bc
sort version_history file
HenryYYang Nov 14, 2019
93311e5
Change the onDirection method signature.
HenryYYang Nov 27, 2019
2e301c3
Merge branch 'master' into redirect_mirror
HenryYYang Nov 27, 2019
4896554
fix unit test
HenryYYang Nov 27, 2019
d8384cb
fix PR feedback
HenryYYang Nov 27, 2019
2df5066
fix PR feedback
HenryYYang Dec 2, 2019
eb2e2e3
fix PR feedback
HenryYYang Dec 6, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion source/extensions/clusters/redis/redis_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,10 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl {
void onResponse(NetworkFilters::Common::Redis::RespValuePtr&& value) override;
void onFailure() override;
// Note: Below callback isn't used in topology updates
bool onRedirection(NetworkFilters::Common::Redis::RespValuePtr&&) override { return true; }
bool onRedirection(NetworkFilters::Common::Redis::RespValuePtr&&, const std::string&,
bool) override {
return true;
}
void onUnexpectedResponse(const NetworkFilters::Common::Redis::RespValuePtr&);

Network::Address::InstanceConstSharedPtr
Expand Down
9 changes: 7 additions & 2 deletions source/extensions/filters/network/common/redis/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,12 @@ class ClientCallbacks {
/**
* Called when a MOVED or ASK redirection error is received, and the request must be retried.
* @param value supplies the MOVED error response
* @param host_address supplies the redirection host address and port
* @param ask_redirection indicates if this is a ASK redirection
* @return bool true if the request is successfully redirected, false otherwise
*/
virtual bool onRedirection(RespValuePtr&& value) PURE;
virtual bool onRedirection(RespValuePtr&& value, const std::string& host_address,
bool ask_redirection) PURE;
};

/**
Expand All @@ -62,7 +65,9 @@ class DoNothingPoolCallbacks : public ClientCallbacks {
// ClientCallbacks
void onResponse(Common::Redis::RespValuePtr&&) override {}
void onFailure() override {}
bool onRedirection(Common::Redis::RespValuePtr&&) override { return false; }
bool onRedirection(Common::Redis::RespValuePtr&&, const std::string&, bool) override {
return false;
}
};

/**
Expand Down
25 changes: 17 additions & 8 deletions source/extensions/filters/network/common/redis/client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -224,17 +224,26 @@ void ClientImpl::onRespValue(RespValuePtr&& value) {
} else if (config_.enableRedirection() && (value->type() == Common::Redis::RespType::Error)) {
std::vector<absl::string_view> err = StringUtil::splitToken(value->asString(), " ", false);
bool redirected = false;
bool redirect_succeeded = false;
if (err.size() == 3) {
if (err[0] == RedirectionResponse::get().MOVED || err[0] == RedirectionResponse::get().ASK) {
redirected = callbacks.onRedirection(std::move(value));
if (redirected) {
host_->cluster().stats().upstream_internal_redirect_succeeded_total_.inc();
} else {
host_->cluster().stats().upstream_internal_redirect_failed_total_.inc();
}
// MOVED and ASK redirection errors have the following substrings: MOVED or ASK (err[0]), hash
// key slot (err[1]), and IP address and TCP port separated by a colon (err[2])
if (err[0] == RedirectionResponse::get().MOVED) {
redirected = true;
redirect_succeeded = callbacks.onRedirection(std::move(value), std::string(err[2]), false);
HenryYYang marked this conversation as resolved.
Show resolved Hide resolved
} else if (err[0] == RedirectionResponse::get().ASK) {
redirected = true;
redirect_succeeded = callbacks.onRedirection(std::move(value), std::string(err[2]), true);
}
} else {
}
if (!redirected) {
callbacks.onResponse(std::move(value));
} else {
if (redirect_succeeded) {
host_->cluster().stats().upstream_internal_redirect_succeeded_total_.inc();
} else {
host_->cluster().stats().upstream_internal_redirect_failed_total_.inc();
}
}
} else {
callbacks.onResponse(std::move(value));
Expand Down
23 changes: 0 additions & 23 deletions source/extensions/filters/network/common/redis/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,6 @@ AuthRequest::AuthRequest(const std::string& password) {
asArray().swap(values);
}

bool redirectionArgsInvalid(const Common::Redis::RespValue* original_request,
const Common::Redis::RespValue& error_response,
std::vector<absl::string_view>& error_substrings,
bool& ask_redirection) {
if ((original_request == nullptr) || (error_response.type() != Common::Redis::RespType::Error)) {
return true;
}
error_substrings = StringUtil::splitToken(error_response.asString(), " ", false);
if (error_substrings.size() != 3) {
return true;
}
if (error_substrings[0] == "ASK") {
ask_redirection = true;
} else if (error_substrings[0] == "MOVED") {
ask_redirection = false;
} else {
// The first substring must be MOVED or ASK.
return true;
}
// Other validation done later to avoid duplicate processing.
return false;
}

RespValuePtr makeError(const std::string& error) {
Common::Redis::RespValuePtr response(new RespValue());
response->type(Common::Redis::RespType::Error);
Expand Down
15 changes: 0 additions & 15 deletions source/extensions/filters/network/common/redis/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,6 @@ class AuthRequest : public Redis::RespValue {
AuthRequest(const std::string& password);
};

/**
* Validate the received moved/ask redirection error and the original redis request.
* @param[in] original_request supplies the incoming request associated with the command splitter
* request.
* @param[in] error_response supplies the moved/ask redirection response from the upstream Redis
* server.
* @param[out] error_substrings the non-whitespace substrings of error_response.
* @param[out] ask_redirection true if error_response is an ASK redirection error, false otherwise.
* @return bool true if the original_request or error_response are not valid, false otherwise.
*/
bool redirectionArgsInvalid(const Common::Redis::RespValue* original_request,
const Common::Redis::RespValue& error_response,
std::vector<absl::string_view>& error_substrings,
bool& ask_redirection);

RespValuePtr makeError(const std::string& error);

class ReadOnlyRequest : public Redis::RespValue {
Expand Down
16 changes: 3 additions & 13 deletions source/extensions/filters/network/redis_proxy/conn_pool_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -376,19 +376,9 @@ void InstanceImpl::PendingRequest::onFailure() {
parent_.onRequestCompleted();
}

bool InstanceImpl::PendingRequest::onRedirection(Common::Redis::RespValuePtr&& value) {
std::vector<absl::string_view> err;
bool ask_redirection = false;
if (Common::Redis::Utility::redirectionArgsInvalid(&getRequest(incoming_request_), *value, err,
ask_redirection)) {
onResponse(std::move(value));
return false;
}

// MOVED and ASK redirection errors have the following substrings: MOVED or ASK (err[0]), hash key
// slot (err[1]), and IP address and TCP port separated by a colon (err[2]).
const std::string host_address = std::string(err[2]);

bool InstanceImpl::PendingRequest::onRedirection(Common::Redis::RespValuePtr&& value,
const std::string& host_address,
bool ask_redirection) {
// Prepend request with an asking command if redirected via an ASK error. The returned handle is
// not important since there is no point in being able to cancel the request. The use of
// null_pool_callbacks ensures the transparent filtering of the Redis server's response to the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ class InstanceImpl : public Instance {
// Common::Redis::Client::ClientCallbacks
void onResponse(Common::Redis::RespValuePtr&& response) override;
void onFailure() override;
bool onRedirection(Common::Redis::RespValuePtr&& value) override;
bool onRedirection(Common::Redis::RespValuePtr&& value, const std::string& host_address,
bool ask_redirection) override;

// PoolRequest
void cancel() override;
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/health_checkers/redis/redis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ void RedisHealthChecker::RedisActiveHealthCheckSession::onFailure() {
}

bool RedisHealthChecker::RedisActiveHealthCheckSession::onRedirection(
NetworkFilters::Common::Redis::RespValuePtr&&) {
NetworkFilters::Common::Redis::RespValuePtr&&, const std::string&, bool) {
// Treat any redirection error response from a Redis server as success.
current_request_ = nullptr;
handleSuccess();
Expand Down
3 changes: 2 additions & 1 deletion source/extensions/health_checkers/redis/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ class RedisHealthChecker : public Upstream::HealthCheckerImplBase {
// Extensions::NetworkFilters::Common::Redis::Client::ClientCallbacks
void onResponse(NetworkFilters::Common::Redis::RespValuePtr&& value) override;
void onFailure() override;
bool onRedirection(NetworkFilters::Common::Redis::RespValuePtr&& value) override;
bool onRedirection(NetworkFilters::Common::Redis::RespValuePtr&&, const std::string&,
bool) override;

// Network::ConnectionCallbacks
void onEvent(Network::ConnectionEvent event) override;
Expand Down
2 changes: 1 addition & 1 deletion test/extensions/clusters/redis/redis_cluster_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ class RedisClusterTest : public testing::Test,
new NetworkFilters::Common::Redis::RespValue()};
dummy_value->type(NetworkFilters::Common::Redis::RespType::Error);
dummy_value->asString() = "dummy text";
EXPECT_TRUE(discovery_session.onRedirection(std::move(dummy_value)));
EXPECT_TRUE(discovery_session.onRedirection(std::move(dummy_value), "dummy ip", false));

RedisCluster::RedisDiscoveryClient discovery_client(discovery_session);
EXPECT_NO_THROW(discovery_client.onAboveWriteBufferHighWatermark());
Expand Down
77 changes: 73 additions & 4 deletions test/extensions/filters/network/common/redis/client_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,8 @@ TEST_F(RedisClientImplTest, AskRedirection) {
// The exact values of the hash slot and IP info are not important.
response1->asString() = "ASK 1111 10.1.2.3:4321";
// Simulate redirection failure.
EXPECT_CALL(callbacks1, onRedirection_(Ref(response1))).WillOnce(Return(false));
EXPECT_CALL(callbacks1, onRedirection_(Ref(response1), "10.1.2.3:4321", true))
.WillOnce(Return(false));
EXPECT_CALL(*connect_or_op_timer_, enableTimer(_, _));
EXPECT_CALL(host_->outlier_detector_,
putResult(Upstream::Outlier::Result::ExtOriginRequestSuccess, _));
Expand All @@ -678,7 +679,8 @@ TEST_F(RedisClientImplTest, AskRedirection) {
response2->type(Common::Redis::RespType::Error);
// The exact values of the hash slot and IP info are not important.
response2->asString() = "ASK 2222 10.1.2.4:4321";
EXPECT_CALL(callbacks2, onRedirection_(Ref(response2))).WillOnce(Return(true));
EXPECT_CALL(callbacks2, onRedirection_(Ref(response2), "10.1.2.4:4321", true))
.WillOnce(Return(true));
EXPECT_CALL(*connect_or_op_timer_, disableTimer());
EXPECT_CALL(host_->outlier_detector_,
putResult(Upstream::Outlier::Result::ExtOriginRequestSuccess, _));
Expand Down Expand Up @@ -727,7 +729,8 @@ TEST_F(RedisClientImplTest, MovedRedirection) {
// The exact values of the hash slot and IP info are not important.
response1->asString() = "MOVED 1111 10.1.2.3:4321";
// Simulate redirection failure.
EXPECT_CALL(callbacks1, onRedirection_(Ref(response1))).WillOnce(Return(false));
EXPECT_CALL(callbacks1, onRedirection_(Ref(response1), "10.1.2.3:4321", false))
.WillOnce(Return(false));
EXPECT_CALL(*connect_or_op_timer_, enableTimer(_, _));
EXPECT_CALL(host_->outlier_detector_,
putResult(Upstream::Outlier::Result::ExtOriginRequestSuccess, _));
Expand All @@ -739,7 +742,8 @@ TEST_F(RedisClientImplTest, MovedRedirection) {
response2->type(Common::Redis::RespType::Error);
// The exact values of the hash slot and IP info are not important.
response2->asString() = "MOVED 2222 10.1.2.4:4321";
EXPECT_CALL(callbacks2, onRedirection_(Ref(response2))).WillOnce(Return(true));
EXPECT_CALL(callbacks2, onRedirection_(Ref(response2), "10.1.2.4:4321", false))
.WillOnce(Return(true));
EXPECT_CALL(*connect_or_op_timer_, disableTimer());
EXPECT_CALL(host_->outlier_detector_,
putResult(Upstream::Outlier::Result::ExtOriginRequestSuccess, _));
Expand All @@ -754,6 +758,71 @@ TEST_F(RedisClientImplTest, MovedRedirection) {
client_->close();
}

TEST_F(RedisClientImplTest, RedirectionFailure) {
InSequence s;

setup();

Common::Redis::RespValue request1;
MockClientCallbacks callbacks1;
EXPECT_CALL(*encoder_, encode(Ref(request1), _));
EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false));
PoolRequest* handle1 = client_->makeRequest(request1, callbacks1);
EXPECT_NE(nullptr, handle1);

onConnected();

Common::Redis::RespValue request2;
MockClientCallbacks callbacks2;
EXPECT_CALL(*encoder_, encode(Ref(request2), _));
EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false));
PoolRequest* handle2 = client_->makeRequest(request2, callbacks2);
EXPECT_NE(nullptr, handle2);

EXPECT_EQ(2UL, host_->cluster_.stats_.upstream_rq_total_.value());
EXPECT_EQ(2UL, host_->cluster_.stats_.upstream_rq_active_.value());
EXPECT_EQ(2UL, host_->stats_.rq_total_.value());
EXPECT_EQ(2UL, host_->stats_.rq_active_.value());

Buffer::OwnedImpl fake_data;
EXPECT_CALL(*decoder_, decode(Ref(fake_data))).WillOnce(Invoke([&](Buffer::Instance&) -> void {
InSequence s;

// Test an error that looks like it might be a MOVED or ASK redirection error except for the
// first non-whitespace substring.
Common::Redis::RespValuePtr response1{new Common::Redis::RespValue()};
response1->type(Common::Redis::RespType::Error);
response1->asString() = "NOTMOVEDORASK 1111 1.1.1.1:1";

EXPECT_CALL(callbacks1, onResponse_(Ref(response1)));
EXPECT_CALL(*connect_or_op_timer_, enableTimer(_, _));
EXPECT_CALL(host_->outlier_detector_,
putResult(Upstream::Outlier::Result::ExtOriginRequestSuccess, _));
callbacks_->onRespValue(std::move(response1));

EXPECT_EQ(0UL, host_->cluster_.stats_.upstream_internal_redirect_succeeded_total_.value());
EXPECT_EQ(0UL, host_->cluster_.stats_.upstream_internal_redirect_failed_total_.value());

// Test a truncated MOVED error response that cannot be parsed properly.
Common::Redis::RespValuePtr response2(new Common::Redis::RespValue());
response2->type(Common::Redis::RespType::Error);
response2->asString() = "MOVED 1111";
EXPECT_CALL(callbacks2, onResponse_(Ref(response2)));
EXPECT_CALL(*connect_or_op_timer_, enableTimer(_, _));
EXPECT_CALL(host_->outlier_detector_,
putResult(Upstream::Outlier::Result::ExtOriginRequestSuccess, _));
callbacks_->onRespValue(std::move(response2));

EXPECT_EQ(0UL, host_->cluster_.stats_.upstream_internal_redirect_succeeded_total_.value());
EXPECT_EQ(0UL, host_->cluster_.stats_.upstream_internal_redirect_failed_total_.value());
}));
upstream_read_filter_->onData(fake_data, false);

EXPECT_CALL(*upstream_connection_, close(Network::ConnectionCloseType::NoFlush));
EXPECT_CALL(*connect_or_op_timer_, disableTimer());
client_->close();
}

TEST_F(RedisClientImplTest, AskRedirectionNotEnabled) {
InSequence s;

Expand Down
8 changes: 6 additions & 2 deletions test/extensions/filters/network/common/redis/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,15 @@ class MockClientCallbacks : public ClientCallbacks {
~MockClientCallbacks() override;

void onResponse(Common::Redis::RespValuePtr&& value) override { onResponse_(value); }
bool onRedirection(Common::Redis::RespValuePtr&& value) override { return onRedirection_(value); }
bool onRedirection(Common::Redis::RespValuePtr&& value, const std::string& host_address,
bool ask_redirection) override {
return onRedirection_(value, host_address, ask_redirection);
}

MOCK_METHOD1(onResponse_, void(Common::Redis::RespValuePtr& value));
MOCK_METHOD0(onFailure, void());
MOCK_METHOD1(onRedirection_, bool(Common::Redis::RespValuePtr& value));
MOCK_METHOD3(onRedirection_, bool(Common::Redis::RespValuePtr& value,
const std::string& host_address, bool ask_redirection));
};

} // namespace Client
Expand Down
Loading