Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature](Recycler) Add rate limit for all obj storage client implementation #37663

Merged
merged 9 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1229,6 +1229,13 @@ DEFINE_mInt32(s3_read_base_wait_time_ms, "100");
DEFINE_mInt32(s3_read_max_wait_time_ms, "800");

DEFINE_mBool(enable_s3_rate_limiter, "false");
DEFINE_mInt64(s3_get_bucket_tokens, "1000000000000000000");
DEFINE_mInt64(s3_get_token_per_second, "1000000000000000000");
DEFINE_mInt64(s3_get_token_limit, "0");

DEFINE_mInt64(s3_put_bucket_tokens, "1000000000000000000");
DEFINE_mInt64(s3_put_token_per_second, "1000000000000000000");
DEFINE_mInt64(s3_put_token_limit, "0");

DEFINE_String(trino_connector_plugin_dir, "${DORIS_HOME}/connectors");

Expand Down
7 changes: 7 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1311,6 +1311,13 @@ DECLARE_Int32(spill_io_thread_pool_queue_size);
DECLARE_mBool(check_segment_when_build_rowset_meta);

DECLARE_mBool(enable_s3_rate_limiter);
DECLARE_mInt64(s3_get_bucket_tokens);
DECLARE_mInt64(s3_get_token_per_second);
DECLARE_mInt64(s3_get_token_limit);

DECLARE_mInt64(s3_put_bucket_tokens);
DECLARE_mInt64(s3_put_token_per_second);
DECLARE_mInt64(s3_put_token_limit);
// max s3 client retry times
DECLARE_mInt32(max_s3_client_retry);
// When meet s3 429 error, the "get" request will
Expand Down
129 changes: 0 additions & 129 deletions be/src/util/s3_rate_limiter.cpp

This file was deleted.

79 changes: 0 additions & 79 deletions be/src/util/s3_rate_limiter.h

This file was deleted.

19 changes: 19 additions & 0 deletions be/src/util/s3_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,22 @@ constexpr char S3_REQUEST_TIMEOUT_MS[] = "AWS_REQUEST_TIMEOUT_MS";
constexpr char S3_CONN_TIMEOUT_MS[] = "AWS_CONNECTION_TIMEOUT_MS";

} // namespace

bvar::Adder<int64_t> get_rate_limit_ms("get_rate_limit_ms");
bvar::Adder<int64_t> put_rate_limit_ms("put_rate_limit_ms");

S3RateLimiterHolder* S3ClientFactory::rate_limiter(S3RateLimitType type) {
CHECK(type == S3RateLimitType::GET || type == S3RateLimitType::PUT) << to_string(type);
return _rate_limiters[static_cast<size_t>(type)].get();
}

int reset_s3_rate_limiter(S3RateLimitType type, size_t max_speed, size_t max_burst, size_t limit) {
if (type == S3RateLimitType::UNKNOWN) {
return -1;
}
return S3ClientFactory::instance().rate_limiter(type)->reset(max_speed, max_burst, limit);
}

class DorisAWSLogger final : public Aws::Utils::Logging::LogSystemInterface {
public:
DorisAWSLogger() : _log_level(Aws::Utils::Logging::LogLevel::Info) {}
Expand Down Expand Up @@ -149,6 +160,14 @@ S3ClientFactory::S3ClientFactory() {
};
Aws::InitAPI(_aws_options);
_ca_cert_file_path = get_valid_ca_cert_path();
_rate_limiters = {std::make_unique<S3RateLimiterHolder>(
S3RateLimitType::GET, config::s3_get_token_per_second,
config::s3_get_bucket_tokens, config::s3_get_token_limit,
[&](int64_t ms) { get_rate_limit_ms << ms; }),
std::make_unique<S3RateLimiterHolder>(
S3RateLimitType::PUT, config::s3_put_token_per_second,
config::s3_put_bucket_tokens, config::s3_put_token_limit,
[&](int64_t ms) { put_rate_limit_ms << ms; })};
}

string S3ClientFactory::get_valid_ca_cert_path() {
Expand Down
3 changes: 1 addition & 2 deletions be/src/util/s3_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include <fmt/format.h>
#include <gen_cpp/AgentService_types.h>
#include <gen_cpp/cloud.pb.h>
#include <stdint.h>

#include <map>
#include <memory>
Expand All @@ -33,8 +32,8 @@
#include <unordered_map>

#include "common/status.h"
#include "cpp/s3_rate_limiter.h"
#include "io/fs/obj_storage_client.h"
#include "util/s3_rate_limiter.h"
#include "vec/common/string_ref.h"

namespace Aws::S3 {
Expand Down
1 change: 0 additions & 1 deletion cloud/src/rate-limiter/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,4 @@ set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -lfdb_c -L${THIRDPARTY_DIR

add_library(RateLimiter
rate_limiter.cpp
s3_rate_limiter.cpp
)
44 changes: 38 additions & 6 deletions cloud/src/recycler/azure_obj_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,39 @@
#include <iterator>
#include <ranges>

#include "common/config.h"
#include "common/logging.h"
#include "cpp/s3_rate_limiter.h"
#include "cpp/sync_point.h"
#include "recycler/s3_accessor.h"

using namespace Azure::Storage::Blobs;

namespace doris::cloud {

template <typename Func>
auto s3_rate_limit(S3RateLimitType op, Func callback) -> decltype(callback()) {
using T = decltype(callback());
if (!config::enable_s3_rate_limiter) {
return callback();
}
auto sleep_duration = AccessorRateLimiter::instance().rate_limiter(op)->add(1);
if (sleep_duration < 0) {
throw std::runtime_error("Azure exceeds request limit");
}
return callback();
}

template <typename Func>
auto s3_get_rate_limit(Func callback) -> decltype(callback()) {
return s3_rate_limit(S3RateLimitType::GET, std::move(callback));
}

template <typename Func>
auto s3_put_rate_limit(Func callback) -> decltype(callback()) {
return s3_rate_limit(S3RateLimitType::PUT, std::move(callback));
}

static constexpr size_t BlobBatchMaxOperations = 256;
static constexpr char BlobNotFound[] = "BlobNotFound";

Expand Down Expand Up @@ -86,7 +112,7 @@ class AzureListIterator final : public ObjectListIterator {
}

try {
auto resp = client_->ListBlobs(req_);
auto resp = s3_get_rate_limit([&]() { return client_->ListBlobs(req_); });
has_more_ = resp.NextPageToken.HasValue();
DCHECK(!(has_more_ && resp.Blobs.empty())) << has_more_ << ' ' << resp.Blobs.empty();
req_.ContinuationToken = std::move(resp.NextPageToken);
Expand Down Expand Up @@ -137,14 +163,18 @@ ObjectStorageResponse AzureObjClient::put_object(ObjectStoragePathRef path,
auto client = client_->GetBlockBlobClient(path.key);
return do_azure_client_call(
[&]() {
client.UploadFrom(reinterpret_cast<const uint8_t*>(stream.data()), stream.size());
s3_put_rate_limit([&]() {
return client.UploadFrom(reinterpret_cast<const uint8_t*>(stream.data()),
stream.size());
});
},
client_->GetUrl(), path.key);
}

ObjectStorageResponse AzureObjClient::head_object(ObjectStoragePathRef path, ObjectMeta* res) {
try {
auto&& properties = client_->GetBlockBlobClient(path.key).GetProperties().Value;
auto&& properties = s3_get_rate_limit(
[&]() { return client_->GetBlockBlobClient(path.key).GetProperties().Value; });
res->key = path.key;
res->mtime_s = properties.LastModified.time_since_epoch().count();
res->size = properties.BlobSize;
Expand Down Expand Up @@ -193,8 +223,9 @@ ObjectStorageResponse AzureObjClient::delete_objects(const std::string& bucket,
for (auto it = begin; it != chunk_end; ++it) {
deferred_resps.emplace_back(batch.DeleteBlob(*it));
}
auto resp = do_azure_client_call([&]() { client_->SubmitBatch(batch); }, client_->GetUrl(),
*begin);
auto resp = do_azure_client_call(
[&]() { s3_put_rate_limit([&]() { return client_->SubmitBatch(batch); }); },
client_->GetUrl(), *begin);
if (resp.ret != 0) {
return resp;
}
Expand Down Expand Up @@ -227,7 +258,8 @@ ObjectStorageResponse AzureObjClient::delete_objects(const std::string& bucket,
ObjectStorageResponse AzureObjClient::delete_object(ObjectStoragePathRef path) {
return do_azure_client_call(
[&]() {
if (auto r = client_->DeleteBlob(path.key); !r.Value.Deleted) {
if (auto r = s3_put_rate_limit([&]() { return client_->DeleteBlob(path.key); });
!r.Value.Deleted) {
throw std::runtime_error("Delete azure blob failed");
}
},
Expand Down
Loading
Loading