[feature](Recycler) Add rate limit for all obj storage client implementation (#37663)

We should enable rate limiting for all kinds of network I/O in the recycler.
This PR adds a rate limiter for the object storage clients.
ByteYue authored and dataroaring committed Jul 17, 2024
1 parent e16ec02 commit 48c537c
Showing 15 changed files with 200 additions and 308 deletions.
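
For context on the diffs below: the new config knobs drive token-bucket limiters, one for GET-class requests and one for PUT-class requests. A minimal sketch of such a token bucket, assuming a refill-on-demand design (the class and all names here are illustrative, not the actual S3RateLimiter from the codebase):

#include <algorithm>
#include <chrono>
#include <cstdint>
#include <mutex>
#include <thread>

// Token-bucket sketch: a budget of `capacity` tokens refills at `rate`
// tokens per second; add(n) blocks when the bucket is drained and
// returns how many milliseconds the caller slept.
class TokenBucketSketch {
public:
    using Clock = std::chrono::steady_clock;

    TokenBucketSketch(int64_t rate, int64_t capacity)
            : rate_(rate), capacity_(capacity), tokens_(capacity), last_(Clock::now()) {}

    int64_t add(int64_t n) {
        std::unique_lock<std::mutex> lock(mtx_);
        refill();
        tokens_ -= n;
        if (tokens_ >= 0) {
            return 0;
        }
        // Sleep long enough for the deficit to be earned back.
        int64_t wait_ms = (-tokens_ * 1000) / rate_;
        lock.unlock();
        std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
        return wait_ms;
    }

private:
    void refill() {
        auto now = Clock::now();
        int64_t elapsed_ms =
                std::chrono::duration_cast<std::chrono::milliseconds>(now - last_).count();
        tokens_ = std::min(capacity_, tokens_ + elapsed_ms * rate_ / 1000);
        last_ = now;
    }

    std::mutex mtx_;
    const int64_t rate_;
    const int64_t capacity_;
    int64_t tokens_;  // may go negative, recording the current deficit
    Clock::time_point last_;
};
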
7 changes: 7 additions & 0 deletions be/src/common/config.cpp
@@ -1237,6 +1237,13 @@ DEFINE_mInt32(s3_read_base_wait_time_ms, "100");
DEFINE_mInt32(s3_read_max_wait_time_ms, "800");

DEFINE_mBool(enable_s3_rate_limiter, "false");
DEFINE_mInt64(s3_get_bucket_tokens, "1000000000000000000");
DEFINE_mInt64(s3_get_token_per_second, "1000000000000000000");
DEFINE_mInt64(s3_get_token_limit, "0");

DEFINE_mInt64(s3_put_bucket_tokens, "1000000000000000000");
DEFINE_mInt64(s3_put_token_per_second, "1000000000000000000");
DEFINE_mInt64(s3_put_token_limit, "0");

DEFINE_String(trino_connector_plugin_dir, "${DORIS_HOME}/connectors");

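
The shipped defaults above (10^18 tokens) make the limiter effectively a no-op even when enabled. To actually throttle, an operator would lower the budgets, e.g. in be.conf (assuming the usual key = value format; the numbers are illustrative only, and the *_token_limit semantics follow the limiter's limit parameter, with 0 matching the unlimited default):

# be.conf -- illustrative values, not tuning advice
enable_s3_rate_limiter = true
# GET-class requests: sustained tokens per second, plus burst bucket size
s3_get_token_per_second = 5000
s3_get_bucket_tokens = 10000
s3_get_token_limit = 0
# PUT-class requests
s3_put_token_per_second = 1000
s3_put_bucket_tokens = 2000
s3_put_token_limit = 0
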
7 changes: 7 additions & 0 deletions be/src/common/config.h
@@ -1320,6 +1320,13 @@ DECLARE_Int32(spill_io_thread_pool_queue_size);
DECLARE_mBool(check_segment_when_build_rowset_meta);

DECLARE_mBool(enable_s3_rate_limiter);
DECLARE_mInt64(s3_get_bucket_tokens);
DECLARE_mInt64(s3_get_token_per_second);
DECLARE_mInt64(s3_get_token_limit);

DECLARE_mInt64(s3_put_bucket_tokens);
DECLARE_mInt64(s3_put_token_per_second);
DECLARE_mInt64(s3_put_token_limit);
// max s3 client retry times
DECLARE_mInt32(max_s3_client_retry);
// When meet s3 429 error, the "get" request will
129 changes: 0 additions & 129 deletions be/src/util/s3_rate_limiter.cpp

This file was deleted.

79 changes: 0 additions & 79 deletions be/src/util/s3_rate_limiter.h

This file was deleted.

19 changes: 19 additions & 0 deletions be/src/util/s3_util.cpp
@@ -87,11 +87,22 @@ constexpr char S3_REQUEST_TIMEOUT_MS[] = "AWS_REQUEST_TIMEOUT_MS";
constexpr char S3_CONN_TIMEOUT_MS[] = "AWS_CONNECTION_TIMEOUT_MS";

} // namespace

bvar::Adder<int64_t> get_rate_limit_ms("get_rate_limit_ms");
bvar::Adder<int64_t> put_rate_limit_ms("put_rate_limit_ms");

S3RateLimiterHolder* S3ClientFactory::rate_limiter(S3RateLimitType type) {
    CHECK(type == S3RateLimitType::GET || type == S3RateLimitType::PUT) << to_string(type);
    return _rate_limiters[static_cast<size_t>(type)].get();
}

int reset_s3_rate_limiter(S3RateLimitType type, size_t max_speed, size_t max_burst, size_t limit) {
    if (type == S3RateLimitType::UNKNOWN) {
        return -1;
    }
    return S3ClientFactory::instance().rate_limiter(type)->reset(max_speed, max_burst, limit);
}

class DorisAWSLogger final : public Aws::Utils::Logging::LogSystemInterface {
public:
    DorisAWSLogger() : _log_level(Aws::Utils::Logging::LogLevel::Info) {}
@@ -149,6 +160,14 @@ S3ClientFactory::S3ClientFactory() {
    };
    Aws::InitAPI(_aws_options);
    _ca_cert_file_path = get_valid_ca_cert_path();
    _rate_limiters = {std::make_unique<S3RateLimiterHolder>(
                              S3RateLimitType::GET, config::s3_get_token_per_second,
                              config::s3_get_bucket_tokens, config::s3_get_token_limit,
                              [&](int64_t ms) { get_rate_limit_ms << ms; }),
                      std::make_unique<S3RateLimiterHolder>(
                              S3RateLimitType::PUT, config::s3_put_token_per_second,
                              config::s3_put_bucket_tokens, config::s3_put_token_limit,
                              [&](int64_t ms) { put_rate_limit_ms << ms; })};
}

string S3ClientFactory::get_valid_ca_cert_path() {
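
Given the reset_s3_rate_limiter signature added above, retuning a limiter at runtime might look like the following (a hypothetical call site, not code from this commit; error handling kept minimal):

#include "common/logging.h"  // LOG(...)
#include "util/s3_util.h"    // reset_s3_rate_limiter, S3RateLimitType

// Hypothetical admin hook: throttle GETs to 2000 tokens/s with a
// 4000-token burst bucket and no extra limit.
static void throttle_s3_gets() {
    int ret = reset_s3_rate_limiter(S3RateLimitType::GET,
                                    /*max_speed=*/2000,
                                    /*max_burst=*/4000,
                                    /*limit=*/0);
    if (ret != 0) {
        LOG(WARNING) << "reset of S3 GET rate limiter failed, ret=" << ret;
    }
}
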
3 changes: 1 addition & 2 deletions be/src/util/s3_util.h
@@ -24,7 +24,6 @@
#include <fmt/format.h>
#include <gen_cpp/AgentService_types.h>
#include <gen_cpp/cloud.pb.h>
#include <stdint.h>

#include <map>
#include <memory>
@@ -33,8 +32,8 @@
#include <unordered_map>

#include "common/status.h"
#include "cpp/s3_rate_limiter.h"
#include "io/fs/obj_storage_client.h"
#include "util/s3_rate_limiter.h"
#include "vec/common/string_ref.h"

namespace Aws::S3 {
1 change: 0 additions & 1 deletion cloud/src/rate-limiter/CMakeLists.txt
@@ -9,5 +9,4 @@ set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -lfdb_c -L${THIRDPARTY_DIR

add_library(RateLimiter
    rate_limiter.cpp
    s3_rate_limiter.cpp
)
44 changes: 38 additions & 6 deletions cloud/src/recycler/azure_obj_client.cpp
@@ -32,13 +32,39 @@
#include <iterator>
#include <ranges>

#include "common/config.h"
#include "common/logging.h"
#include "cpp/s3_rate_limiter.h"
#include "cpp/sync_point.h"
#include "recycler/s3_accessor.h"

using namespace Azure::Storage::Blobs;

namespace doris::cloud {

template <typename Func>
auto s3_rate_limit(S3RateLimitType op, Func callback) -> decltype(callback()) {
    using T = decltype(callback());
    if (!config::enable_s3_rate_limiter) {
        return callback();
    }
    auto sleep_duration = AccessorRateLimiter::instance().rate_limiter(op)->add(1);
    if (sleep_duration < 0) {
        throw std::runtime_error("Azure exceeds request limit");
    }
    return callback();
}

template <typename Func>
auto s3_get_rate_limit(Func callback) -> decltype(callback()) {
    return s3_rate_limit(S3RateLimitType::GET, std::move(callback));
}

template <typename Func>
auto s3_put_rate_limit(Func callback) -> decltype(callback()) {
    return s3_rate_limit(S3RateLimitType::PUT, std::move(callback));
}

static constexpr size_t BlobBatchMaxOperations = 256;
static constexpr char BlobNotFound[] = "BlobNotFound";

@@ -89,7 +115,7 @@ class AzureListIterator final : public ObjectListIterator {
        }

        try {
            auto resp = client_->ListBlobs(req_);
            auto resp = s3_get_rate_limit([&]() { return client_->ListBlobs(req_); });
            has_more_ = resp.NextPageToken.HasValue();
            DCHECK(!(has_more_ && resp.Blobs.empty())) << has_more_ << ' ' << resp.Blobs.empty();
            req_.ContinuationToken = std::move(resp.NextPageToken);
@@ -145,14 +171,18 @@ ObjectStorageResponse AzureObjClient::put_object(ObjectStoragePathRef path,
    auto client = client_->GetBlockBlobClient(path.key);
    return do_azure_client_call(
            [&]() {
                client.UploadFrom(reinterpret_cast<const uint8_t*>(stream.data()), stream.size());
                s3_put_rate_limit([&]() {
                    return client.UploadFrom(reinterpret_cast<const uint8_t*>(stream.data()),
                                             stream.size());
                });
            },
            client_->GetUrl(), path.key);
}

ObjectStorageResponse AzureObjClient::head_object(ObjectStoragePathRef path, ObjectMeta* res) {
    try {
        auto&& properties = client_->GetBlockBlobClient(path.key).GetProperties().Value;
        auto&& properties = s3_get_rate_limit(
                [&]() { return client_->GetBlockBlobClient(path.key).GetProperties().Value; });
        res->key = path.key;
        res->mtime_s = properties.LastModified.time_since_epoch().count();
        res->size = properties.BlobSize;
@@ -201,8 +231,9 @@ ObjectStorageResponse AzureObjClient::delete_objects(const std::string& bucket,
        for (auto it = begin; it != chunk_end; ++it) {
            deferred_resps.emplace_back(batch.DeleteBlob(*it));
        }
        auto resp = do_azure_client_call([&]() { client_->SubmitBatch(batch); }, client_->GetUrl(),
                                         *begin);
        auto resp = do_azure_client_call(
                [&]() { s3_put_rate_limit([&]() { return client_->SubmitBatch(batch); }); },
                client_->GetUrl(), *begin);
        if (resp.ret != 0) {
            return resp;
        }
@@ -235,7 +266,8 @@ ObjectStorageResponse AzureObjClient::delete_object(ObjectStoragePathRef path) {
ObjectStorageResponse AzureObjClient::delete_object(ObjectStoragePathRef path) {
    return do_azure_client_call(
            [&]() {
                if (auto r = client_->DeleteBlob(path.key); !r.Value.Deleted) {
                if (auto r = s3_put_rate_limit([&]() { return client_->DeleteBlob(path.key); });
                    !r.Value.Deleted) {
                    throw std::runtime_error("Delete azure blob failed");
                }
            },
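
The template wrappers in this file lean on decltype(callback()) so that a single gate can wrap SDK calls with arbitrary return types, including void. A self-contained sketch of that pattern, with a stub limiter standing in for AccessorRateLimiter (all names hypothetical):

#include <stdexcept>
#include <string>

// Stub limiter: returns the sleep time in ms, negative when the request
// should be rejected outright (never, in this stand-in).
struct StubLimiter {
    long add(long /*tokens*/) { return 0; }
};

enum class OpKind { GET, PUT };

StubLimiter& limiter_for(OpKind) {
    static StubLimiter l;
    return l;
}

// Same shape as s3_rate_limit above: consult the limiter, then run the
// callback, forwarding whatever type the callback returns.
template <typename Func>
auto rate_limited(OpKind op, Func callback) -> decltype(callback()) {
    if (limiter_for(op).add(1) < 0) {
        throw std::runtime_error("request limit exceeded");
    }
    return callback();
}

int main() {
    // The lambda's return type (std::string) passes straight through.
    std::string blob = rate_limited(OpKind::GET, [] { return std::string("bytes"); });
    rate_limited(OpKind::PUT, [] { /* void-returning calls work too */ });
    return blob.empty() ? 1 : 0;
}
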
