Skip to content

Commit

Permalink
(selectdb-cloud) Reduce network RTT for files that multipart are not …
Browse files Browse the repository at this point in the history
…applicable (apache#1772)
  • Loading branch information
ByteYue authored Jun 11, 2023
1 parent 5bf2e91 commit 205efc2
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 17 deletions.
7 changes: 7 additions & 0 deletions be/src/cloud/io/s3_file_bufferpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,13 @@ struct UploadFileBuffer final : public FileBuffer {
*/
std::shared_ptr<std::iostream> get_stream() const { return _stream_ptr; }

/**
 * Install the callback stored in _upload_to_remote, replacing any
 * previously stored one.
 *
 * @param cb callable that receives this buffer type by reference.
 */
void set_upload_to_remote(std::function<void(UploadFileBuffer&)> cb) {
    // `cb` is a by-value sink: after the swap it holds the previous callback,
    // which is released when `cb` goes out of scope.
    _upload_to_remote.swap(cb);
}

private:
std::function<void(UploadFileBuffer&)> _upload_to_remote = nullptr;
std::shared_ptr<std::iostream> _stream_ptr; // point to _buffer.get_data()
Expand Down
70 changes: 57 additions & 13 deletions be/src/cloud/io/s3_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <aws/s3/model/CompleteMultipartUploadRequest.h>
#include <aws/s3/model/CompletedPart.h>
#include <aws/s3/model/CreateMultipartUploadRequest.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/s3/model/UploadPartRequest.h>

#include <atomic>
Expand All @@ -34,6 +35,7 @@
#include "cloud/io/cloud_file_cache.h"
#include "cloud/io/cloud_file_cache_factory.h"
#include "cloud/io/cloud_file_segment.h"
#include "cloud/io/s3_file_bufferpool.h"
#include "cloud/io/s3_file_system.h"
#include "common/logging.h"
#include "common/status.h"
Expand Down Expand Up @@ -83,14 +85,22 @@ void S3FileWriter::_wait_until_finish(std::string task_name) {
}

/**
 * Mark the writer as opened and, when the file cache is enabled for this
 * writer, resolve the cache key and cache instance for the target path.
 *
 * No request is sent to the remote store from this function.
 *
 * @return always Status::OK().
 */
Status S3FileWriter::open() {
    DCHECK(!_opened) << "closed " << _closed << " opened " << _opened;
    VLOG_DEBUG << "S3FileWriter::open, path: " << _path.native();

    // The cache is used only when it is globally enabled and not disabled
    // for this particular writer.
    const bool use_file_cache = config::enable_file_cache && !_disable_file_cache;
    if (use_file_cache) {
        _cache_key = CloudFileCache::hash(_path.filename().native());
        _cache = FileCacheFactory::instance().get_by_path(_cache_key);
    }

    _opened = true;
    _closed = false;
    return Status::OK();
}

Status S3FileWriter::_open() {
CreateMultipartUploadRequest create_request;
create_request.WithBucket(_bucket).WithKey(_key);
create_request.SetContentType("text/plain");
create_request.SetContentType("application/octet-stream");
if (_sse_enabled) {
create_request.WithServerSideEncryption(Aws::S3::Model::ServerSideEncryption::AES256);
}
Expand All @@ -99,8 +109,6 @@ Status S3FileWriter::open() {

if (outcome.IsSuccess()) {
_upload_id = outcome.GetResult().GetUploadId();
_closed = false;
_opened = true;
return Status::OK();
}
return Status::IOError("failed to create multipart upload(bucket={}, key={}, upload_id={}): {}",
Expand All @@ -109,15 +117,20 @@ Status S3FileWriter::open() {

Status S3FileWriter::abort() {
_failed = true;
_closed = true;
if (_closed || !_opened) {
return Status::OK();
}
if (_pending_buf != nullptr) {
// we need to reclaim the memory
if (_pending_buf) {
_pending_buf->on_finish();
_pending_buf = nullptr;
}
// an empty upload id means no multipart upload was ever created
if (_upload_id.empty()) {
return Status::OK();
}
VLOG_DEBUG << "S3FileWriter::abort, path: " << _path.native();
_closed = true;
_wait_until_finish("early quit");
AbortMultipartUploadRequest request;
request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);
Expand All @@ -135,11 +148,11 @@ Status S3FileWriter::abort() {
}

Status S3FileWriter::close(bool /*sync*/) {
if (_closed) {
_closed = true;
if (_closed || !_opened) {
return Status::OK();
}
VLOG_DEBUG << "S3FileWriter::close, path: " << _path.native();
_closed = true;
if (_pending_buf != nullptr) {
_wait.add();
_pending_buf->submit();
Expand All @@ -151,12 +164,12 @@ Status S3FileWriter::close(bool /*sync*/) {
}

/**
 * Append a single slice to the writer.
 *
 * Thin wrapper that forwards to appendv() with a one-element array.
 *
 * @param data the slice to append.
 * @return the status returned by appendv().
 */
Status S3FileWriter::append(const Slice& data) {
    // The writer must have been opened and must not be closed yet. This single
    // check covers the former stand-alone `DCHECK(!_closed)` and reports both flags.
    DCHECK(!_closed && _opened) << "closed " << _closed << " opened " << _opened;
    return appendv(&data, 1);
}

Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
DCHECK(!_closed);
DCHECK(!_closed && _opened) << "closed " << _closed << " opened " << _opened;
size_t buffer_size = config::s3_write_buffer_size;
for (size_t i = 0; i < data_cnt; i++) {
size_t data_size = data[i].get_size();
Expand Down Expand Up @@ -202,6 +215,9 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
// satisfy that the size is larger than or equal to 5MB
// _complete() would handle the first situation
if (_pending_buf->get_size() == buffer_size) {
if (1 == _cur_part_num) [[unlikely]] {
RETURN_IF_ERROR(_open());
}
_cur_part_num++;
_wait.add();
_pending_buf->submit();
Expand Down Expand Up @@ -245,14 +261,15 @@ void S3FileWriter::_upload_one_part(int64_t part_num, UploadFileBuffer& buf) {

s3_bytes_written << buf.get_size();

std::shared_ptr<CompletedPart> completed_part = std::make_shared<CompletedPart>();
std::unique_ptr<CompletedPart> completed_part = std::make_unique<CompletedPart>();

completed_part->SetPartNumber(part_num);
auto etag = upload_part_outcome.GetResult().GetETag();
// DCHECK(etag.empty());
completed_part->SetETag(etag);

std::unique_lock<std::mutex> lck {_completed_lock};
_completed_parts.emplace_back(completed_part);
_completed_parts.emplace_back(std::move(completed_part));
_bytes_written += buf.get_size();
}

Expand All @@ -265,11 +282,32 @@ FileSegmentsHolderPtr S3FileWriter::_allocate_file_segments(size_t offset) {
return std::make_unique<FileSegmentsHolder>(std::move(holder));
}

/**
 * Upload the content of `buf` as one complete object with a single PutObject
 * request, instead of going through a multipart upload.
 *
 * The request uses the same content type and server-side-encryption setting
 * as the multipart path in _open(), so the stored object does not differ
 * depending on which upload path was taken.
 *
 * On failure the error is stored in _st and forwarded to the buffer through
 * set_val(). On success the written-bytes counters are updated in the same
 * way as _upload_one_part() does.
 *
 * @param buf buffer holding the whole file content.
 */
void S3FileWriter::_put_object(UploadFileBuffer& buf) {
    DCHECK(!_closed && _opened) << "closed " << _closed << " opened " << _opened;
    Aws::S3::Model::PutObjectRequest request;
    request.WithBucket(_bucket).WithKey(_key);
    // Keep object attributes consistent with the multipart upload path.
    request.SetContentType("application/octet-stream");
    if (_sse_enabled) {
        request.WithServerSideEncryption(Aws::S3::Model::ServerSideEncryption::AES256);
    }
    request.SetBody(buf.get_stream());
    request.SetContentLength(buf.get_size());

    auto response = _client->PutObject(request);
    if (!response.IsSuccess()) {
        _st = Status::InternalError("Error: [{}:{}, responseCode:{}]",
                                    response.GetError().GetExceptionName(),
                                    response.GetError().GetMessage(),
                                    static_cast<int>(response.GetError().GetResponseCode()));
        buf.set_val(_st);
        return;
    }

    // Account for the uploaded bytes exactly like the multipart path does.
    s3_bytes_written << buf.get_size();
    std::unique_lock<std::mutex> lck {_completed_lock};
    _bytes_written += buf.get_size();
}

Status S3FileWriter::_complete() {
if (_failed) {
_wait_until_finish("early quit");
return _st;
}
// upload id is empty means there was no multipart upload
if (_upload_id.empty()) {
_wait.wait();
return _st;
}
CompleteMultipartUploadRequest complete_request;
complete_request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);

Expand All @@ -278,7 +316,7 @@ Status S3FileWriter::_complete() {
std::sort(_completed_parts.begin(), _completed_parts.end(),
[](auto& p1, auto& p2) { return p1->GetPartNumber() < p2->GetPartNumber(); });
CompletedMultipartUpload completed_upload;
for (std::shared_ptr<CompletedPart> part : _completed_parts) {
for (auto& part : _completed_parts) {
completed_upload.AddParts(*part);
}

Expand All @@ -302,10 +340,16 @@ Status S3FileWriter::write_at(size_t /*offset*/, const Slice& /*data*/) {
}

Status S3FileWriter::finalize() {
DCHECK(!_closed);
DCHECK(!_closed && _opened) << "closed " << _closed << " opened " << _opened;
// submit pending buf if it's not nullptr
// it's the last buf, we can submit it right now
if (_pending_buf != nullptr) {
// the whole file size is less than one buffer, we can just call PutObject to reduce network RTT
if (1 == _cur_part_num) {
auto buf = std::static_pointer_cast<UploadFileBuffer>(_pending_buf);
DCHECK(buf != nullptr);
buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); });
}
_wait.add();
_pending_buf->submit();
_pending_buf = nullptr;
Expand Down
7 changes: 3 additions & 4 deletions be/src/cloud/io/s3_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ class S3FileWriter final : public FileWriter {

private:
Status _complete();
// void _upload_to_cache(const Slice& data, S3FileBuffer& buf);
Status _open();
void _upload_one_part(int64_t part_num, UploadFileBuffer& buf);
void _put_object(UploadFileBuffer& buf);

FileSegmentsHolderPtr _allocate_file_segments(size_t offset);

Expand All @@ -101,13 +102,11 @@ class S3FileWriter final : public FileWriter {
std::string _upload_id;
size_t _bytes_appended {0};
size_t _index_offset {0};
// size_t _index_offset {0};
// bool _set_index_idx {false};

// Current Part Num for CompletedPart
int _cur_part_num = 1;
std::mutex _completed_lock;
std::vector<std::shared_ptr<Aws::S3::Model::CompletedPart>> _completed_parts;
std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>> _completed_parts;

Key _cache_key;
CloudFileCachePtr _cache;
Expand Down

0 comments on commit 205efc2

Please sign in to comment.