Commit

curvefs/client: perf optimize
wuhongsong committed Apr 7, 2022
1 parent 89269ab commit 10c4809
Showing 28 changed files with 572 additions and 203 deletions.
59 changes: 39 additions & 20 deletions curvefs/conf/client.conf
@@ -1,4 +1,4 @@
##### mdsOpt
#### mdsOpt
# RPC total retry time with MDS
mdsOpt.mdsMaxRetryMS=16000
# The maximum timeout of RPC communicating with MDS.
@@ -15,7 +15,8 @@ mdsOpt.rpcRetryOpt.maxFailedTimesBeforeChangeAddr=2
mdsOpt.rpcRetryOpt.normalRetryTimesBeforeTriggerWait=3
# Sleep interval for wait
mdsOpt.rpcRetryOpt.waitSleepMs=1000
mdsOpt.rpcRetryOpt.addrs=127.0.0.1:6700,127.0.0.1:6701,127.0.0.1:6702 # __ANSIBLE_TEMPLATE__ {{ groups.mds | join_peer(hostvars, "mds_listen_port") }} __ANSIBLE_TEMPLATE__
#mdsOpt.rpcRetryOpt.addrs=127.0.0.1:6700,127.0.0.1:6701,127.0.0.1:6702 # __ANSIBLE_TEMPLATE__ {{ groups.mds | join_peer(hostvars, "mds_listen_port") }} __ANSIBLE_TEMPLATE__
mdsOpt.rpcRetryOpt.addrs=10.182.2.89:6070,10.182.2.89:6071,10.182.2.89:6072

#### metaCacheOpt
# Gets the number of retries for the leader
@@ -72,14 +73,18 @@ fuseClient.iCacheLruSize=65536
fuseClient.dCacheLruSize=65536
fuseClient.enableICacheMetrics=true
fuseClient.enableDCacheMetrics=true
fuseClient.cto=true
fuseClient.cto=false

#### volume
volume.bigFileSize=1048576
volume.volBlockSize=4096
volume.fsBlockSize=4096

#### s3
#s3.blocksize=1048576
#s3.chunksize=4194304
s3.blocksize=4194304
s3.chunksize=67108864
# the max size that fuse send
s3.fuseMaxSize=131072
s3.pagesize=65536
@@ -89,16 +94,28 @@ s3.prefetchBlocks=1
s3.prefetchExecQueueNum=1
# start sleep when mem cache use ratio is greater than nearfullRatio,
# sleep time increase follow with mem cache use raito, baseSleepUs is baseline.
s3.nearfullRatio=70
s3.nearfullRatio=50
s3.baseSleepUs=500
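
The two settings above drive write-path backpressure: once the memory-cache usage ratio passes s3.nearfullRatio, writers start to sleep, and the sleep grows with how far the ratio exceeds the threshold, using s3.baseSleepUs as the baseline. A minimal C++ sketch of that backoff, assuming a simple linear scaling (the adaptor's exact formula may differ):

#include <chrono>
#include <cstdint>
#include <thread>

// Hedged sketch: sleep longer the further the memory-cache usage ratio
// exceeds the nearfull threshold; below the threshold there is no delay.
void ThrottleWriteIfNearfull(uint32_t memCacheRatio,   // current usage, in percent
                             uint32_t nearfullRatio,   // e.g. s3.nearfullRatio=50
                             uint32_t baseSleepUs) {   // e.g. s3.baseSleepUs=500
    if (memCacheRatio <= nearfullRatio) {
        return;
    }
    uint64_t exceed = memCacheRatio - nearfullRatio;
    std::this_thread::sleep_for(std::chrono::microseconds(baseSleepUs * exceed));
}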

# TODO(huyao): use more meaningfull name
# background thread schedule time
s3.intervalSec=3
s3.chunkFlushThreads=5
# data cache flush wait time
s3.flushIntervalSec=5
s3.writeCacheMaxByte=838860800
s3.readCacheMaxByte=209715200

s3.ak=curve
s3.sk=curveadmin
s3.endpoint=http://10.182.2.33:7010
s3.bucket_name=wuhongsong

#s3.endpoint=http://10.182.2.33:9000
#s3.bucket_name=wuhongsong
#s3.ak=minioadmin
#s3.sk=minioadmin

# http = 0, https = 1
s3.http_scheme=0
s3.verify_SSL=False
@@ -134,28 +151,30 @@ diskCache.asyncLoadPeriodMs=5
diskCache.fullRatio=90
diskCache.safeRatio=70
# the max size disk cache can use
diskCache.maxUsableSpaceBytes=107374182400
diskCache.maxUsableSpaceBytes=10737418240
diskCache.limitReadUesdRatio=50
# the max time system command can run
diskCache.cmdTimeoutSec=300
diskCache.threads = 5
# directory of disk cache
diskCache.cacheDir=/mnt/curvefs_cache # __CURVEADM_TEMPLATE__ /curvefs/client/data/cache __CURVEADM_TEMPLATE__ __ANSIBLE_TEMPLATE__ /mnt/curvefs_disk_cache/{{ 99999999 | random | to_uuid | upper }} __ANSIBLE_TEMPLATE__

# the write throttle bps of disk cache, default 80MB/s
diskCache.avgFlushBytes=83886080
# the write burst bps of disk cache, default 100MB/s
diskCache.burstFlushBytes=104857600
# the times that write burst bps can continue, default 180s
diskCache.cacheDir=/home/hzwuhongsong/disk_cache/cache # __CURVEADM_TEMPLATE__ /curvefs/client/data/cache __CURVEADM_TEMPLATE__ __ANSIBLE_TEMPLATE__ /mnt/curvefs_disk_cache/{{ 99999999 | random | to_uuid | upper }} __ANSIBLE_TEMPLATE__
# the write bps of disk cache
#diskCache.avgFlushBytes=83886080
#diskCache.burstFlushBytes=104857600
diskCache.burstSecs=180
# the write throttle iops of disk cache, default no limit
diskCache.avgFlushIops=0
# the read throttle bps of disk cache, default 80MB/s
diskCache.avgFlushBytes=0
diskCache.burstFlushBytes=0
# the write iops of disk cache
diskCache.avgFlushIops=100000000
# the read bps of disk cache
diskCache.avgReadFileBytes=83886080
# the read throttle iops of disk cache, default no limit
diskCache.avgReadFileIops=0
# the read iops of disk cache
diskCache.avgReadFileIops=100000000
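
The diskCache.avg*/burst* settings describe a bandwidth throttle: sustained traffic is capped at the average rate, short bursts may run at the burst rate, and a burst can last at most diskCache.burstSecs before the limiter falls back to the average. A token-bucket sketch of that behaviour (the class name, capacity formula, and per-second refill are assumptions for illustration, not the curve throttle implementation):

#include <algorithm>
#include <cstdint>

// Hedged token-bucket sketch of an avg/burst bandwidth limiter.
class BandwidthThrottle {
 public:
    // avgBps:    sustained bytes per second (e.g. diskCache.avgFlushBytes)
    // burstBps:  short-term ceiling         (e.g. diskCache.burstFlushBytes)
    // burstSecs: how long a burst may last  (e.g. diskCache.burstSecs)
    BandwidthThrottle(uint64_t avgBps, uint64_t burstBps, uint64_t burstSecs)
        : avgBps_(avgBps),
          capacity_(avgBps +
                    (burstBps > avgBps ? (burstBps - avgBps) * burstSecs : 0)),
          tokens_(capacity_) {}

    // Refill once per second at the average rate, capped at the bucket size.
    void TickOneSecond() {
        tokens_ = std::min(capacity_, tokens_ + avgBps_);
    }

    // Returns true if `bytes` may pass now; consumes tokens on success.
    bool TryAcquire(uint64_t bytes) {
        if (bytes > tokens_) {
            return false;  // caller waits and retries after the next refill
        }
        tokens_ -= bytes;
        return true;
    }

 private:
    uint64_t avgBps_;
    uint64_t capacity_;
    uint64_t tokens_;
};

With avgFlushBytes=83886080, burstFlushBytes=104857600, and burstSecs=180, such a bucket lets roughly 100 MB/s through for about three minutes before settling back to 80 MB/s.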

#### common
client.common.logDir=/data/logs/curvefs # __CURVEADM_TEMPLATE__ /curvefs/client/logs __CURVEADM_TEMPLATE__
#client.common.logDir=/mnt/nvme0n1/log/ # __CURVEADM_TEMPLATE__ /curvefs/client/logs __CURVEADM_TEMPLATE__
client.common.logDir=/home/hzwuhongsong/disk_cache/log/ # __CURVEADM_TEMPLATE__ /curvefs/client/logs __CURVEADM_TEMPLATE__
# we have loglevel: {0,3,6,9}
# as the number increases, it becomes more and more detailed
client.loglevel=0
client.dummyserver.startport=9000
client.loglevel=3
client.dummyserver.startport=9005
2 changes: 2 additions & 0 deletions curvefs/src/client/BUILD
@@ -28,6 +28,7 @@ cc_binary(
],
deps = [
":fuse_client_lib",
"@com_google_absl//absl/memory",
],
)

@@ -61,5 +62,6 @@ cc_library(
"//src/client:curve_client",
"//src/common:curve_common",
"//src/common:curve_s3_adapter",
"@com_google_absl//absl/memory",
],
)
4 changes: 4 additions & 0 deletions curvefs/src/client/common/config.cpp
@@ -122,6 +122,8 @@ void InitDiskCacheOption(Configuration *conf,
&diskCacheOption->maxUsableSpaceBytes);
conf->GetValueFatalIfFail("diskCache.cmdTimeoutSec",
&diskCacheOption->cmdTimeoutSec);
conf->GetValueFatalIfFail("diskCache.threads",
&diskCacheOption->threads);
conf->GetValueFatalIfFail("diskCache.avgFlushBytes",
&diskCacheOption->avgFlushBytes);
conf->GetValueFatalIfFail("diskCache.burstFlushBytes",
@@ -149,6 +151,8 @@ void InitS3Option(Configuration *conf, S3Option *s3Opt) {
&s3Opt->s3ClientAdaptorOpt.intervalSec);
conf->GetValueFatalIfFail("s3.flushIntervalSec",
&s3Opt->s3ClientAdaptorOpt.flushIntervalSec);
conf->GetValueFatalIfFail("s3.chunkFlushThreads",
&s3Opt->s3ClientAdaptorOpt.chunkFlushThreads);
conf->GetValueFatalIfFail("s3.writeCacheMaxByte",
&s3Opt->s3ClientAdaptorOpt.writeCacheMaxByte);
conf->GetValueFatalIfFail("s3.readCacheMaxByte",
3 changes: 3 additions & 0 deletions curvefs/src/client/common/config.h
@@ -90,6 +90,8 @@ struct DiskCacheOption {
uint64_t maxUsableSpaceBytes;
// the max time system command can run
uint32_t cmdTimeoutSec;
// threads for disk cache
uint32_t threads;
// the write throttle bps of disk cache
uint64_t avgFlushBytes;
// the write burst bps of disk cache
@@ -112,6 +114,7 @@ struct S3ClientAdaptorOption {
uint32_t prefetchBlocks;
uint32_t prefetchExecQueueNum;
uint32_t intervalSec;
uint32_t chunkFlushThreads;
uint32_t flushIntervalSec;
uint64_t writeCacheMaxByte;
uint64_t readCacheMaxByte;
40 changes: 35 additions & 5 deletions curvefs/src/client/s3/client_s3_adaptor.cpp
@@ -20,12 +20,12 @@
* Author: huyao
*/



#include <brpc/channel.h>
#include <brpc/controller.h>
#include <algorithm>
#include <list>

#include "absl/memory/memory.h"
#include "curvefs/src/client/s3/client_s3_adaptor.h"
#include "curvefs/src/common/s3util.h"

@@ -57,11 +57,13 @@ S3ClientAdaptorImpl::Init(
memCacheNearfullRatio_ = option.nearfullRatio;
throttleBaseSleepUs_ = option.baseSleepUs;
flushIntervalSec_ = option.flushIntervalSec;
chunkFlushThreads_ = option.chunkFlushThreads;
client_ = client;
inodeManager_ = inodeManager;
mdsClient_ = mdsClient;
fsCacheManager_ = fsCacheManager;
diskCacheManagerImpl_ = diskCacheManagerImpl;
waitIntervalSec_.Init(option.intervalSec * 1000);
if (HasDiskCache()) {
diskCacheManagerImpl_ = diskCacheManagerImpl;
if (diskCacheManagerImpl_->Init(option) < 0) {
@@ -94,6 +96,9 @@ S3ClientAdaptorImpl::Init(
<< ", readCacheMaxByte: " << option.readCacheMaxByte
<< ", nearfullRatio: " << option.nearfullRatio
<< ", baseSleepUs: " << option.baseSleepUs;
// start chunk flush threads
taskPool_.Start(chunkFlushThreads_);
VLOG(0) << "whs: init sucess";
return CURVEFS_ERROR::OK;
}

@@ -115,12 +120,16 @@ int S3ClientAdaptorImpl::Write(uint64_t inodeId, uint64_t offset,
if ((size + pendingReq * fuseMaxSize_) >= maxSize) {
LOG(INFO) << "write cache is full, wait flush. size:" << size
<< ", maxSize:" << maxSize;
// offer to do flush
waitIntervalSec_.StopWait();
fsCacheManager_->WaitFlush();
}
}
uint64_t memCacheRatio = fsCacheManager_->MemCacheRatio();
int64_t exceedRatio = memCacheRatio - memCacheNearfullRatio_;
if (exceedRatio > 0) {
// offer to do flush
waitIntervalSec_.StopWait();
// upload to s3 derectly or cache disk full
bool needSleep =
(DisableDiskCache() || IsReadCache()) ||
@@ -145,7 +154,7 @@ int S3ClientAdaptorImpl::Write(uint64_t inodeId, uint64_t offset,

int S3ClientAdaptorImpl::Read(uint64_t inodeId, uint64_t offset,
uint64_t length, char *buf) {
VLOG(6) << "read start offset:" << offset << ", len:" << length
VLOG(6) << "whs read start offset:" << offset << ", len:" << length
<< ", fsId:" << fsId_ << ", inodeId:" << inodeId;
uint64_t start = butil::cpuwide_time_us();
FileCacheManagerPtr fileCacheManager =
@@ -159,7 +168,8 @@ int S3ClientAdaptorImpl::Read(uint64_t inodeId, uint64_t offset,
if (s3Metric_.get() != nullptr) {
CollectMetrics(&s3Metric_->adaptorRead, ret, start);
}

VLOG(6) << "whs read end offset:" << offset << ", len:" << length
<< ", fsId:" << fsId_ << ", inodeId:" << inodeId;
return ret;
}

@@ -299,8 +309,8 @@ int S3ClientAdaptorImpl::Stop() {
}
diskCacheManagerImpl_->UmountDiskCache();
}
taskPool_.Stop();
client_->Deinit();
LOG(INFO) << "Stopping S3ClientAdaptor success";
return 0;
}

@@ -368,5 +378,25 @@ int S3ClientAdaptorImpl::ClearDiskCache(int64_t inodeId) {
return ret;
}

void S3ClientAdaptorImpl::Enqueue(
std::shared_ptr<FlushChunkCacheContext> context) {
auto task = [this, context]() {
this->FlushChunkClosure(context);
};
taskPool_.Enqueue(task);
}

int S3ClientAdaptorImpl::FlushChunkClosure(
std::shared_ptr<FlushChunkCacheContext> context) {
VLOG(9) << "FlushChunkCacheClosure start: " << context->inode;
FlushChunkCacheClosure done(context);
brpc::ClosureGuard done_guard(&done);
CURVEFS_ERROR ret = context->ptr->Flush(context->inode, context->force);
// set the return value on the context
done.ctx->retCode = ret;
VLOG(9) << "FlushChunkCacheClosure end: " << context->inode;
return 0;
}

} // namespace client
} // namespace curvefs
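
The new flush path hands each chunk flush to taskPool_ as a FlushChunkCacheContext; FlushChunkClosure performs the flush, stores the result in retCode, and then fires the context's callback. A hedged usage sketch of enqueuing a flush and waiting for it to complete (the helper function and its synchronization are illustrative and assume the curvefs::client types added in this commit; they are not part of the commit itself):

#include <condition_variable>
#include <memory>
#include <mutex>

// Hedged sketch, assumed to live inside namespace curvefs::client.
CURVEFS_ERROR FlushChunkAndWait(S3ClientAdaptorImpl *adaptor,
                                ChunkCacheManagerPtr chunkCacheManager,
                                uint64_t inodeId, bool force) {
    std::mutex mtx;
    std::condition_variable cv;
    bool done = false;

    auto context = std::make_shared<FlushChunkCacheContext>();
    context->inode = inodeId;
    context->ptr = chunkCacheManager;
    context->force = force;
    // Runs in a task-pool thread after retCode has been filled in.
    context->cb = [&](const std::shared_ptr<FlushChunkCacheContext> &ctx) {
        std::lock_guard<std::mutex> lk(mtx);
        done = true;
        cv.notify_one();
    };

    adaptor->Enqueue(context);

    std::unique_lock<std::mutex> lk(mtx);
    cv.wait(lk, [&] { return done; });
    return context->retCode;
}

Because Enqueue only schedules the work, the caller must not read retCode until the callback has fired.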
37 changes: 37 additions & 0 deletions curvefs/src/client/s3/client_s3_adaptor.h
@@ -45,6 +45,8 @@ namespace curvefs {
namespace client {

using ::curve::common::Thread;
using ::curve::common::TaskThreadPool;
using curvefs::client::common::S3ClientAdaptorOption;
using curvefs::client::common::DiskCacheType;
using curvefs::metaserver::Inode;
using curvefs::metaserver::S3ChunkInfo;
@@ -55,6 +57,8 @@ using rpcclient::MdsClient;
using curvefs::client::metric::S3Metric;

class DiskCacheManagerImpl;
class FlushChunkCacheContext;
class ChunkCacheManager;

class S3ClientAdaptor {
public:
@@ -92,6 +96,16 @@ class S3ClientAdaptor {
uint64_t start) = 0;
};

using FlushChunkCacheCallBack = std::function<void(const std::shared_ptr<FlushChunkCacheContext>&)>;

struct FlushChunkCacheContext {
uint64_t inode;
ChunkCacheManagerPtr ptr;
bool force;
FlushChunkCacheCallBack cb;
CURVEFS_ERROR retCode;
};

// client use s3 internal interface
class S3ClientAdaptorImpl : public S3ClientAdaptor {
public:
@@ -208,6 +222,8 @@ class S3ClientAdaptorImpl : public S3ClientAdaptor {
}
std::shared_ptr<S3Metric> s3Metric_;

void Enqueue(std::shared_ptr<FlushChunkCacheContext> context);

private:
std::shared_ptr<S3Client> client_;
uint64_t blockSize_;
@@ -217,6 +233,7 @@ class S3ClientAdaptorImpl : public S3ClientAdaptor {
uint32_t prefetchExecQueueNum_;
std::string allocateServerEps_;
uint32_t flushIntervalSec_;
uint32_t chunkFlushThreads_;
uint32_t memCacheNearfullRatio_;
uint32_t throttleBaseSleepUs_;
Thread bgFlushThread_;
@@ -236,6 +253,26 @@ class S3ClientAdaptorImpl : public S3ClientAdaptor {
std::vector<bthread::ExecutionQueueId<AsyncDownloadTask>>
downloadTaskQueues_;
uint32_t pageSize_;

// closure for write disk cache, simply wait
class FlushChunkCacheClosure : public google::protobuf::Closure {
public:
FlushChunkCacheClosure(std::shared_ptr<FlushChunkCacheContext> context) : ctx(context) {
}
std::shared_ptr<FlushChunkCacheContext> ctx;
virtual ~FlushChunkCacheClosure() {}
void Run() override {
// std::lock_guard<std::mutex> l(mutex_);
// runned_ = true;
// cond_.notify_one();
// invoke the callback
ctx->cb(ctx);
}
};
int FlushChunkClosure(std::shared_ptr<FlushChunkCacheContext> context);

TaskThreadPool<bthread::Mutex, bthread::ConditionVariable>
taskPool_;
};

} // namespace client