Skip to content

Commit

Permalink
Export Import sst files (facebook#5495)
Browse files Browse the repository at this point in the history
Summary:
Refresh of the earlier change here - facebook#5135

This is a review request for code change needed for - facebook#3469
"Add support for taking snapshot of a column family and creating column family from a given CF snapshot"

We have an implementation for this that we have been testing internally. We have two new APIs that together provide this functionality.

(1) ExportColumnFamily() - This API is modelled after CreateCheckpoint() as below.
// Exports all live SST files of a specified Column Family onto export_dir,
// returning SST files information in metadata.
// - SST files will be created as hard links when the directory specified
//   is in the same partition as the db directory, copied otherwise.
// - export_dir should not already exist and will be created by this API.
// - Always triggers a flush.
virtual Status ExportColumnFamily(ColumnFamilyHandle* handle,
                                  const std::string& export_dir,
                                  ExportImportFilesMetaData** metadata);

Internally, the API will DisableFileDeletions(), GetColumnFamilyMetaData(), Parse through
metadata, creating links/copies of all the sst files, EnableFileDeletions() and complete the call by
returning the list of file metadata.

(2) CreateColumnFamilyWithImport() - This API is modeled after IngestExternalFile(), but invoked only during a CF creation as below.
// CreateColumnFamilyWithImport() will create a new column family with
// column_family_name and import external SST files specified in metadata into
// this column family.
// (1) External SST files can be created using SstFileWriter.
// (2) External SST files can be exported from a particular column family in
//     an existing DB.
// Option in import_options specifies whether the external files are copied or
// moved (default is copy). When option specifies copy, managing files at
// external_file_path is caller's responsibility. When option specifies a
// move, the call ensures that the specified files at external_file_path are
// deleted on successful return and files are not modified on any error
// return.
// On error return, column family handle returned will be nullptr.
// ColumnFamily will be present on successful return and will not be present
// on error return. ColumnFamily may be present on any crash during this call.
virtual Status CreateColumnFamilyWithImport(
    const ColumnFamilyOptions& options, const std::string& column_family_name,
    const ImportColumnFamilyOptions& import_options,
    const ExportImportFilesMetaData& metadata,
    ColumnFamilyHandle** handle);

Internally, this API creates a new CF, parses all the sst files and adds it to the specified column family, at the same level and with same sequence number as in the metadata. Also performs safety checks with respect to overlaps between the sst files being imported.

If incoming sequence number is higher than current local sequence number, local sequence
number is updated to reflect this.

Note, as the sst files is are being moved across Column Families, Column Family name in sst file
will no longer match the actual column family on destination DB. The API does not modify Column
Family name or id in the sst files being imported.
Pull Request resolved: facebook#5495

Differential Revision: D16018881

fbshipit-source-id: 9ae2251025d5916d35a9fc4ea4d6707f6be16ff9
  • Loading branch information
Venki Pallipadi authored and facebook-github-bot committed Jul 17, 2019
1 parent 356eb88 commit 52632f1
Show file tree
Hide file tree
Showing 20 changed files with 1,453 additions and 1 deletion.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,7 @@ set(SOURCES
db/flush_job.cc
db/flush_scheduler.cc
db/forward_iterator.cc
db/import_column_family_job.cc
db/internal_stats.cc
db/logs_with_prep_tracker.cc
db/log_reader.cc
Expand Down
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ TESTS = \
plain_table_db_test \
comparator_db_test \
external_sst_file_test \
import_column_family_test \
prefix_test \
skiplist_test \
write_buffer_manager_test \
Expand Down Expand Up @@ -577,6 +578,7 @@ PARALLEL_TEST = \
db_universal_compaction_test \
db_wal_test \
external_sst_file_test \
import_column_family_test \
fault_injection_test \
inlineskiplist_test \
manual_compaction_test \
Expand Down Expand Up @@ -1274,6 +1276,9 @@ external_sst_file_basic_test: db/external_sst_file_basic_test.o db/db_test_util.
external_sst_file_test: db/external_sst_file_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

import_column_family_test: db/import_column_family_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

db_tailing_iter_test: db/db_tailing_iter_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

Expand Down
1 change: 1 addition & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ cpp_library(
"db/flush_job.cc",
"db/flush_scheduler.cc",
"db/forward_iterator.cc",
"db/import_column_family_job.cc",
"db/internal_stats.cc",
"db/log_reader.cc",
"db/log_writer.cc",
Expand Down
9 changes: 9 additions & 0 deletions db/compacted_db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ class CompactedDBImpl : public DBImpl {
const IngestExternalFileOptions& /*ingestion_options*/) override {
return Status::NotSupported("Not supported in compacted db mode.");
}
using DB::CreateColumnFamilyWithImport;
virtual Status CreateColumnFamilyWithImport(
const ColumnFamilyOptions& /*options*/,
const std::string& /*column_family_name*/,
const ImportColumnFamilyOptions& /*import_options*/,
const ExportImportFilesMetaData& /*metadata*/,
ColumnFamilyHandle** /*handle*/) override {
return Status::NotSupported("Not supported in compacted db mode.");
}

private:
friend class DB;
Expand Down
122 changes: 122 additions & 0 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "db/external_sst_file_ingestion_job.h"
#include "db/import_column_family_job.h"
#include "db/flush_job.h"
#include "db/forward_iterator.h"
#include "db/job_context.h"
Expand Down Expand Up @@ -3894,6 +3895,127 @@ Status DBImpl::IngestExternalFiles(
return status;
}

Status DBImpl::CreateColumnFamilyWithImport(
const ColumnFamilyOptions& options, const std::string& column_family_name,
const ImportColumnFamilyOptions& import_options,
const ExportImportFilesMetaData& metadata,
ColumnFamilyHandle** handle) {
assert(handle != nullptr);
assert(*handle == nullptr);
std::string cf_comparator_name = options.comparator->Name();
if (cf_comparator_name != metadata.db_comparator_name) {
return Status::InvalidArgument("Comparator name mismatch");
}

// Create column family.
auto status = CreateColumnFamily(options, column_family_name, handle);
if (!status.ok()) {
return status;
}

// Import sst files from metadata.
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(*handle);
auto cfd = cfh->cfd();
ImportColumnFamilyJob import_job(env_, versions_.get(), cfd,
immutable_db_options_, env_options_,
import_options, metadata.files);

SuperVersionContext dummy_sv_ctx(/* create_superversion */ true);
VersionEdit dummy_edit;
uint64_t next_file_number = 0;
std::list<uint64_t>::iterator pending_output_elem;
{
// Lock db mutex
InstrumentedMutexLock l(&mutex_);
if (error_handler_.IsDBStopped()) {
// Don't import files when there is a bg_error
status = error_handler_.GetBGError();
}

// Make sure that bg cleanup wont delete the files that we are importing
pending_output_elem = CaptureCurrentFileNumberInPendingOutputs();

if (status.ok()) {
// If crash happen after a hard link established, Recover function may
// reuse the file number that has already assigned to the internal file,
// and this will overwrite the external file. To protect the external
// file, we have to make sure the file number will never being reused.
next_file_number =
versions_->FetchAddFileNumber(metadata.files.size());
auto cf_options = cfd->GetLatestMutableCFOptions();
status = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_,
directories_.GetDbDir());
if (status.ok()) {
InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options);
}
}
}
dummy_sv_ctx.Clean();

if (status.ok()) {
SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
status = import_job.Prepare(next_file_number, sv);
CleanupSuperVersion(sv);
}

if (status.ok()) {
SuperVersionContext sv_context(true /*create_superversion*/);
{
// Lock db mutex
InstrumentedMutexLock l(&mutex_);

// Stop writes to the DB by entering both write threads
WriteThread::Writer w;
write_thread_.EnterUnbatched(&w, &mutex_);
WriteThread::Writer nonmem_w;
if (two_write_queues_) {
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
}

num_running_ingest_file_++;
assert(!cfd->IsDropped());
status = import_job.Run();

// Install job edit [Mutex will be unlocked here]
if (status.ok()) {
auto cf_options = cfd->GetLatestMutableCFOptions();
status = versions_->LogAndApply(cfd, *cf_options, import_job.edit(),
&mutex_, directories_.GetDbDir());
if (status.ok()) {
InstallSuperVersionAndScheduleWork(cfd, &sv_context, *cf_options);
}
}

// Resume writes to the DB
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
}
write_thread_.ExitUnbatched(&w);

num_running_ingest_file_--;
if (num_running_ingest_file_ == 0) {
bg_cv_.SignalAll();
}
}
// mutex_ is unlocked here

sv_context.Clean();
}

{
InstrumentedMutexLock l(&mutex_);
ReleaseFileNumberFromPendingOutputs(pending_output_elem);
}

import_job.Cleanup(status);
if (!status.ok()) {
DropColumnFamily(*handle);
DestroyColumnFamilyHandle(*handle);
*handle = nullptr;
}
return status;
}

Status DBImpl::VerifyChecksum() {
Status s;
std::vector<ColumnFamilyData*> cfd_list;
Expand Down
11 changes: 10 additions & 1 deletion db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "db/external_sst_file_ingestion_job.h"
#include "db/flush_job.h"
#include "db/flush_scheduler.h"
#include "db/import_column_family_job.h"
#include "db/internal_stats.h"
#include "db/log_writer.h"
#include "db/logs_with_prep_tracker.h"
Expand Down Expand Up @@ -356,6 +357,13 @@ class DBImpl : public DB {
virtual Status IngestExternalFiles(
const std::vector<IngestExternalFileArg>& args) override;

using DB::CreateColumnFamilyWithImport;
virtual Status CreateColumnFamilyWithImport(
const ColumnFamilyOptions& options, const std::string& column_family_name,
const ImportColumnFamilyOptions& import_options,
const ExportImportFilesMetaData& metadata,
ColumnFamilyHandle** handle) override;

virtual Status VerifyChecksum() override;

using DB::StartTrace;
Expand Down Expand Up @@ -1803,7 +1811,8 @@ class DBImpl : public DB {

std::string db_absolute_path_;

// Number of running IngestExternalFile() calls.
// Number of running IngestExternalFile() or CreateColumnFamilyWithImport()
// calls.
// REQUIRES: mutex held
int num_running_ingest_file_;

Expand Down
10 changes: 10 additions & 0 deletions db/db_impl/db_impl_readonly.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,16 @@ class DBImplReadOnly : public DBImpl {
return Status::NotSupported("Not supported operation in read only mode.");
}

using DB::CreateColumnFamilyWithImport;
virtual Status CreateColumnFamilyWithImport(
const ColumnFamilyOptions& /*options*/,
const std::string& /*column_family_name*/,
const ImportColumnFamilyOptions& /*import_options*/,
const ExportImportFilesMetaData& /*metadata*/,
ColumnFamilyHandle** /*handle*/) override {
return Status::NotSupported("Not supported operation in read only mode.");
}

private:
friend class DB;

Expand Down
10 changes: 10 additions & 0 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2492,6 +2492,16 @@ class ModelDB : public DB {
return Status::NotSupported("Not implemented");
}

using DB::CreateColumnFamilyWithImport;
virtual Status CreateColumnFamilyWithImport(
const ColumnFamilyOptions& /*options*/,
const std::string& /*column_family_name*/,
const ImportColumnFamilyOptions& /*import_options*/,
const ExportImportFilesMetaData& /*metadata*/,
ColumnFamilyHandle** /*handle*/) override {
return Status::NotSupported("Not implemented.");
}

Status VerifyChecksum() override {
return Status::NotSupported("Not implemented.");
}
Expand Down
Loading

0 comments on commit 52632f1

Please sign in to comment.