curvefs/client: perf optimize
wuhongsong committed Apr 13, 2022
1 parent 79e09d3 commit 7f41136
Showing 29 changed files with 454 additions and 210 deletions.
30 changes: 15 additions & 15 deletions WORKSPACE
@@ -34,7 +34,7 @@ bazel_skylib_workspace()

git_repository(
name = "com_github_baidu_braft",
remote = "https://github.com/baidu/braft",
remote = "https://gitee.com/baidu/braft",
commit = "e255c0e4b18d1a8a5d484d4b647f41ff1385ef1e",
)

@@ -53,7 +53,7 @@ http_archive(
build_file = "@com_google_protobuf//:third_party/zlib.BUILD",
sha256 = "c3e5e9fdd5004dcb542feda5ee4f0ff0744628baf8ed2dd5d66f8ca1197cb1a1",
strip_prefix = "zlib-1.2.11",
urls = ["https://zlib.net/zlib-1.2.11.tar.gz"],
urls = ["https://curve-build.nos-eastchina1.126.net/zlib-1.2.11.tar.gz"],
)

bind(
@@ -67,7 +67,7 @@ http_archive(
patch_args = ["-p1"],
patches = ["//:thirdparties/protobuf/protobuf.patch"],
sha256 = "9510dd2afc29e7245e9e884336f848c8a6600a14ae726adb6befdb4f786f0be2",
urls = ["https://github.com/google/protobuf/archive/v3.6.1.3.zip"],
urls = ["https://curve-build.nos-eastchina1.126.net/protobuf-3.6.1.3.zip"],
)

bind(
@@ -79,7 +79,7 @@ bind(
new_git_repository(
name = "com_google_googletest",
build_file = "bazel/gmock.BUILD",
remote = "https://github.com/google/googletest",
remote = "https://gitee.com/mirrors/googletest",
tag = "release-1.8.0",
)

@@ -92,7 +92,7 @@ bind(
# brpc's BUILD file declares its glog dependency directly as "@com_github_google_glog//:glog"
git_repository(
name = "com_github_google_glog",
remote = "https://github.com/google/glog",
remote = "https://gitee.com/mirrors/glog",
commit = "4cc89c9e2b452db579397887c37f302fb28f6ca1",
patch_args = ["-p1"],
patches = ["//:thirdparties/glog/glog.patch"],
@@ -107,7 +107,7 @@ bind(
http_archive(
name = "com_github_gflags_gflags",
strip_prefix = "gflags-2.2.2",
urls = ["https://github.com/gflags/gflags/archive/v2.2.2.tar.gz"],
urls = ["https://curve-build.nos-eastchina1.126.net/gflags-2.2.2.tar.gz"],
)

bind(
@@ -119,7 +119,7 @@ http_archive(
name = "com_github_google_leveldb",
build_file = "bazel/leveldb.BUILD",
strip_prefix = "leveldb-a53934a3ae1244679f812d998a4f16f2c7f309a6",
urls = ["https://github.com/google/leveldb/archive/a53934a3ae1244679f812d998a4f16f2c7f309a6.tar.gz"],
urls = ["https://curve-build.nos-eastchina1.126.net/leveldb-a53934a3ae1244679f812d998a4f16f2c7f309a6.tar.gz"],
)

bind(
@@ -129,7 +129,7 @@ bind(

git_repository(
name = "com_github_apache_brpc",
remote = "https://github.com/apache/incubator-brpc",
remote = "https://gitee.com/baidu/BRPC",
commit = "1b9e00641cbec1c8803da6a1f7f555398c954cb0",
patches = ["//:thirdparties/brpc/brpc.patch"],
patch_args = ["-p1"],
@@ -159,7 +159,7 @@ bind(
new_git_repository(
name = "jsoncpp",
build_file = "bazel/jsoncpp.BUILD",
remote = "https://github.com/open-source-parsers/jsoncpp.git",
remote = "https://gitee.com/mirrors/jsoncpp",
tag = "1.8.4",
)

@@ -176,31 +176,31 @@ new_local_repository(

http_archive(
name = "aws",
urls = ["https://github.com/aws/aws-sdk-cpp/archive/1.7.340.tar.gz"],
urls = ["https://curve-build.nos-eastchina1.126.net/aws-sdk-cpp-1.7.340.tar.gz"],
sha256 = "2e82517045efb55409cff1408c12829d9e8aea22c1e2888529cb769b7473b0bf",
strip_prefix = "aws-sdk-cpp-1.7.340",
build_file = "//:thirdparties/aws/aws.BUILD",
)

http_archive(
name = "aws_c_common",
urls = ["https://github.com/awslabs/aws-c-common/archive/v0.4.29.tar.gz"],
urls = ["https://curve-build.nos-eastchina1.126.net/aws-c-common-0.4.29.tar.gz"],
sha256 = "01c2a58553a37b3aa5914d9e0bf7bf14507ff4937bc5872a678892ca20fcae1f",
strip_prefix = "aws-c-common-0.4.29",
build_file = "//:thirdparties/aws/aws-c-common.BUILD",
)

http_archive(
name = "aws_c_event_stream",
urls = ["https://github.com/awslabs/aws-c-event-stream/archive/v0.1.4.tar.gz"],
urls = ["https://curve-build.nos-eastchina1.126.net/aws-c-event-stream-0.1.4.tar.gz"],
sha256 = "31d880d1c868d3f3df1e1f4b45e56ac73724a4dc3449d04d47fc0746f6f077b6",
strip_prefix = "aws-c-event-stream-0.1.4",
build_file = "//:thirdparties/aws/aws-c-event-stream.BUILD",
)

http_archive(
name = "aws_checksums",
urls = ["https://github.com/awslabs/aws-checksums/archive/v0.1.5.tar.gz"],
urls = ["https://curve-build.nos-eastchina1.126.net/aws-checksums-0.1.5.tar.gz"],
sha256 = "6e6bed6f75cf54006b6bafb01b3b96df19605572131a2260fddaf0e87949ced0",
strip_prefix = "aws-checksums-0.1.5",
build_file = "//:thirdparties/aws/aws-checksums.BUILD",
@@ -217,7 +217,7 @@ http_archive(
# abseil-cpp
http_archive(
name = "com_google_absl",
urls = ["https://github.com/abseil/abseil-cpp/archive/refs/tags/20210324.2.tar.gz"],
urls = ["https://curve-build.nos-eastchina1.126.net/abseil-cpp-20210324.2.tar.gz"],
strip_prefix = "abseil-cpp-20210324.2",
sha256 = "59b862f50e710277f8ede96f083a5bb8d7c9595376146838b9580be90374ee1f",
)
@@ -227,7 +227,7 @@ http_archive(
name = "platforms",
sha256 = "b601beaf841244de5c5a50d2b2eddd34839788000fa1be4260ce6603ca0d8eb7",
strip_prefix = "platforms-98939346da932eef0b54cf808622f5bb0928f00b",
urls = ["https://github.com/bazelbuild/platforms/archive/98939346da932eef0b54cf808622f5bb0928f00b.zip"],
urls = ["https://curve-build.nos-eastchina1.126.net/platforms-98939346da932eef0b54cf808622f5bb0928f00b.zip"],
)

# RocksDB
2 changes: 2 additions & 0 deletions curvefs/conf/client.conf
@@ -139,6 +139,7 @@ s3.logPrefix=/data/logs/curvefs/aws_ # __CURVEADM_TEMPLATE__ /curvefs/client/log
s3.async_thread_num=30
# limit all inflight async requests' bytes, |0| means not limited
s3.max_async_request_inflight_bytes=104857600
+ s3.chunkFlushThreads=5
# throttle
s3.throttle.iopsTotalLimit=0
s3.throttle.iopsReadLimit=0
@@ -161,6 +162,7 @@ diskCache.asyncLoadPeriodMs=5
# until less than safeRatio
diskCache.fullRatio=90
diskCache.safeRatio=70
+ diskCache.threads=5
# the max size disk cache can use
diskCache.maxUsableSpaceBytes=107374182400
# the max time system command can run
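Both new knobs are wired up in curvefs/src/client/common/config.cpp below: s3.chunkFlushThreads sizes the chunk-flush task pool started in S3ClientAdaptorImpl::Init(), while diskCache.threads sizes a disk-cache thread pool whose consumer is in one of the changed files not shown in this excerpt.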
1 change: 1 addition & 0 deletions curvefs/src/client/BUILD
@@ -28,6 +28,7 @@ cc_binary(
],
deps = [
":fuse_client_lib",
"@com_google_absl//absl/memory",
],
)

4 changes: 4 additions & 0 deletions curvefs/src/client/common/config.cpp
@@ -118,6 +118,8 @@ void InitDiskCacheOption(Configuration *conf,
&diskCacheOption->maxUsableSpaceBytes);
conf->GetValueFatalIfFail("diskCache.cmdTimeoutSec",
&diskCacheOption->cmdTimeoutSec);
conf->GetValueFatalIfFail("diskCache.threads",
&diskCacheOption->threads);
conf->GetValueFatalIfFail("diskCache.avgFlushBytes",
&diskCacheOption->avgFlushBytes);
conf->GetValueFatalIfFail("diskCache.burstFlushBytes",
@@ -145,6 +147,8 @@ void InitS3Option(Configuration *conf, S3Option *s3Opt) {
&s3Opt->s3ClientAdaptorOpt.intervalSec);
conf->GetValueFatalIfFail("s3.flushIntervalSec",
&s3Opt->s3ClientAdaptorOpt.flushIntervalSec);
conf->GetValueFatalIfFail("s3.chunkFlushThreads",
&s3Opt->s3ClientAdaptorOpt.chunkFlushThreads);
conf->GetValueFatalIfFail("s3.writeCacheMaxByte",
&s3Opt->s3ClientAdaptorOpt.writeCacheMaxByte);
conf->GetValueFatalIfFail("s3.readCacheMaxByte",
3 changes: 3 additions & 0 deletions curvefs/src/client/common/config.h
@@ -97,6 +97,8 @@ struct DiskCacheOption {
uint64_t maxUsableSpaceBytes;
// the max time system command can run
uint32_t cmdTimeoutSec;
+ // threads for disk cache
+ uint32_t threads;
// the write throttle bps of disk cache
uint64_t avgFlushBytes;
// the write burst bps of disk cache
@@ -119,6 +121,7 @@ struct S3ClientAdaptorOption {
uint32_t prefetchBlocks;
uint32_t prefetchExecQueueNum;
uint32_t intervalSec;
+ uint32_t chunkFlushThreads;
uint32_t flushIntervalSec;
uint64_t writeCacheMaxByte;
uint64_t readCacheMaxByte;
36 changes: 32 additions & 4 deletions curvefs/src/client/s3/client_s3_adaptor.cpp
@@ -20,12 +20,12 @@
* Author: huyao
*/



#include <brpc/channel.h>
#include <brpc/controller.h>
#include <algorithm>
#include <list>

#include "absl/memory/memory.h"
#include "curvefs/src/client/s3/client_s3_adaptor.h"
#include "curvefs/src/common/s3util.h"

@@ -57,6 +57,7 @@ S3ClientAdaptorImpl::Init(
memCacheNearfullRatio_ = option.nearfullRatio;
throttleBaseSleepUs_ = option.baseSleepUs;
flushIntervalSec_ = option.flushIntervalSec;
+ chunkFlushThreads_ = option.chunkFlushThreads;
client_ = client;
inodeManager_ = inodeManager;
mdsClient_ = mdsClient;
@@ -95,6 +96,8 @@
<< ", readCacheMaxByte: " << option.readCacheMaxByte
<< ", nearfullRatio: " << option.nearfullRatio
<< ", baseSleepUs: " << option.baseSleepUs;
+ // start chunk flush threads
+ taskPool_.Start(chunkFlushThreads_);
return CURVEFS_ERROR::OK;
}

@@ -116,12 +119,16 @@ int S3ClientAdaptorImpl::Write(uint64_t inodeId, uint64_t offset,
if ((size + pendingReq * fuseMaxSize_) >= maxSize) {
LOG(INFO) << "write cache is full, wait flush. size:" << size
<< ", maxSize:" << maxSize;
+ // wake the background flush to run now
+ waitInterval_.StopWait();
fsCacheManager_->WaitFlush();
}
}
uint64_t memCacheRatio = fsCacheManager_->MemCacheRatio();
int64_t exceedRatio = memCacheRatio - memCacheNearfullRatio_;
if (exceedRatio > 0) {
+ // wake the background flush to run now
+ waitInterval_.StopWait();
// upload to s3 directly or the disk cache is full
bool needSleep =
(DisableDiskCache() || IsReadCache()) ||
@@ -160,7 +167,8 @@ int S3ClientAdaptorImpl::Read(uint64_t inodeId, uint64_t offset,
if (s3Metric_.get() != nullptr) {
CollectMetrics(&s3Metric_->adaptorRead, ret, start);
}

VLOG(6) << "read end offset:" << offset << ", len:" << length
<< ", fsId:" << fsId_ << ", inodeId:" << inodeId;
return ret;
}

@@ -300,8 +308,8 @@ int S3ClientAdaptorImpl::Stop() {
}
diskCacheManagerImpl_->UmountDiskCache();
}
+ taskPool_.Stop();
client_->Deinit();
LOG(INFO) << "Stopping S3ClientAdaptor success";
return 0;
}

@@ -369,5 +377,25 @@ int S3ClientAdaptorImpl::ClearDiskCache(int64_t inodeId) {
return ret;
}

+ void S3ClientAdaptorImpl::Enqueue(
+ std::shared_ptr<FlushChunkCacheContext> context) {
+ auto task = [this, context]() {
+ this->FlushChunkClosure(context);
+ };
+ taskPool_.Enqueue(task);
+ }
+
+ int S3ClientAdaptorImpl::FlushChunkClosure(
+ std::shared_ptr<FlushChunkCacheContext> context) {
+ VLOG(9) << "FlushChunkCacheClosure start: " << context->inode;
+ FlushChunkCacheClosure done(context);
+ brpc::ClosureGuard done_guard(&done);
+ CURVEFS_ERROR ret = context->ptr->Flush(context->inode, context->force);
+ // set the return value
+ done.ctx->retCode = ret;
+ VLOG(9) << "FlushChunkCacheClosure end: " << context->inode;
+ return 0;
+ }

} // namespace client
} // namespace curvefs
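The caller side of the new Enqueue()/FlushChunkClosure() path lives in the chunk-cache code, which is among the changed files not shown here. The sketch below is illustrative only — the helper name FlushChunkAndWait and the mutex/condition-variable wait are assumptions, not part of this commit — but it shows the intended protocol: fill a FlushChunkCacheContext, hand it to the adaptor's task pool, and read retCode once the closure's callback fires after ChunkCacheManager::Flush() returns on a pool thread.

// Hypothetical caller (not from this commit): flush one chunk through the
// new task pool and block until the completion callback fires. Assumes
// client_s3_adaptor.h is included and this lives in namespace curvefs::client.
#include <condition_variable>
#include <memory>
#include <mutex>

CURVEFS_ERROR FlushChunkAndWait(S3ClientAdaptorImpl* adaptor,
                                ChunkCacheManagerPtr chunk,
                                uint64_t inodeId, bool force) {
    std::mutex mtx;
    std::condition_variable cv;
    bool done = false;

    auto context = std::make_shared<FlushChunkCacheContext>();
    context->inode = inodeId;
    context->ptr = chunk;
    context->force = force;
    // Runs on a task-pool thread; FlushChunkClosure() sets retCode first.
    context->cb = [&](const std::shared_ptr<FlushChunkCacheContext>&) {
        std::lock_guard<std::mutex> lk(mtx);
        done = true;
        cv.notify_one();
    };

    adaptor->Enqueue(context);

    std::unique_lock<std::mutex> lk(mtx);
    cv.wait(lk, [&] { return done; });
    return context->retCode;
}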
35 changes: 35 additions & 0 deletions curvefs/src/client/s3/client_s3_adaptor.h
@@ -44,6 +44,8 @@ namespace curvefs {
namespace client {

using ::curve::common::Thread;
+ using ::curve::common::TaskThreadPool;
+ using curvefs::client::common::S3ClientAdaptorOption;
using curvefs::client::common::DiskCacheType;
using curvefs::metaserver::Inode;
using curvefs::metaserver::S3ChunkInfo;
@@ -52,6 +54,8 @@ using rpcclient::MdsClient;
using curvefs::client::metric::S3Metric;

class DiskCacheManagerImpl;
+ class FlushChunkCacheContext;
+ class ChunkCacheManager;

class S3ClientAdaptor {
public:
@@ -89,6 +93,17 @@
uint64_t start) = 0;
};

+ using FlushChunkCacheCallBack = std::function<
+ void(const std::shared_ptr<FlushChunkCacheContext>&)>;
+
+ struct FlushChunkCacheContext {
+ uint64_t inode;
+ ChunkCacheManagerPtr ptr;
+ bool force;
+ FlushChunkCacheCallBack cb;
+ CURVEFS_ERROR retCode;
+ };

// client use s3 internal interface
class S3ClientAdaptorImpl : public S3ClientAdaptor {
public:
@@ -205,6 +220,8 @@
}
std::shared_ptr<S3Metric> s3Metric_;

+ void Enqueue(std::shared_ptr<FlushChunkCacheContext> context);

private:
std::shared_ptr<S3Client> client_;
uint64_t blockSize_;
@@ -214,6 +231,7 @@
uint32_t prefetchExecQueueNum_;
std::string allocateServerEps_;
uint32_t flushIntervalSec_;
+ uint32_t chunkFlushThreads_;
uint32_t memCacheNearfullRatio_;
uint32_t throttleBaseSleepUs_;
Thread bgFlushThread_;
@@ -233,6 +251,23 @@
std::vector<bthread::ExecutionQueueId<AsyncDownloadTask>>
downloadTaskQueues_;
uint32_t pageSize_;

+ // closure for the disk-cache write path; the caller simply waits
+ class FlushChunkCacheClosure : public google::protobuf::Closure {
+ public:
+ explicit FlushChunkCacheClosure(std::shared_ptr<
+ FlushChunkCacheContext> context) : ctx(context) {}
+ std::shared_ptr<FlushChunkCacheContext> ctx;
+ virtual ~FlushChunkCacheClosure() {}
+ void Run() override {
+ // invoke the callback
+ ctx->cb(ctx);
+ }
+ };
+ int FlushChunkClosure(std::shared_ptr<FlushChunkCacheContext> context);
+
+ TaskThreadPool<bthread::Mutex, bthread::ConditionVariable>
+ taskPool_;
};

} // namespace client
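Design note: the pool is declared as TaskThreadPool<bthread::Mutex, bthread::ConditionVariable> rather than with the default pthread primitives — presumably so that producers and waiters running in brpc/bthread context yield their underlying worker thread instead of blocking it, while the potentially slow ChunkCacheManager::Flush() calls run on the pool's own threads, sized by the new s3.chunkFlushThreads option.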
(diffs for the remaining 22 changed files are not shown)
