Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Export Import sst files #5135

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ set(SOURCES
db/flush_job.cc
db/flush_scheduler.cc
db/forward_iterator.cc
db/import_column_family_job.cc
db/internal_stats.cc
db/logs_with_prep_tracker.cc
db/log_reader.cc
Expand Down
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ TESTS = \
plain_table_db_test \
comparator_db_test \
external_sst_file_test \
import_column_family_test \
prefix_test \
skiplist_test \
write_buffer_manager_test \
Expand Down Expand Up @@ -566,6 +567,7 @@ PARALLEL_TEST = \
db_universal_compaction_test \
db_wal_test \
external_sst_file_test \
import_column_family_test \
fault_injection_test \
inlineskiplist_test \
manual_compaction_test \
Expand Down Expand Up @@ -1246,6 +1248,9 @@ external_sst_file_basic_test: db/external_sst_file_basic_test.o db/db_test_util.
external_sst_file_test: db/external_sst_file_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

import_column_family_test: db/import_column_family_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

db_tailing_iter_test: db/db_tailing_iter_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

Expand Down
1 change: 1 addition & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ cpp_library(
"db/flush_job.cc",
"db/flush_scheduler.cc",
"db/forward_iterator.cc",
"db/import_column_family_job.cc",
"db/internal_stats.cc",
"db/log_reader.cc",
"db/log_writer.cc",
Expand Down
9 changes: 9 additions & 0 deletions db/compacted_db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ class CompactedDBImpl : public DBImpl {
const IngestExternalFileOptions& /*ingestion_options*/) override {
return Status::NotSupported("Not supported in compacted db mode.");
}
using DB::CreateColumnFamilyWithImport;
virtual Status CreateColumnFamilyWithImport(
const ColumnFamilyOptions& /*options*/,
const std::string& /*column_family_name*/,
const ImportColumnFamilyOptions& /*import_options*/,
const std::vector<LiveFileMetaData>& /*metadata*/,
ColumnFamilyHandle** /*handle*/) override {
return Status::NotSupported("Not supported in compacted db mode.");
}

private:
friend class DB;
Expand Down
114 changes: 114 additions & 0 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "db/external_sst_file_ingestion_job.h"
#include "db/import_column_family_job.h"
#include "db/flush_job.h"
#include "db/forward_iterator.h"
#include "db/job_context.h"
Expand Down Expand Up @@ -3201,6 +3202,119 @@ Status DBImpl::IngestExternalFile(
return status;
}

Status DBImpl::CreateColumnFamilyWithImport(
const ColumnFamilyOptions& options, const std::string& column_family_name,
const ImportColumnFamilyOptions& import_options,
const std::vector<LiveFileMetaData>& metadata,
ColumnFamilyHandle** handle) {
// Create column family.
auto status = CreateColumnFamily(options, column_family_name, handle);
if (!status.ok()) {
return status;
}

// Import sst files from metadata.
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(*handle);
auto cfd = cfh->cfd();
ImportColumnFamilyJob import_job(env_, versions_.get(), cfd,
immutable_db_options_, env_options_,
import_options, metadata);

SuperVersionContext dummy_sv_ctx(/* create_superversion */ true);
VersionEdit dummy_edit;
uint64_t next_file_number = 0;
std::list<uint64_t>::iterator pending_output_elem;
{
// Lock db mutex
InstrumentedMutexLock l(&mutex_);
if (error_handler_.IsDBStopped()) {
// Don't import files when there is a bg_error
status = error_handler_.GetBGError();
}

// Make sure that bg cleanup wont delete the files that we are importing
pending_output_elem = CaptureCurrentFileNumberInPendingOutputs();

if (status.ok()) {
// If crash happen after a hard link established, Recover function may
// reuse the file number that has already assigned to the internal file,
// and this will overwrite the external file. To protect the external
// file, we have to make sure the file number will never being reused.
next_file_number = versions_->FetchAddFileNumber(metadata.size());
auto cf_options = cfd->GetLatestMutableCFOptions();
status = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_,
directories_.GetDbDir());
if (status.ok()) {
InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options);
}
}
}
dummy_sv_ctx.Clean();

if (status.ok()) {
SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
status = import_job.Prepare(next_file_number, sv);
CleanupSuperVersion(sv);
}

if (status.ok()) {
SuperVersionContext sv_context(true /*create_superversion*/);
{
// Lock db mutex
InstrumentedMutexLock l(&mutex_);

// Stop writes to the DB by entering both write threads
WriteThread::Writer w;
write_thread_.EnterUnbatched(&w, &mutex_);
WriteThread::Writer nonmem_w;
if (two_write_queues_) {
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
}

num_running_ingest_file_++;
assert(!cfd->IsDropped());
status = import_job.Run();

// Install job edit [Mutex will be unlocked here]
if (status.ok()) {
auto cf_options = cfd->GetLatestMutableCFOptions();
status = versions_->LogAndApply(cfd, *cf_options, import_job.edit(),
&mutex_, directories_.GetDbDir());
if (status.ok()) {
InstallSuperVersionAndScheduleWork(
cfd, &sv_context, *cf_options,
FlushReason::kExternalFileIngestion);
}
}

// Resume writes to the DB
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
}
write_thread_.ExitUnbatched(&w);

num_running_ingest_file_--;
if (num_running_ingest_file_ == 0) {
bg_cv_.SignalAll();
}
}
// mutex_ is unlocked here

sv_context.Clean();
}

{
InstrumentedMutexLock l(&mutex_);
ReleaseFileNumberFromPendingOutputs(pending_output_elem);
}

import_job.Cleanup(status);
if (!status.ok()) {
DropColumnFamily(*handle);
}
return status;
}

Status DBImpl::VerifyChecksum() {
Status s;
std::vector<ColumnFamilyData*> cfd_list;
Expand Down
13 changes: 11 additions & 2 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "db/external_sst_file_ingestion_job.h"
#include "db/import_column_family_job.h"
#include "db/flush_job.h"
#include "db/flush_scheduler.h"
#include "db/internal_stats.h"
Expand Down Expand Up @@ -338,6 +339,14 @@ class DBImpl : public DB {
const std::vector<std::string>& external_files,
const IngestExternalFileOptions& ingestion_options) override;

using DB::CreateColumnFamilyWithImport;
virtual Status CreateColumnFamilyWithImport(
const ColumnFamilyOptions& options,
const std::string& column_family_name,
const ImportColumnFamilyOptions& import_options,
const std::vector<LiveFileMetaData>& metadata,
ColumnFamilyHandle** handle) override;

virtual Status VerifyChecksum() override;

using DB::StartTrace;
Expand Down Expand Up @@ -1477,8 +1486,8 @@ class DBImpl : public DB {
// Additonal options for compaction and flush
EnvOptions env_options_for_compaction_;

// Number of running IngestExternalFile() calls.
// REQUIRES: mutex held
// Number of running IngestExternalFile() or CreateColumnFamilyWithImport()
// calls. REQUIRES: mutex held
int num_running_ingest_file_;

#ifndef ROCKSDB_LITE
Expand Down
10 changes: 10 additions & 0 deletions db/db_impl_readonly.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,16 @@ class DBImplReadOnly : public DBImpl {
return Status::NotSupported("Not supported operation in read only mode.");
}

using DB::CreateColumnFamilyWithImport;
virtual Status CreateColumnFamilyWithImport(
const ColumnFamilyOptions& /*options*/,
const std::string& /*column_family_name*/,
const ImportColumnFamilyOptions& /*import_options*/,
const std::vector<LiveFileMetaData>& /*metadata*/,
ColumnFamilyHandle** /*handle*/) override {
return Status::NotSupported("Not supported operation in read only mode.");
}

private:
friend class DB;

Expand Down
10 changes: 10 additions & 0 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2466,6 +2466,16 @@ class ModelDB : public DB {
return Status::NotSupported("Not implemented.");
}

using DB::CreateColumnFamilyWithImport;
virtual Status CreateColumnFamilyWithImport(
const ColumnFamilyOptions& /*options*/,
const std::string& /*column_family_name*/,
const ImportColumnFamilyOptions& /*import_options*/,
const std::vector<LiveFileMetaData>& /*metadata*/,
ColumnFamilyHandle** /*handle*/) override {
return Status::NotSupported("Not implemented.");
}

virtual Status VerifyChecksum() override {
return Status::NotSupported("Not implemented.");
}
Expand Down
Loading