Skip to content

Commit

Permalink
chore: support load/save from GCS
Browse files Browse the repository at this point in the history
Not everything is supported but manual load save is supported.

1. Run dragonfly with `--dir gs://bucket/path`
2. In redis-cli:
   a. SET foo bar
   b. SAVE DF gsdump
   c. DFLY LOAD gs://bucket/path/gsdump-summary.dfs

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
  • Loading branch information
romange committed Oct 28, 2024
1 parent b9c1aae commit a03272e
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 16 deletions.
3 changes: 2 additions & 1 deletion src/facade/reply_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -297,13 +297,14 @@ void SinkReplyBuilder2::Send() {
uint64_t before_ns = util::fb2::ProactorBase::GetMonotonicTimeNs();
reply_stats.io_write_cnt++;
reply_stats.io_write_bytes += total_size_;

DVLOG(2) << "Writing " << total_size_ << " bytes";
if (auto ec = sink_->Write(vecs_.data(), vecs_.size()); ec)
ec_ = ec;

uint64_t after_ns = util::fb2::ProactorBase::GetMonotonicTimeNs();
reply_stats.send_stats.count++;
reply_stats.send_stats.total_duration += (after_ns - before_ns) / 1'000;
DVLOG(2) << "Finished writing " << total_size_ << " bytes";
send_active_ = false;
}

Expand Down
10 changes: 5 additions & 5 deletions src/server/detail/save_stages_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace fs = std::filesystem;
namespace {

bool IsCloudPath(string_view path) {
return absl::StartsWith(path, kS3Prefix);
return absl::StartsWith(path, kS3Prefix) || absl::StartsWith(path, kGCSPrefix);
}

// Create a directory and all its parents if they don't exist.
Expand Down Expand Up @@ -240,7 +240,7 @@ RdbSaver::SnapshotStats SaveStagesController::GetCurrentSnapshotProgress() const
// Summary file is always last in snapshots array.
void SaveStagesController::SaveDfs() {
// Extend all filenames with -{sid} or -summary and append .dfs.tmp
const string_view ext = is_cloud_ ? ".dfs" : ".dfs.tmp";
const string_view ext = snapshot_storage_->IsCloud() ? ".dfs" : ".dfs.tmp";
ShardId sid = 0;
for (auto& [_, filename] : snapshots_) {
filename = full_path_;
Expand Down Expand Up @@ -286,7 +286,7 @@ void SaveStagesController::SaveRdb() {
filename = full_path_;
if (!filename.has_extension())
filename += ".rdb";
if (!is_cloud_)
if (!snapshot_storage_->IsCloud())
filename += ".tmp";

if (auto err = snapshot->Start(SaveMode::RDB, filename, RdbSaver::GetGlobalData(service_)); err) {
Expand Down Expand Up @@ -342,7 +342,7 @@ void SaveStagesController::InitResources() {

// Remove .tmp extension or delete files in case of error
GenericError SaveStagesController::FinalizeFileMovement() {
if (is_cloud_)
if (snapshot_storage_->IsCloud())
return {};
DVLOG(1) << "FinalizeFileMovement start";

Expand Down Expand Up @@ -391,7 +391,7 @@ GenericError SaveStagesController::BuildFullPath() {
dest_buf.resize(len);

full_path_ = dir_path / dest_buf;
is_cloud_ = IsCloudPath(full_path_.string());

return {};
}

Expand Down
1 change: 0 additions & 1 deletion src/server/detail/save_stages_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ struct SaveStagesController : public SaveStagesInputs {
private:
time_t start_time_;
std::filesystem::path full_path_;
bool is_cloud_;

AggregateGenericError shared_err_;
std::vector<std::pair<std::unique_ptr<RdbSnapshot>, std::filesystem::path>> snapshots_;
Expand Down
7 changes: 2 additions & 5 deletions src/server/detail/snapshot_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,8 @@ constexpr string_view kSummarySuffix = "summary.dfs"sv;

pair<string, string> GetBucketPath(string_view path) {
string_view clean = path;
if (absl::StartsWith(clean, kS3Prefix)) {
clean = absl::StripPrefix(clean, kS3Prefix);
} else {
clean = absl::StripPrefix(clean, kGCSPrefix);
}
auto prefix = absl::StartsWith(clean, kS3Prefix) ? kS3Prefix : kGCSPrefix;
clean = absl::StripPrefix(clean, prefix);

size_t pos = clean.find('/');
if (pos == string_view::npos) {
Expand Down
12 changes: 12 additions & 0 deletions src/server/detail/snapshot_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ class SnapshotStorage {
// Searches for all the relevant snapshot files given the RDB file or DFS summary file path.
io::Result<std::vector<std::string>, GenericError> ExpandSnapshot(const std::string& load_path);

virtual bool IsCloud() const {
return false;
}

protected:
virtual io::Result<std::vector<std::string>, GenericError> ExpandFromPath(
const std::string& path) = 0;
Expand Down Expand Up @@ -94,6 +98,10 @@ class GcsSnapshotStorage : public SnapshotStorage {
io::Result<std::string, GenericError> LoadPath(std::string_view dir,
std::string_view dbfilename) override;

bool IsCloud() const final {
return true;
}

private:
io::Result<std::vector<std::string>, GenericError> ExpandFromPath(const std::string& path) final;

Expand All @@ -117,6 +125,10 @@ class AwsS3SnapshotStorage : public SnapshotStorage {
io::Result<std::string, GenericError> LoadPath(std::string_view dir,
std::string_view dbfilename) override;

bool IsCloud() const final {
return true;
}

private:
io::Result<std::vector<std::string>, GenericError> ExpandFromPath(const std::string& path) final;

Expand Down
23 changes: 19 additions & 4 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,6 @@ using strings::HumanReadableNumBytes;
namespace {

const auto kRedisVersion = "6.2.11";
constexpr string_view kS3Prefix = "s3://"sv;

using EngineFunc = void (ServerFamily::*)(CmdArgList args, ConnectionContext* cntx);

Expand All @@ -249,8 +248,12 @@ string UnknownCmd(string cmd, CmdArgList args) {
absl::StrJoin(args.begin(), args.end(), ", ", CmdArgListFormatter()));
}

bool IsCloudPath(string_view path) {
return absl::StartsWith(path, kS3Prefix);
bool IsS3Path(string_view path) {
return absl::StartsWith(path, detail::kS3Prefix);
}

bool IsGCSPath(string_view path) {
return absl::StartsWith(path, detail::kGCSPrefix);
}

// Check that if TLS is used at least one form of client authentication is
Expand Down Expand Up @@ -859,7 +862,7 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector<facade::Listen
}

string flag_dir = GetFlag(FLAGS_dir);
if (IsCloudPath(flag_dir)) {
if (IsS3Path(flag_dir)) {
#ifdef WITH_AWS
shard_set->pool()->GetNextProactor()->Await([&] { util::aws::Init(); });
snapshot_storage_ = std::make_shared<detail::AwsS3SnapshotStorage>(
Expand All @@ -868,6 +871,14 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector<facade::Listen
#else
LOG(ERROR) << "Compiled without AWS support";
#endif
} else if (IsGCSPath(flag_dir)) {
auto gcs = std::make_shared<detail::GcsSnapshotStorage>();
auto ec = shard_set->pool()->GetNextProactor()->Await([&] { return gcs->Init(3000); });
if (ec) {
LOG(ERROR) << "Failed to initialize GCS snapshot storage: " << ec.message();
exit(1);
}
snapshot_storage_ = std::move(gcs);
} else if (fq_threadpool_) {
snapshot_storage_ = std::make_shared<detail::FileSnapshotStorage>(fq_threadpool_.get());
} else {
Expand Down Expand Up @@ -1167,6 +1178,8 @@ void ServerFamily::SnapshotScheduling() {

io::Result<size_t> ServerFamily::LoadRdb(const std::string& rdb_file,
LoadExistingKeys existing_keys) {
VLOG(1) << "Loading data from " << rdb_file;

error_code ec;
io::ReadonlyFileOrError res = snapshot_storage_->OpenReadFile(rdb_file);
if (res) {
Expand Down Expand Up @@ -1628,6 +1641,8 @@ GenericError ServerFamily::DoSaveCheckAndStart(bool new_version, string_view bas
"SAVING - can not save database"};
}

VLOG(1) << "Saving snapshot to " << basename;

save_controller_ = make_unique<SaveStagesController>(detail::SaveStagesInputs{
new_version, basename, trans, &service_, fq_threadpool_.get(), snapshot_storage_});

Expand Down

0 comments on commit a03272e

Please sign in to comment.