diff --git a/CMakeLists.txt b/CMakeLists.txt index 98e2e19730b..0c4a68abbe0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -494,6 +494,7 @@ set(SOURCES db/flush_job.cc db/flush_scheduler.cc db/forward_iterator.cc + db/import_column_family_job.cc db/internal_stats.cc db/logs_with_prep_tracker.cc db/log_reader.cc diff --git a/Makefile b/Makefile index 09e2cd3ea2f..d96dac3343a 100644 --- a/Makefile +++ b/Makefile @@ -487,6 +487,7 @@ TESTS = \ plain_table_db_test \ comparator_db_test \ external_sst_file_test \ + import_column_family_test \ prefix_test \ skiplist_test \ write_buffer_manager_test \ @@ -566,6 +567,7 @@ PARALLEL_TEST = \ db_universal_compaction_test \ db_wal_test \ external_sst_file_test \ + import_column_family_test \ fault_injection_test \ inlineskiplist_test \ manual_compaction_test \ @@ -1246,6 +1248,9 @@ external_sst_file_basic_test: db/external_sst_file_basic_test.o db/db_test_util. external_sst_file_test: db/external_sst_file_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +import_column_family_test: db/import_column_family_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + db_tailing_iter_test: db/db_tailing_iter_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 43f8bd5b216..5e63fcf47a2 100644 --- a/TARGETS +++ b/TARGETS @@ -112,6 +112,7 @@ cpp_library( "db/flush_job.cc", "db/flush_scheduler.cc", "db/forward_iterator.cc", + "db/import_column_family_job.cc", "db/internal_stats.cc", "db/log_reader.cc", "db/log_writer.cc", diff --git a/db/compacted_db_impl.h b/db/compacted_db_impl.h index 5c574b4b9a5..ba7b1984d2b 100644 --- a/db/compacted_db_impl.h +++ b/db/compacted_db_impl.h @@ -85,6 +85,15 @@ class CompactedDBImpl : public DBImpl { const IngestExternalFileOptions& /*ingestion_options*/) override { return Status::NotSupported("Not supported in compacted db mode."); } + using DB::CreateColumnFamilyWithImport; + virtual Status CreateColumnFamilyWithImport( + const ColumnFamilyOptions& /*options*/, + const std::string& /*column_family_name*/, + const ImportColumnFamilyOptions& /*import_options*/, + const std::vector& /*metadata*/, + ColumnFamilyHandle** /*handle*/) override { + return Status::NotSupported("Not supported in compacted db mode."); + } private: friend class DB; diff --git a/db/db_impl.cc b/db/db_impl.cc index 3f6e4467600..b008c21c9dc 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -35,6 +35,7 @@ #include "db/error_handler.h" #include "db/event_helpers.h" #include "db/external_sst_file_ingestion_job.h" +#include "db/import_column_family_job.h" #include "db/flush_job.h" #include "db/forward_iterator.h" #include "db/job_context.h" @@ -3201,6 +3202,119 @@ Status DBImpl::IngestExternalFile( return status; } +Status DBImpl::CreateColumnFamilyWithImport( + const ColumnFamilyOptions& options, const std::string& column_family_name, + const ImportColumnFamilyOptions& import_options, + const std::vector& metadata, + ColumnFamilyHandle** handle) { + // Create column family. + auto status = CreateColumnFamily(options, column_family_name, handle); + if (!status.ok()) { + return status; + } + + // Import sst files from metadata. + auto cfh = reinterpret_cast(*handle); + auto cfd = cfh->cfd(); + ImportColumnFamilyJob import_job(env_, versions_.get(), cfd, + immutable_db_options_, env_options_, + import_options, metadata); + + SuperVersionContext dummy_sv_ctx(/* create_superversion */ true); + VersionEdit dummy_edit; + uint64_t next_file_number = 0; + std::list::iterator pending_output_elem; + { + // Lock db mutex + InstrumentedMutexLock l(&mutex_); + if (error_handler_.IsDBStopped()) { + // Don't import files when there is a bg_error + status = error_handler_.GetBGError(); + } + + // Make sure that bg cleanup wont delete the files that we are importing + pending_output_elem = CaptureCurrentFileNumberInPendingOutputs(); + + if (status.ok()) { + // If crash happen after a hard link established, Recover function may + // reuse the file number that has already assigned to the internal file, + // and this will overwrite the external file. To protect the external + // file, we have to make sure the file number will never being reused. + next_file_number = versions_->FetchAddFileNumber(metadata.size()); + auto cf_options = cfd->GetLatestMutableCFOptions(); + status = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_, + directories_.GetDbDir()); + if (status.ok()) { + InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options); + } + } + } + dummy_sv_ctx.Clean(); + + if (status.ok()) { + SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); + status = import_job.Prepare(next_file_number, sv); + CleanupSuperVersion(sv); + } + + if (status.ok()) { + SuperVersionContext sv_context(true /*create_superversion*/); + { + // Lock db mutex + InstrumentedMutexLock l(&mutex_); + + // Stop writes to the DB by entering both write threads + WriteThread::Writer w; + write_thread_.EnterUnbatched(&w, &mutex_); + WriteThread::Writer nonmem_w; + if (two_write_queues_) { + nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); + } + + num_running_ingest_file_++; + assert(!cfd->IsDropped()); + status = import_job.Run(); + + // Install job edit [Mutex will be unlocked here] + if (status.ok()) { + auto cf_options = cfd->GetLatestMutableCFOptions(); + status = versions_->LogAndApply(cfd, *cf_options, import_job.edit(), + &mutex_, directories_.GetDbDir()); + if (status.ok()) { + InstallSuperVersionAndScheduleWork( + cfd, &sv_context, *cf_options, + FlushReason::kExternalFileIngestion); + } + } + + // Resume writes to the DB + if (two_write_queues_) { + nonmem_write_thread_.ExitUnbatched(&nonmem_w); + } + write_thread_.ExitUnbatched(&w); + + num_running_ingest_file_--; + if (num_running_ingest_file_ == 0) { + bg_cv_.SignalAll(); + } + } + // mutex_ is unlocked here + + sv_context.Clean(); + } + + { + InstrumentedMutexLock l(&mutex_); + ReleaseFileNumberFromPendingOutputs(pending_output_elem); + } + + import_job.Cleanup(status); + if (!status.ok()) { + DropColumnFamily(*handle); + } + return status; +} + Status DBImpl::VerifyChecksum() { Status s; std::vector cfd_list; diff --git a/db/db_impl.h b/db/db_impl.h index 58fcacc67e2..60217a5b7e3 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -25,6 +25,7 @@ #include "db/error_handler.h" #include "db/event_helpers.h" #include "db/external_sst_file_ingestion_job.h" +#include "db/import_column_family_job.h" #include "db/flush_job.h" #include "db/flush_scheduler.h" #include "db/internal_stats.h" @@ -338,6 +339,14 @@ class DBImpl : public DB { const std::vector& external_files, const IngestExternalFileOptions& ingestion_options) override; + using DB::CreateColumnFamilyWithImport; + virtual Status CreateColumnFamilyWithImport( + const ColumnFamilyOptions& options, + const std::string& column_family_name, + const ImportColumnFamilyOptions& import_options, + const std::vector& metadata, + ColumnFamilyHandle** handle) override; + virtual Status VerifyChecksum() override; using DB::StartTrace; @@ -1477,8 +1486,8 @@ class DBImpl : public DB { // Additonal options for compaction and flush EnvOptions env_options_for_compaction_; - // Number of running IngestExternalFile() calls. - // REQUIRES: mutex held + // Number of running IngestExternalFile() or CreateColumnFamilyWithImport() + // calls. REQUIRES: mutex held int num_running_ingest_file_; #ifndef ROCKSDB_LITE diff --git a/db/db_impl_readonly.h b/db/db_impl_readonly.h index 2d77dbac0e7..0d19b7f07e2 100644 --- a/db/db_impl_readonly.h +++ b/db/db_impl_readonly.h @@ -115,6 +115,16 @@ class DBImplReadOnly : public DBImpl { return Status::NotSupported("Not supported operation in read only mode."); } + using DB::CreateColumnFamilyWithImport; + virtual Status CreateColumnFamilyWithImport( + const ColumnFamilyOptions& /*options*/, + const std::string& /*column_family_name*/, + const ImportColumnFamilyOptions& /*import_options*/, + const std::vector& /*metadata*/, + ColumnFamilyHandle** /*handle*/) override { + return Status::NotSupported("Not supported operation in read only mode."); + } + private: friend class DB; diff --git a/db/db_test.cc b/db/db_test.cc index 5a9c394692b..df29f565a06 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2466,6 +2466,16 @@ class ModelDB : public DB { return Status::NotSupported("Not implemented."); } + using DB::CreateColumnFamilyWithImport; + virtual Status CreateColumnFamilyWithImport( + const ColumnFamilyOptions& /*options*/, + const std::string& /*column_family_name*/, + const ImportColumnFamilyOptions& /*import_options*/, + const std::vector& /*metadata*/, + ColumnFamilyHandle** /*handle*/) override { + return Status::NotSupported("Not implemented."); + } + virtual Status VerifyChecksum() override { return Status::NotSupported("Not implemented."); } diff --git a/db/import_column_family_job.cc b/db/import_column_family_job.cc new file mode 100644 index 00000000000..e467ffe2430 --- /dev/null +++ b/db/import_column_family_job.cc @@ -0,0 +1,260 @@ +#ifndef ROCKSDB_LITE + +#include "db/import_column_family_job.h" + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include +#include +#include +#include + +#include "db/version_edit.h" +#include "table/merging_iterator.h" +#include "table/scoped_arena_iterator.h" +#include "table/sst_file_writer_collectors.h" +#include "table/table_builder.h" +#include "util/file_reader_writer.h" +#include "util/file_util.h" +#include "util/stop_watch.h" + +namespace rocksdb { + +Status ImportColumnFamilyJob::Prepare(uint64_t next_file_number, + SuperVersion* sv) { + Status status; + + // Read the information of files we are importing + for (const auto& file_metadata : metadata_) { + const auto file_path = file_metadata.db_path + "/" + file_metadata.name; + IngestedFileInfo file_to_import; + status = GetIngestedFileInfo(file_path, &file_to_import, sv); + if (!status.ok()) { + return status; + } + files_to_import_.push_back(file_to_import); + } + + const auto ucmp = cfd_->internal_comparator().user_comparator(); + auto num_files = files_to_import_.size(); + if (num_files == 0) { + return Status::InvalidArgument("The list of files is empty"); + } else if (num_files > 1) { + // Verify that passed files don't have overlapping ranges in any particular + // level. + int min_level = 1; // Check for overlaps in Level 1 and above. + int max_level = -1; + for (const auto& file_metadata : metadata_) { + if (file_metadata.level > max_level) { + max_level = file_metadata.level; + } + } + for (int level = min_level; level <= max_level; ++level) { + autovector sorted_files; + for (size_t i = 0; i < num_files; i++) { + if (metadata_[i].level == level) { + sorted_files.push_back(&files_to_import_[i]); + } + } + + std::sort(sorted_files.begin(), sorted_files.end(), + [&ucmp](const IngestedFileInfo* info1, + const IngestedFileInfo* info2) { + return ucmp->Compare(info1->smallest_user_key, + info2->smallest_user_key) < 0; + }); + + for (size_t i = 0; i < sorted_files.size() - 1; i++) { + if (ucmp->Compare(sorted_files[i]->largest_user_key, + sorted_files[i + 1]->smallest_user_key) >= 0) { + return Status::InvalidArgument("Files have overlapping ranges"); + } + } + } + } + + for (const auto& f : files_to_import_) { + if (f.num_entries == 0) { + return Status::InvalidArgument("File contain no entries"); + } + + if (!f.smallest_internal_key().Valid() || + !f.largest_internal_key().Valid()) { + return Status::Corruption("File has corrupted keys"); + } + } + + // Copy/Move external files into DB + auto hardlink_files = import_options_.move_files; + for (auto& f : files_to_import_) { + f.fd = FileDescriptor(next_file_number++, 0, f.file_size); + + const auto path_outside_db = f.external_file_path; + const auto path_inside_db = TableFileName( + cfd_->ioptions()->cf_paths, f.fd.GetNumber(), f.fd.GetPathId()); + + if (hardlink_files) { + status = env_->LinkFile(path_outside_db, path_inside_db); + if (status.IsNotSupported()) { + // Original file is on a different FS, use copy instead of hard linking + hardlink_files = false; + } + } + if (!hardlink_files) { + status = CopyFile(env_, path_outside_db, path_inside_db, 0, + db_options_.use_fsync); + } + if (!status.ok()) { + break; + } + f.copy_file = !hardlink_files; + f.internal_file_path = path_inside_db; + } + + if (!status.ok()) { + // We failed, remove all files that we copied into the db + for (const auto& f : files_to_import_) { + if (f.internal_file_path.empty()) { + break; + } + const auto s = env_->DeleteFile(f.internal_file_path); + if (!s.ok()) { + ROCKS_LOG_WARN(db_options_.info_log, + "AddFile() clean up for file %s failed : %s", + f.internal_file_path.c_str(), s.ToString().c_str()); + } + } + } + + return status; +} + +// REQUIRES: we have become the only writer by entering both write_thread_ and +// nonmem_write_thread_ +Status ImportColumnFamilyJob::Run() { + Status status; + edit_.SetColumnFamily(cfd_->GetID()); + + for (size_t i = 0; i < files_to_import_.size(); ++i) { + const auto& f = files_to_import_[i]; + const auto& file_metadata = metadata_[i]; + edit_.AddFile(file_metadata.level, f.fd.GetNumber(), f.fd.GetPathId(), + f.fd.GetFileSize(), f.smallest_internal_key(), + f.largest_internal_key(), file_metadata.smallest_seqno, + file_metadata.largest_seqno, false); + + // If incoming sequence number is higher, update local sequence number. + if (file_metadata.largest_seqno > versions_->LastSequence()) { + versions_->SetLastAllocatedSequence(file_metadata.largest_seqno); + versions_->SetLastPublishedSequence(file_metadata.largest_seqno); + versions_->SetLastSequence(file_metadata.largest_seqno); + } + } + + return status; +} + +void ImportColumnFamilyJob::Cleanup(const Status& status) { + if (!status.ok()) { + // We failed to add files to the database remove all the files we copied. + for (const auto& f : files_to_import_) { + const auto s = env_->DeleteFile(f.internal_file_path); + if (!s.ok()) { + ROCKS_LOG_WARN(db_options_.info_log, + "AddFile() clean up for file %s failed : %s", + f.internal_file_path.c_str(), s.ToString().c_str()); + } + } + } else if (status.ok() && import_options_.move_files) { + // The files were moved and added successfully, remove original file links + for (IngestedFileInfo& f : files_to_import_) { + const auto s = env_->DeleteFile(f.external_file_path); + if (!s.ok()) { + ROCKS_LOG_WARN( + db_options_.info_log, + "%s was added to DB successfully but failed to remove original " + "file link : %s", + f.external_file_path.c_str(), s.ToString().c_str()); + } + } + } +} + +Status ImportColumnFamilyJob::GetIngestedFileInfo( + const std::string& external_file, IngestedFileInfo* file_to_import, + SuperVersion* sv) { + file_to_import->external_file_path = external_file; + + // Get external file size + auto status = env_->GetFileSize(external_file, &file_to_import->file_size); + if (!status.ok()) { + return status; + } + + // Create TableReader for external file + std::unique_ptr table_reader; + std::unique_ptr sst_file; + std::unique_ptr sst_file_reader; + + status = env_->NewRandomAccessFile(external_file, &sst_file, env_options_); + if (!status.ok()) { + return status; + } + sst_file_reader.reset(new RandomAccessFileReader(std::move(sst_file), + external_file)); + + status = cfd_->ioptions()->table_factory->NewTableReader( + TableReaderOptions(*cfd_->ioptions(), + sv->mutable_cf_options.prefix_extractor.get(), + env_options_, cfd_->internal_comparator()), + std::move(sst_file_reader), file_to_import->file_size, &table_reader); + if (!status.ok()) { + return status; + } + + // Get the external file properties + auto props = table_reader->GetTableProperties(); + + // Set original_seqno to 0. + file_to_import->original_seqno = 0; + + // Get number of entries in table + file_to_import->num_entries = props->num_entries; + + ParsedInternalKey key; + ReadOptions ro; + // During reading the external file we can cache blocks that we read into + // the block cache, if we later change the global seqno of this file, we will + // have block in cache that will include keys with wrong seqno. + // We need to disable fill_cache so that we read from the file without + // updating the block cache. + ro.fill_cache = false; + std::unique_ptr iter(table_reader->NewIterator( + ro, sv->mutable_cf_options.prefix_extractor.get())); + + // Get first (smallest) key from file + iter->SeekToFirst(); + if (!ParseInternalKey(iter->key(), &key)) { + return Status::Corruption("external file have corrupted keys"); + } + file_to_import->smallest_user_key = key.user_key.ToString(); + + // Get last (largest) key from file + iter->SeekToLast(); + if (!ParseInternalKey(iter->key(), &key)) { + return Status::Corruption("external file have corrupted keys"); + } + file_to_import->largest_user_key = key.user_key.ToString(); + + file_to_import->cf_id = static_cast(props->column_family_id); + + file_to_import->table_properties = *props; + + return status; +} + +} // namespace rocksdb + +#endif // !ROCKSDB_LITE diff --git a/db/import_column_family_job.h b/db/import_column_family_job.h new file mode 100644 index 00000000000..5b8577df1d5 --- /dev/null +++ b/db/import_column_family_job.h @@ -0,0 +1,70 @@ +#pragma once +#include +#include +#include + +#include "db/column_family.h" +#include "db/dbformat.h" +#include "db/external_sst_file_ingestion_job.h" +#include "db/snapshot_impl.h" +#include "options/db_options.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/metadata.h" +#include "rocksdb/sst_file_writer.h" +#include "util/autovector.h" + +namespace rocksdb { + +// Imports a set of sst files as is into a new column family. Logic is similar +// to ExternalSstFileIngestionJob. +class ImportColumnFamilyJob { + public: + ImportColumnFamilyJob( + Env* env, VersionSet* versions, ColumnFamilyData* cfd, + const ImmutableDBOptions& db_options, const EnvOptions& env_options, + const ImportColumnFamilyOptions& import_options, + const std::vector& metadata) + : env_(env), + versions_(versions), + cfd_(cfd), + db_options_(db_options), + env_options_(env_options), + import_options_(import_options), + metadata_(metadata) {} + + // Prepare the job by copying external files into the DB. + Status Prepare(uint64_t next_file_number, SuperVersion* sv); + + // Will execute the import job and prepare edit() to be applied. + // REQUIRES: Mutex held + Status Run(); + + // Cleanup after successful/failed job + void Cleanup(const Status& status); + + VersionEdit* edit() { return &edit_; } + + const autovector& files_to_import() const { + return files_to_import_; + } + + private: + // Open the external file and populate `file_to_import` with all the + // external information we need to import this file. + Status GetIngestedFileInfo(const std::string& external_file, + IngestedFileInfo* file_to_import, + SuperVersion* sv); + + Env* env_; + VersionSet* versions_; + ColumnFamilyData* cfd_; + const ImmutableDBOptions& db_options_; + const EnvOptions& env_options_; + autovector files_to_import_; + VersionEdit edit_; + const ImportColumnFamilyOptions& import_options_; + std::vector metadata_; +}; + +} // namespace rocksdb diff --git a/db/import_column_family_test.cc b/db/import_column_family_test.cc new file mode 100644 index 00000000000..7e2a31473e9 --- /dev/null +++ b/db/import_column_family_test.cc @@ -0,0 +1,487 @@ +#ifndef ROCKSDB_LITE + +#include +#include "db/db_test_util.h" +#include "port/port.h" +#include "port/stack_trace.h" +#include "rocksdb/sst_file_writer.h" +#include "util/testutil.h" + +namespace rocksdb { + +class ImportColumnFamilyTest : public DBTestBase { + public: + ImportColumnFamilyTest() : DBTestBase("/import_column_family_test") { + sst_files_dir_ = dbname_ + "/sst_files/"; + DestroyAndRecreateExternalSSTFilesDir(); + export_files_dir_ = test::TmpDir(env_) + "/export"; + } + + void DestroyAndRecreateExternalSSTFilesDir() { + test::DestroyDir(env_, sst_files_dir_); + env_->CreateDir(sst_files_dir_); + test::DestroyDir(env_, export_files_dir_); + } + + LiveFileMetaData LiveFileMetaDataInit(std::string name, + std::string path, + int level, + SequenceNumber smallest_seqno, + SequenceNumber largest_seqno) { + LiveFileMetaData metadata; + metadata.name = name; + metadata.db_path = path; + metadata.smallest_seqno = smallest_seqno; + metadata.largest_seqno = largest_seqno; + metadata.level = level; + return metadata; + } + + ~ImportColumnFamilyTest() { + test::DestroyDir(env_, sst_files_dir_); + test::DestroyDir(env_, export_files_dir_); + } + + protected: + std::string sst_files_dir_; + std::string export_files_dir_; +}; + +TEST_F(ImportColumnFamilyTest, ImportSSTFileWriterFiles) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"koko"}, options); + + SstFileWriter sfw_cf1(EnvOptions(), options, handles_[1]); + SstFileWriter sfw_unknown(EnvOptions(), options); + + // cf1.sst + const std::string cf1_sst_name = "cf1.sst"; + const std::string cf1_sst = sst_files_dir_ + cf1_sst_name; + ASSERT_OK(sfw_cf1.Open(cf1_sst)); + ASSERT_OK(sfw_cf1.Put("K1", "V1")); + ASSERT_OK(sfw_cf1.Put("K2", "V2")); + ASSERT_OK(sfw_cf1.Finish()); + + // cf_unknown.sst + const std::string unknown_sst_name = "cf_unknown.sst"; + const std::string unknown_sst = sst_files_dir_ + unknown_sst_name; + ASSERT_OK(sfw_unknown.Open(unknown_sst)); + ASSERT_OK(sfw_unknown.Put("K3", "V1")); + ASSERT_OK(sfw_unknown.Put("K4", "V2")); + ASSERT_OK(sfw_unknown.Finish()); + + { + // Import sst file corresponding to cf1 onto cf2 and verify. + std::vector metadata; + metadata.push_back( + LiveFileMetaDataInit(cf1_sst_name, sst_files_dir_, 0, 10, 19)); + + ColumnFamilyHandle* cfh2; + ASSERT_OK(db_->CreateColumnFamilyWithImport( + options, "toto", ImportColumnFamilyOptions(), metadata, &cfh2)); + + std::string value2; + db_->Get(ReadOptions(), cfh2, "K1", &value2); + ASSERT_EQ(value2, "V1"); + db_->Get(ReadOptions(), cfh2, "K2", &value2); + ASSERT_EQ(value2, "V2"); + ASSERT_OK(db_->DropColumnFamily(cfh2)); + } + + { + // Import sst file corresponding to unknown cf onto cf2 and verify. + std::vector metadata; + metadata.push_back( + LiveFileMetaDataInit(unknown_sst_name, sst_files_dir_, 0, 20, 29)); + + ColumnFamilyHandle* cfh2; + ASSERT_OK(db_->CreateColumnFamilyWithImport( + options, "yoyo", ImportColumnFamilyOptions(), metadata, &cfh2)); + + std::string value2; + db_->Get(ReadOptions(), cfh2, "K3", &value2); + ASSERT_EQ(value2, "V1"); + db_->Get(ReadOptions(), cfh2, "K4", &value2); + ASSERT_EQ(value2, "V2"); + ASSERT_OK(db_->DropColumnFamily(cfh2)); + } +} + +TEST_F(ImportColumnFamilyTest, ImportSSTFileWriterFilesWithOverlap) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"koko"}, options); + + SstFileWriter sfw_cf1(EnvOptions(), options, handles_[1]); + + // file3.sst + const std::string file3_sst_name = "file3.sst"; + const std::string file3_sst = sst_files_dir_ + file3_sst_name; + ASSERT_OK(sfw_cf1.Open(file3_sst)); + for (int i = 0; i < 100; ++i) { + sfw_cf1.Put(Key(i), Key(i) + "_val"); + } + ASSERT_OK(sfw_cf1.Finish()); + + // file2.sst + const std::string file2_sst_name = "file2.sst"; + const std::string file2_sst = sst_files_dir_ + file2_sst_name; + ASSERT_OK(sfw_cf1.Open(file2_sst)); + for (int i = 0; i < 100; i += 2) { + sfw_cf1.Put(Key(i), Key(i) + "_overwrite1"); + } + ASSERT_OK(sfw_cf1.Finish()); + + // file1a.sst + const std::string file1a_sst_name = "file1a.sst"; + const std::string file1a_sst = sst_files_dir_ + file1a_sst_name; + ASSERT_OK(sfw_cf1.Open(file1a_sst)); + for (int i = 0; i < 52; i += 4) { + sfw_cf1.Put(Key(i), Key(i) + "_overwrite2"); + } + ASSERT_OK(sfw_cf1.Finish()); + + // file1b.sst + const std::string file1b_sst_name = "file1b.sst"; + const std::string file1b_sst = sst_files_dir_ + file1b_sst_name; + ASSERT_OK(sfw_cf1.Open(file1b_sst)); + for (int i = 52; i < 100; i += 4) { + sfw_cf1.Put(Key(i), Key(i) + "_overwrite2"); + } + ASSERT_OK(sfw_cf1.Finish()); + + // file0a.sst + const std::string file0a_sst_name = "file0a.sst"; + const std::string file0a_sst = sst_files_dir_ + file0a_sst_name; + ASSERT_OK(sfw_cf1.Open(file0a_sst)); + for (int i = 0; i < 100; i += 16) { + sfw_cf1.Put(Key(i), Key(i) + "_overwrite3"); + } + ASSERT_OK(sfw_cf1.Finish()); + + // file0b.sst + const std::string file0b_sst_name = "file0b.sst"; + const std::string file0b_sst = sst_files_dir_ + file0b_sst_name; + ASSERT_OK(sfw_cf1.Open(file0b_sst)); + for (int i = 0; i < 100; i += 16) { + sfw_cf1.Put(Key(i), Key(i) + "_overwrite3"); + } + ASSERT_OK(sfw_cf1.Finish()); + + // Import sst files and verify. + std::vector metadata; + metadata.push_back( + LiveFileMetaDataInit(file3_sst_name, sst_files_dir_, 3, 10, 19)); + metadata.push_back( + LiveFileMetaDataInit(file2_sst_name, sst_files_dir_, 2, 20, 29)); + metadata.push_back( + LiveFileMetaDataInit(file1a_sst_name, sst_files_dir_, 1, 30, 34)); + metadata.push_back( + LiveFileMetaDataInit(file1b_sst_name, sst_files_dir_, 1, 35, 39)); + metadata.push_back( + LiveFileMetaDataInit(file0a_sst_name, sst_files_dir_, 0, 40, 49)); + metadata.push_back( + LiveFileMetaDataInit(file0b_sst_name, sst_files_dir_, 0, 50, 59)); + + ColumnFamilyHandle* cfh2; + ASSERT_OK(db_->CreateColumnFamilyWithImport( + options, "toto", ImportColumnFamilyOptions(), metadata, &cfh2)); + + for (int i = 0; i < 100; i++) { + std::string value2; + db_->Get(ReadOptions(), cfh2, Key(i), &value2); + if (i % 16 == 0) { + ASSERT_EQ(value2, Key(i) + "_overwrite3"); + } else if (i % 4 == 0) { + ASSERT_EQ(value2, Key(i) + "_overwrite2"); + } else if (i % 2 == 0) { + ASSERT_EQ(value2, Key(i) + "_overwrite1"); + } else { + ASSERT_EQ(value2, Key(i) + "_val"); + } + } + + for (int i = 0; i < 100; i += 5) { + ASSERT_OK(db_->Put(WriteOptions(), cfh2, Key(i), Key(i) + "_overwrite4")); + } + + // Flush and check again. + ASSERT_OK(db_->Flush(FlushOptions(), cfh2)); + for (int i = 0; i < 100; i++) { + std::string value2; + db_->Get(ReadOptions(), cfh2, Key(i), &value2); + if (i % 5 == 0) { + ASSERT_EQ(value2, Key(i) + "_overwrite4"); + } else if (i % 16 == 0) { + ASSERT_EQ(value2, Key(i) + "_overwrite3"); + } else if (i % 4 == 0) { + ASSERT_EQ(value2, Key(i) + "_overwrite2"); + } else if (i % 2 == 0) { + ASSERT_EQ(value2, Key(i) + "_overwrite1"); + } else { + ASSERT_EQ(value2, Key(i) + "_val"); + } + } + + // Compact and check again. + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), cfh2, nullptr, nullptr)); + for (int i = 0; i < 100; i++) { + std::string value2; + db_->Get(ReadOptions(), cfh2, Key(i), &value2); + if (i % 5 == 0) { + ASSERT_EQ(value2, Key(i) + "_overwrite4"); + } else if (i % 16 == 0) { + ASSERT_EQ(value2, Key(i) + "_overwrite3"); + } else if (i % 4 == 0) { + ASSERT_EQ(value2, Key(i) + "_overwrite2"); + } else if (i % 2 == 0) { + ASSERT_EQ(value2, Key(i) + "_overwrite1"); + } else { + ASSERT_EQ(value2, Key(i) + "_val"); + } + } + + ASSERT_OK(db_->DropColumnFamily(cfh2)); +} + +TEST_F(ImportColumnFamilyTest, ImportExportedSSTFromAnotherCF) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"koko"}, options); + + for (int i = 0; i < 100; ++i) { + Put(1, Key(i), Key(i) + "_val"); + } + ASSERT_OK(Flush(1)); + + ASSERT_OK( + db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr)); + + // Overwrite the value in the same set of keys. + for (int i = 0; i < 100; ++i) { + Put(1, Key(i), Key(i) + "_overwrite"); + } + + // Flush to create L0 file. + ASSERT_OK(Flush(1)); + for (int i = 0; i < 100; ++i) { + Put(1, Key(i), Key(i) + "_overwrite2"); + } + + // Flush again to create another L0 file. It should have higher sequencer. + ASSERT_OK(Flush(1)); + + std::vector metadata; + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->ExportColumnFamily(handles_[1], &metadata, + export_files_dir_)); + + ColumnFamilyHandle* cfh2; + ImportColumnFamilyOptions import_options; + import_options.move_files = false; + ASSERT_OK(db_->CreateColumnFamilyWithImport(options, "toto", import_options, + metadata, &cfh2)); + + ColumnFamilyHandle* cfh3; + import_options.move_files = true; + ASSERT_OK(db_->CreateColumnFamilyWithImport(options, "yoyo", import_options, + metadata, &cfh3)); + + std::string value2, value3; + + for (int i = 0; i < 100; ++i) { + db_->Get(ReadOptions(), cfh2, Key(i), &value2); + db_->Get(ReadOptions(), cfh3, Key(i), &value3); + ASSERT_EQ(Get(1, Key(i)), value2); + ASSERT_EQ(Get(1, Key(i)), value3); + } + + // Modify keys in cf2 and verify. + for (int i = 0; i < 25; i++) { + ASSERT_OK(db_->Delete(WriteOptions(), cfh2, Key(i))); + } + + for (int i = 25; i < 50; i++) { + ASSERT_OK(db_->Put(WriteOptions(), cfh2, Key(i), Key(i) + "_overwrite3")); + } + + for (int i = 0; i < 25; ++i) { + ASSERT_TRUE(db_->Get(ReadOptions(), cfh2, Key(i), &value2).IsNotFound()); + } + for (int i = 25; i < 50; ++i) { + db_->Get(ReadOptions(), cfh2, Key(i), &value2); + ASSERT_EQ(Key(i) + "_overwrite3", value2); + } + for (int i = 50; i < 100; ++i) { + db_->Get(ReadOptions(), cfh2, Key(i), &value2); + ASSERT_EQ(Key(i) + "_overwrite2", value2); + } + + // Compact and check again. + ASSERT_OK(db_->Flush(FlushOptions(), cfh2)); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), cfh2, nullptr, nullptr)); + + for (int i = 0; i < 25; ++i) { + ASSERT_TRUE(db_->Get(ReadOptions(), cfh2, Key(i), &value2).IsNotFound()); + } + for (int i = 25; i < 50; ++i) { + db_->Get(ReadOptions(), cfh2, Key(i), &value2); + ASSERT_EQ(Key(i) + "_overwrite3", value2); + } + for (int i = 50; i < 100; ++i) { + db_->Get(ReadOptions(), cfh2, Key(i), &value2); + ASSERT_EQ(Key(i) + "_overwrite2", value2); + } + ASSERT_OK(db_->DropColumnFamily(cfh2)); + ASSERT_OK(db_->DropColumnFamily(cfh3)); +} + +TEST_F(ImportColumnFamilyTest, ImportExportedSSTFromAnotherDB) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"koko"}, options); + + for (int i = 0; i < 100; ++i) { + Put(1, Key(i), Key(i) + "_val"); + } + ASSERT_OK(Flush(1)); + + // Compact to create a L1 file. + ASSERT_OK( + db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr)); + + // Overwrite the value in the same set of keys. + for (int i = 0; i < 50; ++i) { + Put(1, Key(i), Key(i) + "_overwrite"); + } + + // Flush to create L0 file. + ASSERT_OK(Flush(1)); + + for (int i = 0; i < 25; ++i) { + Put(1, Key(i), Key(i) + "_overwrite2"); + } + + // Flush again to create another L0 file. It should have higher sequencer. + ASSERT_OK(Flush(1)); + + std::vector metadata; + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->ExportColumnFamily(handles_[1], &metadata, + export_files_dir_)); + + // Create a new db and import the files. + DB* db_copy; + test::DestroyDir(env_, dbname_ + "/db_copy"); + ASSERT_OK(DB::Open(options, dbname_ + "/db_copy", &db_copy)); + ColumnFamilyHandle* cfh; + ASSERT_OK(db_copy->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo", + ImportColumnFamilyOptions(), + metadata, &cfh)); + + for (int i = 0; i < 100; ++i) { + std::string value; + db_copy->Get(ReadOptions(), cfh, Key(i), &value); + ASSERT_EQ(Get(1, Key(i)), value); + } + db_copy->DropColumnFamily(cfh); + test::DestroyDir(env_, dbname_ + "/db_copy"); +} + +TEST_F(ImportColumnFamilyTest, ImportColumnFamilyNegativeTest) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"koko"}, options); + + { + // Create column family with existing cf name. + std::vector metadata; + ColumnFamilyHandle* cfh; + + ASSERT_EQ(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "koko", + ImportColumnFamilyOptions(), + metadata, &cfh), + Status::InvalidArgument("Column family already exists")); + } + + { + // Import with no files specified. + std::vector metadata; + ColumnFamilyHandle* cfh; + + ASSERT_EQ(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo", + ImportColumnFamilyOptions(), + metadata, &cfh), + Status::InvalidArgument("The list of files is empty")); + } + + { + // Import with overlapping keys in sst files. + std::vector metadata; + ColumnFamilyHandle* cfh; + SstFileWriter sfw_cf1(EnvOptions(), options, handles_[1]); + const std::string file1_sst_name = "file1.sst"; + const std::string file1_sst = sst_files_dir_ + file1_sst_name; + ASSERT_OK(sfw_cf1.Open(file1_sst)); + ASSERT_OK(sfw_cf1.Put("K1", "V1")); + ASSERT_OK(sfw_cf1.Put("K2", "V2")); + ASSERT_OK(sfw_cf1.Finish()); + const std::string file2_sst_name = "file2.sst"; + const std::string file2_sst = sst_files_dir_ + file2_sst_name; + ASSERT_OK(sfw_cf1.Open(file2_sst)); + ASSERT_OK(sfw_cf1.Put("K2", "V2")); + ASSERT_OK(sfw_cf1.Put("K3", "V3")); + ASSERT_OK(sfw_cf1.Finish()); + + metadata.push_back( + LiveFileMetaDataInit(file1_sst_name, sst_files_dir_, 1, 10, 19)); + metadata.push_back( + LiveFileMetaDataInit(file2_sst_name, sst_files_dir_, 1, 10, 19)); + + ASSERT_EQ(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo", + ImportColumnFamilyOptions(), + metadata, &cfh), + Status::InvalidArgument("Files have overlapping ranges")); + } + { + // Import with non existent sst file should fail with appropriate error. + std::vector metadata; + ColumnFamilyHandle* cfh; + SstFileWriter sfw_cf1(EnvOptions(), options, handles_[1]); + const std::string file1_sst_name = "file1.sst"; + const std::string file1_sst = sst_files_dir_ + file1_sst_name; + ASSERT_OK(sfw_cf1.Open(file1_sst)); + ASSERT_OK(sfw_cf1.Put("K1", "V1")); + ASSERT_OK(sfw_cf1.Put("K2", "V2")); + ASSERT_OK(sfw_cf1.Finish()); + const std::string file3_sst_name = "file3.sst"; + + metadata.push_back( + LiveFileMetaDataInit(file1_sst_name, sst_files_dir_, 1, 10, 19)); + metadata.push_back( + LiveFileMetaDataInit(file3_sst_name, sst_files_dir_, 1, 10, 19)); + + ASSERT_EQ(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo", + ImportColumnFamilyOptions(), + metadata, &cfh), + Status::IOError("No such file or directory")); + } +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + rocksdb::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#else +#include + +int main(int argc, char** argv) { + fprintf(stderr, + "SKIPPED as External SST File Writer and Import are not supported " + "in ROCKSDB_LITE\n"); + return 0; +} + +#endif // !ROCKSDB_LITE diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 6a37084c52e..6b13f4c28eb 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -1046,6 +1046,19 @@ class DB { return IngestExternalFile(DefaultColumnFamily(), external_files, options); } + // CreateColumnFamilyWithImport() will create a new column family with + // column_family_name and import external SST files specified in metadata into + // this column family. + // (1) External SST files can be created using SstFileWriter. + // (2) External SST files can be exported from a particular column family in + // an existing DB. + // External files are deleted on a successful return. + virtual Status CreateColumnFamilyWithImport( + const ColumnFamilyOptions& options, const std::string& column_family_name, + const ImportColumnFamilyOptions& import_options, + const std::vector& metadata, + ColumnFamilyHandle** handle) = 0; + virtual Status VerifyChecksum() = 0; // AddFile() is deprecated, please use IngestExternalFile() diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index c3ba448394b..eb1feb58612 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1344,4 +1344,9 @@ struct TraceOptions { uint64_t max_trace_file_size = uint64_t{64} * 1024 * 1024 * 1024; }; +// ImportColumnFamilyOptions is used by ImportColumnFamily() +struct ImportColumnFamilyOptions { + // Can be set to true to move the files instead of copying them. + bool move_files = false; +}; } // namespace rocksdb diff --git a/include/rocksdb/utilities/checkpoint.h b/include/rocksdb/utilities/checkpoint.h index aa0a394d4d0..ae3014dc2b6 100644 --- a/include/rocksdb/utilities/checkpoint.h +++ b/include/rocksdb/utilities/checkpoint.h @@ -9,11 +9,14 @@ #ifndef ROCKSDB_LITE #include +#include #include "rocksdb/status.h" namespace rocksdb { class DB; +class ColumnFamilyHandle; +struct LiveFileMetaData; class Checkpoint { public: @@ -36,6 +39,17 @@ class Checkpoint { virtual Status CreateCheckpoint(const std::string& checkpoint_dir, uint64_t log_size_for_flush = 0); + // Exports all live SST files of a specified Column Family onto export_dir, + // returning SST files information in metadata. + // - SST files will be created as hard links when the directory specified + // is in the same partition as the db directory, copied otherwise. + // - export_dir should not already exist and will be created by this API. + // - export_dir should be specified with its absolute path. + // - Always triggers a flush. + virtual Status ExportColumnFamily(ColumnFamilyHandle* handle, + std::vector* metadata, + const std::string& export_dir); + virtual ~Checkpoint() {} }; diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index eae3a85ea1f..7a6270941a5 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -107,6 +107,16 @@ class StackableDB : public DB { return db_->IngestExternalFile(column_family, external_files, options); } + using DB::CreateColumnFamilyWithImport; + virtual Status CreateColumnFamilyWithImport( + const ColumnFamilyOptions& options, const std::string& column_family_name, + const ImportColumnFamilyOptions& import_options, + const std::vector& metadata, + ColumnFamilyHandle** handle) override { + return db_->CreateColumnFamilyWithImport(options, column_family_name, + import_options, metadata, handle); + } + virtual Status VerifyChecksum() override { return db_->VerifyChecksum(); } using DB::KeyMayExist; diff --git a/src.mk b/src.mk index 990aa2ab7a6..9a5778c6c55 100644 --- a/src.mk +++ b/src.mk @@ -34,6 +34,7 @@ LIB_SOURCES = \ db/flush_job.cc \ db/flush_scheduler.cc \ db/forward_iterator.cc \ + db/import_column_family_job.cc \ db/internal_stats.cc \ db/logs_with_prep_tracker.cc \ db/log_reader.cc \ diff --git a/utilities/checkpoint/checkpoint_impl.cc b/utilities/checkpoint/checkpoint_impl.cc index 9863ac1d564..79f3a5650f5 100644 --- a/utilities/checkpoint/checkpoint_impl.cc +++ b/utilities/checkpoint/checkpoint_impl.cc @@ -24,6 +24,7 @@ #include "port/port.h" #include "rocksdb/db.h" #include "rocksdb/env.h" +#include "rocksdb/metadata.h" #include "rocksdb/transaction_log.h" #include "rocksdb/utilities/checkpoint.h" #include "util/file_util.h" @@ -64,6 +65,12 @@ void CheckpointImpl::CleanStagingDirectory( full_private_path.c_str(), s.ToString().c_str()); } +Status Checkpoint::ExportColumnFamily( + ColumnFamilyHandle* /*handle*/, std::vector* /*metadata*/, + const std::string& /*export_dir*/) { + return Status::NotSupported(""); +} + // Builds an openable snapshot of RocksDB Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir, uint64_t log_size_for_flush) { @@ -326,6 +333,179 @@ Status CheckpointImpl::CreateCustomCheckpoint( return s; } +// Exports all live SST files of a specified Column Family onto export_dir, +// returning SST files information in metadata. +Status CheckpointImpl::ExportColumnFamily( + ColumnFamilyHandle* handle, std::vector* metadata, + const std::string& export_dir) { + auto cfh = reinterpret_cast(handle); + const auto cf_name = cfh->GetName(); + const auto db_options = db_->GetDBOptions(); + + auto s = db_->GetEnv()->FileExists(export_dir); + if (s.ok()) { + return Status::InvalidArgument("Specified export_dir exists"); + } else if (!s.IsNotFound()) { + assert(s.IsIOError()); + return s; + } + + const auto final_nonslash_idx = export_dir.find_last_not_of('/'); + if (export_dir[0] != '/' || final_nonslash_idx == std::string::npos) { + return Status::InvalidArgument("Specified export_dir invalid"); + } + ROCKS_LOG_INFO(db_options.info_log, + "[%s] export column family onto export directory %s", + cf_name.c_str(), export_dir.c_str()); + + // Create a temporary export directory. + const auto tmp_export_dir = + export_dir.substr(0, final_nonslash_idx + 1) + ".tmp"; + s = db_->GetEnv()->CreateDir(tmp_export_dir); + + if (s.ok()) { + s = db_->Flush(rocksdb::FlushOptions(), handle); + } + + ColumnFamilyMetaData db_metadata; + if (s.ok()) { + // Export live sst files with file deletions disabled. + s = db_->DisableFileDeletions(); + if (s.ok()) { + db_->GetColumnFamilyMetaData(handle, &db_metadata); + + s = ExportFilesInMetaData( + db_options, db_metadata, + [&](const std::string& src_dirname, const std::string& fname) { + ROCKS_LOG_INFO(db_options.info_log, "[%s] HardLinking %s", + cf_name.c_str(), fname.c_str()); + return db_->GetEnv()->LinkFile(src_dirname + fname, + tmp_export_dir + fname); + } /*link_file_cb*/, + [&](const std::string& src_dirname, const std::string& fname) { + ROCKS_LOG_INFO(db_options.info_log, "[%s] Copying %s", + cf_name.c_str(), fname.c_str()); + return CopyFile(db_->GetEnv(), src_dirname + fname, + tmp_export_dir + fname, 0, db_options.use_fsync); + } /*copy_file_cb*/); + + const auto enable_status = db_->EnableFileDeletions(false /*force*/); + if (s.ok()) { + s = enable_status; + } + } + } + + auto moved_to_user_specified_dir = false; + if (s.ok()) { + // Move temporary export directory to the actual export directory. + s = db_->GetEnv()->RenameFile(tmp_export_dir, export_dir); + } + + if (s.ok()) { + // Fsync export directory. + moved_to_user_specified_dir = true; + unique_ptr dir_ptr; + s = db_->GetEnv()->NewDirectory(export_dir, &dir_ptr); + if (s.ok()) { + assert(dir_ptr != nullptr); + s = dir_ptr->Fsync(); + } + } + + if (s.ok()) { + // Export of files succeeded. Fill in the metadata information. + for (const auto& level_metadata : db_metadata.levels) { + for (const auto& file_metadata : level_metadata.files) { + LiveFileMetaData live_file_metadata; + live_file_metadata.size = file_metadata.size; + live_file_metadata.name = std::move(file_metadata.name); + live_file_metadata.db_path = export_dir; + live_file_metadata.smallest_seqno = file_metadata.smallest_seqno; + live_file_metadata.largest_seqno = file_metadata.largest_seqno; + live_file_metadata.smallestkey = std::move(file_metadata.smallestkey); + live_file_metadata.largestkey = std::move(file_metadata.largestkey); + live_file_metadata.level = level_metadata.level; + metadata->emplace_back(live_file_metadata); + } + } + ROCKS_LOG_INFO(db_options.info_log, "[%s] Export succeeded.", + cf_name.c_str()); + } else { + // Failure: Clean up all the files/directories created. + ROCKS_LOG_INFO(db_options.info_log, "[%s] Export failed. %s", + cf_name.c_str(), s.ToString().c_str()); + std::vector subchildren; + const auto cleanup_dir = + moved_to_user_specified_dir ? export_dir : tmp_export_dir; + db_->GetEnv()->GetChildren(cleanup_dir, &subchildren); + for (const auto& subchild : subchildren) { + const auto subchild_path = cleanup_dir + "/" + subchild; + const auto status = db_->GetEnv()->DeleteFile(subchild_path); + if (!status.ok()) { + ROCKS_LOG_WARN(db_options.info_log, "Failed to cleanup file %s: %s", + subchild_path.c_str(), status.ToString().c_str()); + } + } + const auto status = db_->GetEnv()->DeleteDir(cleanup_dir); + if (!status.ok()) { + ROCKS_LOG_WARN(db_options.info_log, "Failed to cleanup dir %s: %s", + cleanup_dir.c_str(), status.ToString().c_str()); + } + } + return s; +} + +Status CheckpointImpl::ExportFilesInMetaData( + const DBOptions& db_options, const ColumnFamilyMetaData& metadata, + std::function + link_file_cb, + std::function + copy_file_cb) { + Status s; + auto hardlink_file = true; + + // Copy/hard link files in metadata. + size_t num_files = 0; + for (const auto& level_metadata : metadata.levels) { + for (const auto& file_metadata : level_metadata.files) { + uint64_t number; + FileType type; + const auto ok = ParseFileName(file_metadata.name, &number, &type); + if (!ok) { + s = Status::Corruption("Could not parse file name"); + break; + } + + // We should only get sst files here. + assert(type == kTableFile); + assert(file_metadata.size > 0 && file_metadata.name[0] == '/'); + const auto src_fname = file_metadata.name; + ++num_files; + + if (hardlink_file) { + s = link_file_cb(db_->GetName(), src_fname); + if (num_files == 1 && s.IsNotSupported()) { + // Fallback to copy if link failed due to cross-device directories. + hardlink_file = false; + s = Status::OK(); + } + } + if (!hardlink_file) { + s = copy_file_cb(db_->GetName(), src_fname); + } + if (!s.ok()) { + break; + } + } + } + ROCKS_LOG_INFO(db_options.info_log, "Number of table files %" ROCKSDB_PRIszt, + num_files); + + return s; +} } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/checkpoint/checkpoint_impl.h b/utilities/checkpoint/checkpoint_impl.h index a85fde59b60..715cc701d02 100644 --- a/utilities/checkpoint/checkpoint_impl.h +++ b/utilities/checkpoint/checkpoint_impl.h @@ -30,6 +30,18 @@ class CheckpointImpl : public Checkpoint { virtual Status CreateCheckpoint(const std::string& checkpoint_dir, uint64_t log_size_for_flush) override; + // Exports all live SST files of a specified Column Family onto export_dir + // and returning SST files information in metadata. + // - SST files will be created as hard links when the directory specified + // is in the same partition as the db directory, copied otherwise. + // - export_dir should not already exist and will be created by this API. + // - export_dir should be specified with its absolute path. + // - Always triggers a flush. + using Checkpoint::ExportColumnFamily; + virtual Status ExportColumnFamily( + ColumnFamilyHandle* handle, std::vector* metadata, + const std::string& export_dir) override; + // Checkpoint logic can be customized by providing callbacks for link, copy, // or create. Status CreateCustomCheckpoint( @@ -48,6 +60,18 @@ class CheckpointImpl : public Checkpoint { private: void CleanStagingDirectory(const std::string& path, Logger* info_log); + + // Export logic customization by providing callbacks for link or copy. + Status ExportFilesInMetaData( + const DBOptions& db_options, const ColumnFamilyMetaData& metadata, + std::function + link_file_cb, + std::function + copy_file_cb); + + private: DB* db_; }; diff --git a/utilities/checkpoint/checkpoint_test.cc b/utilities/checkpoint/checkpoint_test.cc index b8436ccf590..d938756c579 100644 --- a/utilities/checkpoint/checkpoint_test.cc +++ b/utilities/checkpoint/checkpoint_test.cc @@ -26,6 +26,7 @@ #include "util/fault_injection_test_env.h" #include "util/sync_point.h" #include "util/testharness.h" +#include "util/testutil.h" namespace rocksdb { class CheckpointTest : public testing::Test { @@ -140,6 +141,12 @@ class CheckpointTest : public testing::Test { ASSERT_OK(TryReopen(options)); } + void CompactAll() { + for (auto h : handles_) { + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), h, nullptr, nullptr)); + } + } + void Close() { for (auto h : handles_) { delete h; @@ -289,6 +296,95 @@ TEST_F(CheckpointTest, GetSnapshotLink) { } } +TEST_F(CheckpointTest, ExportColumnFamilyWithLinks) { + const auto export_path = test::TmpDir(env_) + "/export"; + auto options = CurrentOptions(); + delete db_; + db_ = nullptr; + ASSERT_OK(DestroyDB(dbname_, options)); + ASSERT_OK(DestroyDB(export_path, options)); + test::DestroyDir(env_, export_path); + + // Create a database + Status s; + options.create_if_missing = true; + ASSERT_OK(DB::Open(options, dbname_, &db_)); + const auto key = std::string("foo"); + ASSERT_OK(Put(key, "v1")); + + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + + // Export the Tables + std::vector metadata; + // Verify there is one sst file in metadata and export directory. + auto verify_files_exported = [&](int num_files_expected) { + ASSERT_EQ(metadata.size(), num_files_expected); + std::vector subchildren; + env_->GetChildren(export_path, &subchildren); + int num_children = 0; + for (const auto& child : subchildren) { + if (child != "." && child != "..") { + ++num_children; + } + } + ASSERT_EQ(num_children, num_files_expected); + }; + + ASSERT_OK(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(), + &metadata, export_path)); + verify_files_exported(1); + test::DestroyDir(env_, export_path); + + // Check after compaction + CompactAll(); + ASSERT_OK(Put(key, "v2")); + metadata.clear(); + ASSERT_OK(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(), + &metadata, export_path)); + verify_files_exported(2); + test::DestroyDir(env_, export_path); +} + +TEST_F(CheckpointTest, ExportColumnFamilyNegativeTest) { + auto export_path = test::TmpDir(env_) + "/export"; + auto options = CurrentOptions(); + delete db_; + db_ = nullptr; + ASSERT_OK(DestroyDB(dbname_, options)); + ASSERT_OK(DestroyDB(export_path, options)); + test::DestroyDir(env_, export_path); + + // Create a database + Status s; + options.create_if_missing = true; + ASSERT_OK(DB::Open(options, dbname_, &db_)); + const auto key = std::string("foo"); + ASSERT_OK(Put(key, "v1")); + + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + + // Export onto existing directory. + env_->CreateDirIfMissing(export_path); + std::vector metadata; + ASSERT_EQ(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(), + &metadata, export_path), + Status::InvalidArgument("Specified export_dir exists")); + test::DestroyDir(env_, export_path); + + // Export with invalid directory specification. + export_path = + "../export"; + ASSERT_EQ(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(), + &metadata, export_path), + Status::InvalidArgument("Specified export_dir invalid")); + + export_path = + "///"; + ASSERT_EQ(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(), + &metadata, export_path), + Status::InvalidArgument("Specified export_dir invalid")); +} + TEST_F(CheckpointTest, CheckpointCF) { Options options = CurrentOptions(); CreateAndReopenWithCF({"one", "two", "three", "four", "five"}, options);