-
Notifications
You must be signed in to change notification settings - Fork 3.3k
/
s3_util.h
174 lines (147 loc) · 6.17 KB
/
s3_util.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <aws/core/Aws.h>
#include <aws/core/client/ClientConfiguration.h>
#include <aws/s3/S3Errors.h>
#include <bvar/bvar.h>
#include <fmt/format.h>
#include <gen_cpp/cloud.pb.h>
#include <stdint.h>

#include <array>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

#include "common/status.h"
#include "util/s3_rate_limiter.h"
#include "vec/common/string_ref.h"
// Forward declaration of the AWS SDK S3 client. This header only refers to it
// through std::shared_ptr (S3ClientFactory::create and S3ClientFactory::_cache),
// so the full client header does not need to be included here.
namespace Aws {
namespace S3 {
class S3Client;
} // namespace S3
} // namespace Aws
// Forward declaration of bvar::Adder.
// NOTE(review): <bvar/bvar.h> is already included by this header, so this
// declaration looks redundant; it is harmless and kept as-is.
namespace bvar {
template <typename T>
class Adder;
} // namespace bvar
namespace doris {
// Latency recorders for S3 operations, one per request kind (get, put, delete,
// head, multipart upload, list, list object versions, get bucket versioning,
// copy object). They are only declared here (extern); the single definition of
// each lives in one translation unit elsewhere.
namespace s3_bvar {
extern bvar::LatencyRecorder s3_get_latency;
extern bvar::LatencyRecorder s3_put_latency;
extern bvar::LatencyRecorder s3_delete_latency;
extern bvar::LatencyRecorder s3_head_latency;
extern bvar::LatencyRecorder s3_multi_part_upload_latency;
extern bvar::LatencyRecorder s3_list_latency;
extern bvar::LatencyRecorder s3_list_object_versions_latency;
extern bvar::LatencyRecorder s3_get_bucket_version_latency;
extern bvar::LatencyRecorder s3_copy_object_latency;
}; // namespace s3_bvar
// Forward declaration; this header only takes S3URI by const reference
// (S3ClientFactory::convert_properties_to_s3_conf).
class S3URI;
// Builds the synthetic S3 error that DO_S3_RATE_LIMIT returns in place of a
// real SDK call when the client-side rate limiter rejects a request.
// Error type is INTERNAL_FAILURE; exception name and message are both
// "exceeds limit"; the final constructor argument (the SDK's isRetryable flag)
// is false.
inline ::Aws::Client::AWSError<::Aws::S3::S3Errors> s3_error_factory() {
    using S3Error = ::Aws::Client::AWSError<::Aws::S3::S3Errors>;
    constexpr bool kRetryable = false;
    return S3Error(::Aws::S3::S3Errors::INTERNAL_FAILURE, "exceeds limit", "exceeds limit",
                   kRetryable);
}
// DO_S3_RATE_LIMIT(op, code): runs the S3 SDK call expression `code` behind the
// client-side rate limiter for operation type `op`. Expands to an immediately
// invoked lambda whose value is the result of `code`.
//  - If config::enable_s3_rate_limiter is false, `code` is evaluated and returned.
//  - Otherwise one permit is requested via
//    S3ClientFactory::instance().rate_limiter(op)->add(1).
//    * A negative result means the request is rejected. `code` is NOT
//      evaluated; instead a value of the same type as `code`, constructed from
//      s3_error_factory(), is returned.
//    * A non-negative result (named sleep_duration, presumably the time spent
//      throttling) falls through and `code` is evaluated.
// Requirements at the expansion site:
//  - `config`, `S3ClientFactory` and `s3_error_factory` must be visible.
//  - `code` should yield its result by value (e.g. a client->Op(request) call).
//    decltype((code)) is used as the return type of the rejection branch, and
//    it would be a reference type for an lvalue expression.
//  - That type must be constructible from AWSError<S3Errors>.
//  - `code` is one macro argument, so it must not contain commas outside of
//    parentheses (e.g. unparenthesized template argument lists).
// NOTE(review): `mutable` has no effect here because the lambda captures only
// by reference.
#define DO_S3_RATE_LIMIT(op, code) \
[&]() mutable { \
if (!config::enable_s3_rate_limiter) { \
return (code); \
} \
auto sleep_duration = S3ClientFactory::instance().rate_limiter(op)->add(1); \
if (sleep_duration < 0) { \
using T = decltype((code)); \
return T(s3_error_factory()); \
} \
return (code); \
}()
// Shorthand for DO_S3_RATE_LIMIT using the GET rate-limit bucket.
#define DO_S3_GET_RATE_LIMIT(code) DO_S3_RATE_LIMIT(S3RateLimitType::GET, code)
// Keys recognised in the string-to-string S3 property map. Presumably consumed
// by S3ClientFactory::convert_properties_to_s3_conf; confirm in s3_util.cpp.
// Each has internal linkage, so every translation unit gets its own copy.
static const std::string S3_AK {"AWS_ACCESS_KEY"};
static const std::string S3_SK {"AWS_SECRET_KEY"};
static const std::string S3_ENDPOINT {"AWS_ENDPOINT"};
static const std::string S3_REGION {"AWS_REGION"};
static const std::string S3_TOKEN {"AWS_TOKEN"};
static const std::string S3_MAX_CONN_SIZE {"AWS_MAX_CONN_SIZE"};
static const std::string S3_REQUEST_TIMEOUT_MS {"AWS_REQUEST_TIMEOUT_MS"};
static const std::string S3_CONN_TIMEOUT_MS {"AWS_CONNECTION_TIMEOUT_MS"};
// Connection settings for one S3 client: endpoint, region, credentials
// (ak / sk / token), connection and timeout limits, and the addressing style.
// NOTE(review): the numeric limits default to -1; presumably that means
// "use the SDK default" — confirm where the client is built (s3_util.cpp).
struct S3ClientConf {
    std::string endpoint;
    std::string region;
    std::string ak;
    std::string sk;
    std::string token;
    int max_connections = -1;
    int request_timeout_ms = -1;
    int connect_timeout_ms = -1;
    bool use_virtual_addressing = true;

    // Returns a 64-bit fingerprint covering every field above.
    // NOTE(review): S3ClientFactory::_cache is keyed by uint64_t; presumably this
    // value is that key. Two different configs hashing equal would then share one
    // client (and its credentials), so collisions must be rare. Confirm in s3_util.cpp.
    //
    // Fields are folded in, in order, with a hash_combine-style mix rather than
    // plain XOR. XOR is commutative and self-cancelling:
    //  - swapped fields would collide (ak<->sk, request<->connect timeout);
    //  - two fields holding the same value would drop out entirely (both
    //    timeouts = 5000 would hash like both = 1000).
    uint64_t get_hash() const {
        uint64_t hash_code = 0;
        auto mix = [&hash_code](uint64_t value) {
            hash_code ^= value + 0x9e3779b97f4a7c15ULL + (hash_code << 6) + (hash_code >> 2);
        };
        mix(crc32_hash(ak));
        mix(crc32_hash(sk));
        mix(crc32_hash(token));
        mix(crc32_hash(endpoint));
        mix(crc32_hash(region));
        // Negative ints convert modulo 2^64 (well-defined), e.g. -1 -> all ones.
        mix(static_cast<uint64_t>(max_connections));
        mix(static_cast<uint64_t>(request_timeout_ms));
        mix(static_cast<uint64_t>(connect_timeout_ms));
        mix(static_cast<uint64_t>(use_virtual_addressing));
        return hash_code;
    }

    // Human-readable summary for logging.
    // sk is omitted. token is a credential as well, so it is shown only as
    // "<redacted>" when set (empty when unset), keeping secrets out of logs.
    std::string to_string() const {
        return fmt::format(
                "(ak={}, token={}, endpoint={}, region={}, max_connections={}, "
                "request_timeout_ms={}, connect_timeout_ms={}, use_virtual_addressing={})",
                ak, token.empty() ? "" : "<redacted>", endpoint, region, max_connections,
                request_timeout_ms, connect_timeout_ms, use_virtual_addressing);
    }
};
// A complete S3 storage location: bucket, key prefix, client connection
// settings, the sse_enabled flag (presumably server-side encryption) and the
// object-store provider.
struct S3Conf {
    std::string bucket;
    std::string prefix;
    S3ClientConf client_conf;
    bool sse_enabled = false;
    // Value-initialized so that a default-constructed S3Conf (e.g. `S3Conf conf;`)
    // never holds an indeterminate enum value, which would be UB to read.
    // {} yields the enum's zero value.
    // NOTE(review): confirm which Provider enumerator is 0 in cloud.proto;
    // callers should still set provider explicitly.
    cloud::ObjectStoreInfoPB::Provider provider {};

    // Human-readable summary for logging. provider is not included in the output.
    std::string to_string() const {
        return fmt::format("(bucket={}, prefix={}, client_conf={}, sse_enabled={})", bucket, prefix,
                           client_conf.to_string(), sse_enabled);
    }
};
// Factory for AWS S3 clients, reachable only through instance() because the
// constructor is private. It owns:
//  - the AWS SDK options (_aws_options);
//  - a cache of created clients keyed by a uint64_t (_cache, guarded by _lock);
//  - a CA certificate file path;
//  - the per-operation-type rate limiters used by DO_S3_RATE_LIMIT.
class S3ClientFactory {
public:
    ~S3ClientFactory();

    // Singleton: a copy or move would duplicate the SDK options, the client cache
    // and the rate limiters. All four operations are deleted explicitly rather
    // than relying on the std::mutex member to suppress them implicitly.
    S3ClientFactory(const S3ClientFactory&) = delete;
    S3ClientFactory& operator=(const S3ClientFactory&) = delete;
    S3ClientFactory(S3ClientFactory&&) = delete;
    S3ClientFactory& operator=(S3ClientFactory&&) = delete;

    // Returns the shared factory instance.
    static S3ClientFactory& instance();

    // Returns an S3 client configured from s3_conf.
    // NOTE(review): presumably served from _cache keyed by s3_conf.get_hash();
    // confirm in s3_util.cpp.
    std::shared_ptr<Aws::S3::S3Client> create(const S3ClientConf& s3_conf);

    // Fills *s3_conf from a property map and the parsed S3 URI.
    // Problems are reported through the returned Status.
    // NOTE(review): property keys are presumably the S3_* constants in this header.
    static Status convert_properties_to_s3_conf(const std::map<std::string, std::string>& prop,
                                                const S3URI& s3_uri, S3Conf* s3_conf);

    // Returns a reference to one shared, lazily constructed ClientConfiguration.
    // The default constructor of ClientConfiguration does some HTTP calls
    // (such as Aws::Internal::GetEC2MetadataClient) and other initialization
    // that is unnecessary here. So it is built once as a function-local static,
    // and callers are expected to deep-copy it, e.g.
    //     Aws::Client::ClientConfiguration cfg = S3ClientFactory::getClientConfiguration();
    // The reference is non-const, so writing through it would change the
    // configuration seen by every later caller; always copy before modifying.
    static Aws::Client::ClientConfiguration& getClientConfiguration() {
        static Aws::Client::ClientConfiguration instance;
        return instance;
    }

    // Returns the rate limiter holder for `type`. _rate_limiters has two slots,
    // one per S3RateLimitType value (GET and presumably PUT; confirm in
    // util/s3_rate_limiter.h).
    S3RateLimiterHolder* rate_limiter(S3RateLimitType type);

private:
    S3ClientFactory();
    static std::string get_valid_ca_cert_path();

    Aws::SDKOptions _aws_options;
    // Guards _cache.
    std::mutex _lock;
    std::unordered_map<uint64_t, std::shared_ptr<Aws::S3::S3Client>> _cache;
    std::string _ca_cert_file_path;
    std::array<std::unique_ptr<S3RateLimiterHolder>, 2> _rate_limiters;
};
} // end namespace doris