Skip to content

Commit

Permalink
[Improvement](ObjectStorage) Retry when encountering TooManyRequest r…
Browse files Browse the repository at this point in the history
…esponse (#37199)

We can do retry for 429 using the embedded utility of corresponding sdk.
  • Loading branch information
ByteYue authored and dataroaring committed Jul 19, 2024
1 parent df60086 commit e3c2c94
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 21 deletions.
29 changes: 8 additions & 21 deletions cloud/src/recycler/s3_accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,32 +36,13 @@
#include "common/logging.h"
#include "common/string_util.h"
#include "common/util.h"
#include "cpp/obj_retry_strategy.h"
#include "cpp/s3_rate_limiter.h"
#include "recycler/azure_obj_client.h"
#include "recycler/obj_storage_client.h"
#include "recycler/s3_obj_client.h"
#include "recycler/storage_vault_accessor.h"

namespace {

bvar::Adder<uint64_t> too_many_request_http_retry_times("too_many_request_http_retry_times");

class CustomRetryStrategy final : public Aws::Client::DefaultRetryStrategy {
public:
CustomRetryStrategy(int maxRetries) : DefaultRetryStrategy(maxRetries) {}

bool ShouldRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors>& error,
long attemptedRetries) const override {
if (attemptedRetries < m_maxRetries &&
error.GetResponseCode() == Aws::Http::HttpResponseCode::TOO_MANY_REQUESTS) {
too_many_request_http_retry_times << 1;
return true;
}
return Aws::Client::DefaultRetryStrategy::ShouldRetry(error, attemptedRetries);
}
};
} // namespace

namespace doris::cloud {
bvar::Adder<int64_t> get_rate_limit_ms("get_rate_limit_ms");
bvar::Adder<int64_t> put_rate_limit_ms("put_rate_limit_ms");
Expand Down Expand Up @@ -229,6 +210,12 @@ int S3Accessor::init() {
std::make_shared<Azure::Storage::StorageSharedKeyCredential>(conf_.ak, conf_.sk);
uri_ = fmt::format("{}://{}.blob.core.windows.net/{}", config::s3_client_http_scheme,
conf_.ak, conf_.bucket);
// In Azure's HTTP requests, all policies in the vector are called in a chained manner following the HTTP pipeline approach.
// Within the RetryPolicy, the nextPolicy is called multiple times inside a loop.
// All policies in the PerRetryPolicies are downstream of the RetryPolicy.
// Therefore, you only need to add a policy to check if the response code is 429 and if the retry count meets the condition, it can record the retry count.
options.PerRetryPolicies.emplace_back(
std::make_unique<AzureRetryRecordPolicy>(config::max_s3_client_retry));
auto container_client = std::make_shared<Azure::Storage::Blobs::BlobContainerClient>(
uri_, cred, std::move(options));
// uri format for debug: ${scheme}://${ak}.blob.core.windows.net/${bucket}/${prefix}
Expand All @@ -249,7 +236,7 @@ int S3Accessor::init() {
if (config::s3_client_http_scheme == "http") {
aws_config.scheme = Aws::Http::Scheme::HTTP;
}
aws_config.retryStrategy = std::make_shared<CustomRetryStrategy>(
aws_config.retryStrategy = std::make_shared<S3CustomRetryStrategy>(
config::max_s3_client_retry /*scaleFactor = 25*/);
auto s3_client = std::make_shared<Aws::S3::S3Client>(
std::move(aws_cred), std::move(aws_config),
Expand Down
61 changes: 61 additions & 0 deletions common/cpp/obj_retry_strategy.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "obj_retry_strategy.h"

#include <bvar/reducer.h>

namespace doris {

bvar::Adder<int64_t> s3_too_many_request_retry_cnt("s3_too_many_request_retry_cnt");

S3CustomRetryStrategy::S3CustomRetryStrategy(int maxRetries) : DefaultRetryStrategy(maxRetries) {}

S3CustomRetryStrategy::~S3CustomRetryStrategy() = default;

bool S3CustomRetryStrategy::ShouldRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors>& error,
long attemptedRetries) const {
if (attemptedRetries < m_maxRetries &&
error.GetResponseCode() == Aws::Http::HttpResponseCode::TOO_MANY_REQUESTS) {
s3_too_many_request_retry_cnt << 1;
return true;
}
return Aws::Client::DefaultRetryStrategy::ShouldRetry(error, attemptedRetries);
}

AzureRetryRecordPolicy::AzureRetryRecordPolicy(int retry_cnt) : retry_cnt(retry_cnt) {}

AzureRetryRecordPolicy::~AzureRetryRecordPolicy() = default;

std::unique_ptr<Azure::Core::Http::RawResponse> AzureRetryRecordPolicy::Send(
Azure::Core::Http::Request& request, Azure::Core::Http::Policies::NextHttpPolicy nextPolicy,
Azure::Core::Context const& context) const {
auto resp = nextPolicy.Send(request, context);
if (retry_cnt != 0 &&
resp->GetStatusCode() == Azure::Core::Http::HttpStatusCode::TooManyRequests) {
retry_cnt--;
s3_too_many_request_retry_cnt << 1;
}
return resp;
}

std::unique_ptr<AzureRetryRecordPolicy::HttpPolicy> AzureRetryRecordPolicy::Clone() const {
auto ret = std::make_unique<AzureRetryRecordPolicy>(*this);
ret->retry_cnt = 0;
return ret;
}
} // namespace doris
48 changes: 48 additions & 0 deletions common/cpp/obj_retry_strategy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <aws/core/client/AWSError.h>
#include <aws/core/client/DefaultRetryStrategy.h>

#include <azure/core/http/policies/policy.hpp>

namespace doris {
class S3CustomRetryStrategy final : public Aws::Client::DefaultRetryStrategy {
public:
S3CustomRetryStrategy(int maxRetries);
~S3CustomRetryStrategy() override;

bool ShouldRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors>& error,
long attemptedRetries) const override;
};

class AzureRetryRecordPolicy final : public Azure::Core::Http::Policies::HttpPolicy {
public:
AzureRetryRecordPolicy(int retry_cnt);
~AzureRetryRecordPolicy() override;
std::unique_ptr<HttpPolicy> Clone() const override;
std::unique_ptr<Azure::Core::Http::RawResponse> Send(
Azure::Core::Http::Request& request,
Azure::Core::Http::Policies::NextHttpPolicy nextPolicy,
Azure::Core::Context const& context) const override;

private:
mutable int retry_cnt;
};
} // namespace doris

0 comments on commit e3c2c94

Please sign in to comment.