Skip to content

Commit

Permalink
add delete impl
Browse files Browse the repository at this point in the history
Signed-off-by: 申屠鹏会 <shentupenghui@gmail.com>

Adjusted the output style

Signed-off-by: 申屠鹏会 <shentupenghui@gmail.com>

add CURVE prefix

Signed-off-by: 申屠鹏会 <shentupenghui@gmail.com>

feat: [tools-v2] bs list dir

Signed-off-by: Sindweller <sindweller5530@gmail.com>

curvefs/client: local cache policy optimization

Signed-off-by: tangruilin <tang.ruilin@foxmail.com>

remove username, use user

Signed-off-by: 申屠鹏会 <shentupenghui@gmail.com>

curvefs/client: local cache policy optimization

Signed-off-by: tangruilin <tang.ruilin@foxmail.com>
  • Loading branch information
shentupenghui committed Nov 23, 2022
1 parent bf406a0 commit b39facd
Show file tree
Hide file tree
Showing 15 changed files with 534 additions and 28 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,11 @@ __not_found__
thirdparties/rocksdb/lib/
thirdparties/rocksdb/include/
thirdparties/rocksdb/rocksdb/
thirdparties/rocksdb/*log
thirdparties/rocksdb/*.tar.gz
thirdparties/aws/*.tar.gz
thirdparties/etcdclient/tmp/
thirdparties/etcdclient/*.h

/external
/bazel-*
Expand Down
37 changes: 29 additions & 8 deletions curvefs/src/client/s3/client_s3_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2197,6 +2197,11 @@ CURVEFS_ERROR DataCache::Flush(uint64_t inodeId, bool toS3) {
curve::common::CountDownEvent cond(1);
std::atomic<uint64_t> pendingReq(0);
FSStatusCode ret;
enum class cachePoily {
NCache,
RCache,
WRCache,
} cachePoily = cachePoily::NCache;

VLOG(9) << "DataCache::Flush : now:" << now << ",createTime:" << createTime_
<< ",flushIntervalSec:" << flushIntervalSec
Expand Down Expand Up @@ -2225,8 +2230,22 @@ CURVEFS_ERROR DataCache::Flush(uint64_t inodeId, bool toS3) {
<< ", inodeId:" << inodeId
<< ",Len:" << tmpLen << ",blockPos:" << blockPos
<< ",blockIndex:" << blockIndex;

const bool mayCache = s3ClientAdaptor_->HasDiskCache() &&
!s3ClientAdaptor_->GetDiskCacheManager()
->IsDiskCacheFull() && !toS3;

if (s3ClientAdaptor_->IsReadCache() && mayCache) {
cachePoily = cachePoily::RCache;
} else if (s3ClientAdaptor_->IsReadWriteCache() && mayCache) {
cachePoily = cachePoily::WRCache;
} else {
cachePoily = cachePoily::NCache;
}

PutObjectAsyncCallBack cb =
[&](const std::shared_ptr<PutObjectAsyncContext> &context) {
[&, cachePoily]
(const std::shared_ptr<PutObjectAsyncContext> &context) {
if (context->retCode == 0) {
if (s3ClientAdaptor_->s3Metric_.get() != nullptr) {
s3ClientAdaptor_->CollectMetrics(
Expand All @@ -2243,17 +2262,19 @@ CURVEFS_ERROR DataCache::Flush(uint64_t inodeId, bool toS3) {
}
VLOG(9) << "PutObjectAsyncCallBack: " << context->key
<< " pendingReq is: " << pendingReq;
if (cachePoily::RCache == cachePoily) {
VLOG(9) << "Write to read cache, name: " << context->key;
s3ClientAdaptor_->GetDiskCacheManager()
->Enqueue(context, true);
}
return;
}
LOG(WARNING) << "Put object failed, key: " << context->key;
s3ClientAdaptor_->GetS3Client()->UploadAsync(context);
};

std::vector<std::shared_ptr<PutObjectAsyncContext>> uploadTasks;
bool useDiskCache =
s3ClientAdaptor_->IsReadWriteCache() &&
!s3ClientAdaptor_->GetDiskCacheManager()->IsDiskCacheFull() &&
!toS3;

while (tmpLen > 0) {
if (blockPos + tmpLen > blockSize) {
n = blockSize - blockPos;
Expand Down Expand Up @@ -2288,10 +2309,10 @@ CURVEFS_ERROR DataCache::Flush(uint64_t inodeId, bool toS3) {
++iter) {
VLOG(9) << "upload start: " << (*iter)->key
<< " len : " << (*iter)->bufferSize;
if (!useDiskCache) {
s3ClientAdaptor_->GetS3Client()->UploadAsync(*iter);
} else {
if (cachePoily::WRCache == cachePoily) {
s3ClientAdaptor_->GetDiskCacheManager()->Enqueue(*iter);
} else {
s3ClientAdaptor_->GetS3Client()->UploadAsync(*iter);
}
}
cond.Wait();
Expand Down
21 changes: 20 additions & 1 deletion curvefs/src/client/s3/disk_cache_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,32 @@ int DiskCacheManagerImpl::Init(const S3ClientAdaptorOption option) {
}

void DiskCacheManagerImpl::Enqueue(
std::shared_ptr<PutObjectAsyncContext> context) {
std::shared_ptr<PutObjectAsyncContext> context, bool isReadCacheOnly) {
if ( isReadCacheOnly ) {
auto task = [this, context]() {
this->WriteReadDirectClosure(context);
};
taskPool_.Enqueue(task);
return;
}
auto task = [this, context]() {
this->WriteClosure(context);
};
taskPool_.Enqueue(task);
}



int DiskCacheManagerImpl::WriteReadDirectClosure(
std::shared_ptr<PutObjectAsyncContext> context) {
VLOG(9) << "WriteReadClosure start, name: " << context->key;
// Write to read cache, we don't care if the cache wirte success
int ret = WriteReadDirect(context->key,
context->buffer, context->bufferSize);
VLOG(9) << "WriteReadClosure end, name: " << context->key;
return ret;
}

int DiskCacheManagerImpl::WriteClosure(
std::shared_ptr<PutObjectAsyncContext> context) {
VLOG(9) << "WriteClosure start, name: " << context->key;
Expand Down
5 changes: 4 additions & 1 deletion curvefs/src/client/s3/disk_cache_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ class DiskCacheManagerImpl {

virtual int ClearReadCache(const std::list<std::string> &files);

void Enqueue(std::shared_ptr<PutObjectAsyncContext> context);
void Enqueue(std::shared_ptr<PutObjectAsyncContext> context,
bool isReadCacheOnly = false);

private:
int WriteDiskFile(const std::string name, const char *buf, uint64_t length);
Expand All @@ -128,6 +129,8 @@ class DiskCacheManagerImpl {
std::shared_ptr<S3Client> client_;

int WriteClosure(std::shared_ptr<PutObjectAsyncContext> context);

int WriteReadDirectClosure(std::shared_ptr<PutObjectAsyncContext> context);
// threads for disk cache
uint32_t threads_;
TaskThreadPool<bthread::Mutex, bthread::ConditionVariable>
Expand Down
2 changes: 1 addition & 1 deletion curvefs/src/client/s3/disk_cache_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
#include "curvefs/src/common/wrap_posix.h"
#include "curvefs/src/common/utils.h"
#include "curvefs/src/client/s3/client_s3.h"
#include "curvefs/src/client/s3/disk_cache_write.h"
#include "curvefs/src/client/s3/disk_cache_read.h"
#include "curvefs/src/client/common/config.h"
#include "curvefs/src/client/s3/disk_cache_base.h"
Expand Down Expand Up @@ -135,6 +134,7 @@ class DiskCacheWrite : public DiskCacheBase {
}

private:
using DiskCacheBase::Init;
int AsyncUploadFunc();
void UploadFile(const std::list<std::string> &toUpload,
std::shared_ptr<SynchronizationTask> syncTask = nullptr);
Expand Down
30 changes: 30 additions & 0 deletions curvefs/test/client/test_disk_cache_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <gtest/gtest.h>
#include <gmock/gmock.h>

#include "curvefs/src/client/common/common.h"
#include "curvefs/test/client/mock_disk_cache_write.h"
#include "curvefs/test/client/mock_disk_cache_read.h"
#include "curvefs/test/client/mock_disk_cache_manager.h"
Expand Down Expand Up @@ -138,6 +139,35 @@ TEST_F(TestDiskCacheManagerImpl, WriteClosure) {
sleep(5);
}

TEST_F(TestDiskCacheManagerImpl, WriteReadClosure) {
PutObjectAsyncCallBack cb =
[&](const std::shared_ptr<PutObjectAsyncContext> &context) {
};
auto context = std::make_shared<PutObjectAsyncContext>();
context->key = "objectName";
char data[5] = "gggg";
context->buffer = data + 0;
context->bufferSize = 2;
context->cb = cb;
context->startTime = butil::cpuwide_time_us();

S3ClientAdaptorOption s3AdaptorOption;
s3AdaptorOption.diskCacheOpt.threads = 5;
s3AdaptorOption.diskCacheOpt.diskCacheType = DiskCacheType::OnlyRead;
EXPECT_CALL(*diskCacheManager_, Init(_, _)).WillOnce(Return(0));
diskCacheManagerImpl_->Init(s3AdaptorOption);
std::string fileName = "test";
std::string buf = "test";

// If the mode is read cache, will call WriteReadDirect
EXPECT_CALL(*diskCacheManager_, IsDiskUsedInited()).WillOnce(Return(true));
EXPECT_CALL(*diskCacheManager_, IsDiskCacheFull()).WillOnce(Return(false));
EXPECT_CALL(*diskCacheManager_, WriteReadDirect(_, _, _))
.WillOnce(Return(context->bufferSize));
diskCacheManagerImpl_->Enqueue(context, true);
sleep(5);
}

TEST_F(TestDiskCacheManagerImpl, Write) {
std::string fileName = "test";
std::string buf = "test";
Expand Down
19 changes: 19 additions & 0 deletions tools-v2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ A tool for CurveFS & CurveBs.
- [list logical-pool](#list-logical-pool)
- [list server](#list-server)
- [list client](#list-client)
- [list dir](#list-dir)
- [query](#query-1)
- [query file](#query-file)
- [status](#status-1)
Expand Down Expand Up @@ -861,6 +862,24 @@ Output:
+------------+------+
```

##### list dir

list dir information in curvebs

```bash
curve bs list dir --dir /
```

Output:

```bash
+------+-------------+----------+-----------------+------------+---------------------+---------------+-------------+
| ID | FILENAME | PARENTID | FILETYPE | OWNER | CTIME | ALLOCATEDSIZE | FILESIZE |
+------+-------------+----------+-----------------+------------+---------------------+---------------+-------------+
| 1 | /RecycleBin | 0 | INODE_DIRECTORY | root | 2022-11-12 16:38:25 | 0 B | 0 B |
+------+-------------+----------+-----------------+------------+---------------------+---------------+-------------+
```

### query

##### query file
Expand Down
2 changes: 2 additions & 0 deletions tools-v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module github.com/opencurve/curve/tools-v2

go 1.19

replace github.com/optiopay/kafka => github.com/cilium/kafka v0.0.0-20180809090225-01ce283b732b

require (
github.com/cilium/cilium v1.12.1
github.com/deckarep/golang-set/v2 v2.1.0
Expand Down
6 changes: 5 additions & 1 deletion tools-v2/internal/error/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,10 @@ var (
return NewInternalCmdError(39, "list zone fail. the error is %s")
}

ErrBsDeleteFile = func() *CmdError {
return NewInternalCmdError(40, "delete file fail. the error is %s")
}

// http error
ErrHttpUnreadableResult = func() *CmdError {
return NewHttpResultCmdError(1, "http response is unreadable, the uri is: %s, the error is: %s")
Expand Down Expand Up @@ -433,7 +437,7 @@ var (
case mds.FSStatusCode_S3_INFO_ERROR:
message = "s3 info is not available"
case mds.FSStatusCode_FSNAME_INVALID:
message = "fsname should match regex: ^([a-z0-9]+\\-?)+$"
message = "fsname should match regex: ^([a-z0-9]+\\-?)+$"
default:
message = fmt.Sprintf("delete fs failed!, error is %s", mds.FSStatusCode_name[int32(code)])
}
Expand Down
5 changes: 5 additions & 0 deletions tools-v2/internal/utils/row.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package cobrautil
const (
ROW_ADDR = "addr"
ROW_ALLOC = "alloc"
ROW_ALLOC_SIZE = "allocatedSize"
ROW_BLOCKSIZE = "blocksize"
ROW_CAPACITY = "capacity"
ROW_CHILD_LIST = "childList"
Expand All @@ -40,6 +41,8 @@ const (
ROW_EXPLAIN = "explain"
ROW_EXTERNAL_ADDR = "externalAddr"
ROW_FILE_SIZE = "fileSize"
ROW_FILE_TYPE = "fileType"
ROW_FILE_NAME = "fileName"
ROW_FS_ID = "fsId"
ROW_FS_NAME = "fsName"
ROW_FS_TYPE = "fsType"
Expand All @@ -64,6 +67,7 @@ const (
ROW_ORIGINAL_PATH = "originalPath"
ROW_OWNER = "owner"
ROW_PARENT = "parent"
ROW_PARENT_ID = "parentId"
ROW_PARTITION_ID = "partitionId"
ROW_PEER_ADDR = "peerAddr"
ROW_PEER_ID = "peerId"
Expand Down Expand Up @@ -93,6 +97,7 @@ const (
ROW_ZONE = "zone"
ROW_IP = "ip"
ROW_PORT = "port"
ROW_REASON = "reason"

// s3
ROW_S3CHUNKINFO_CHUNKID = "s3ChunkId"
Expand Down
2 changes: 2 additions & 0 deletions tools-v2/pkg/cli/command/curvebs/bs.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package curvebs

import (
basecmd "github.com/opencurve/curve/tools-v2/pkg/cli/command"
"github.com/opencurve/curve/tools-v2/pkg/cli/command/curvebs/delete"
"github.com/opencurve/curve/tools-v2/pkg/cli/command/curvebs/list"
"github.com/opencurve/curve/tools-v2/pkg/cli/command/curvebs/query"
"github.com/opencurve/curve/tools-v2/pkg/cli/command/curvebs/status"
Expand All @@ -41,6 +42,7 @@ func (bsCmd *CurveBsCommand) AddSubCommands() {
list.NewListCommand(),
query.NewQueryCommand(),
status.NewStatusCommand(),
delete.NewDeleteCommand(),
)
}

Expand Down
Loading

0 comments on commit b39facd

Please sign in to comment.