Skip to content

Commit

Permalink
modify sst file seq no (#173)
Browse files Browse the repository at this point in the history
* modify external sst file's global seq no
  • Loading branch information
UncP committed Dec 11, 2017
1 parent e6492cb commit c141298
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 5 deletions.
4 changes: 3 additions & 1 deletion librocksdb_sys/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.


extern crate cc;
extern crate cmake;

Expand Down Expand Up @@ -85,12 +84,15 @@ fn build_rocksdb() -> Build {
_ => "Debug",
};
println!("cargo:rustc-link-search=native={}/{}", build_dir, profile);
build.define("OS_WIN", None);
} else {
println!("cargo:rustc-link-search=native={}", build_dir);
build.define("ROCKSDB_PLATFORM_POSIX", None);
}

let cur_dir = env::current_dir().unwrap();
build.include(cur_dir.join("rocksdb").join("include"));
build.include(cur_dir.join("rocksdb"));

println!("cargo:rustc-link-lib=static=rocksdb");
println!("cargo:rustc-link-lib=static=z");
Expand Down
130 changes: 129 additions & 1 deletion librocksdb_sys/crocksdb/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@
#include "rocksdb/utilities/backupable_db.h"
#include "rocksdb/utilities/debug.h"
#include "rocksdb/write_batch.h"

#include "db/column_family.h"
#include "table/sst_file_writer_collectors.h"
#include "table/table_reader.h"
#include "table/block_based_table_factory.h"
#include "util/file_reader_writer.h"
#include "util/coding.h"

#include <stdlib.h>

#if !defined(ROCKSDB_MAJOR) || !defined(ROCKSDB_MINOR) || !defined(ROCKSDB_PATCH)
Expand Down Expand Up @@ -109,6 +117,19 @@ using rocksdb::TablePropertiesCollectorFactory;
using rocksdb::KeyVersion;
using rocksdb::DbPath;

using rocksdb::ColumnFamilyData;
using rocksdb::ColumnFamilyHandleImpl;
using rocksdb::TableReaderOptions;
using rocksdb::TableReader;
using rocksdb::BlockBasedTableFactory;
using rocksdb::RandomAccessFile;
using rocksdb::RandomAccessFileReader;
using rocksdb::RandomRWFile;
using rocksdb::ExternalSstFilePropertyNames;
using rocksdb::DecodeFixed32;
using rocksdb::DecodeFixed64;
using rocksdb::PutFixed64;

using std::shared_ptr;

extern "C" {
Expand Down Expand Up @@ -2444,7 +2465,7 @@ crocksdb_ratelimiter_t* crocksdb_ratelimiter_create(

void crocksdb_ratelimiter_destroy(crocksdb_ratelimiter_t *limiter) {
if (limiter->rep) {
delete limiter->rep;
delete limiter->rep;
}
delete limiter;
}
Expand Down Expand Up @@ -3787,4 +3808,111 @@ int crocksdb_keyversions_type(const crocksdb_keyversions_t *kvs, int index) {
return kvs->rep[index].type;
}

struct ExternalSstFileModifier {
ExternalSstFileModifier(Env *env, ColumnFamilyData *cfd, DBOptions &db_options)
:env_(env), cfd_(cfd), env_options_(db_options), table_reader_(nullptr) { }

Status Open(std::string file) {
file_ = file;
// Get External Sst File Size
uint64_t file_size;
auto status = env_->GetFileSize(file_, &file_size);
if (!status.ok()) {
return status;
}

// Open External Sst File
std::unique_ptr<RandomAccessFile> sst_file;
std::unique_ptr<RandomAccessFileReader> sst_file_reader;
status = env_->NewRandomAccessFile(file_, &sst_file, env_options_);
if (!status.ok()) {
return status;
}
sst_file_reader.reset(new RandomAccessFileReader(std::move(sst_file), file_));

// Get Table Reader
status = cfd_->ioptions()->table_factory->NewTableReader(
TableReaderOptions(*cfd_->ioptions(), env_options_,
cfd_->internal_comparator()),
std::move(sst_file_reader), file_size, &table_reader_);
return status;
}

Status SetGlobalSeqNo(uint64_t seq_no, uint64_t *pre_seq_no) {
if (table_reader_ == nullptr) {
return Status::InvalidArgument("File is not open or seq-no has been modified");
}
// Get the external file properties
auto props = table_reader_->GetTableProperties();
const auto& uprops = props->user_collected_properties;
// Validate version and seqno offset
auto version_iter = uprops.find(ExternalSstFilePropertyNames::kVersion);
if (version_iter == uprops.end()) {
return Status::Corruption("External file version not found");
}
uint32_t version = DecodeFixed32(version_iter->second.c_str());
if (version != 2) {
return Status::NotSupported("External file version should be 2");
}

auto seqno_iter = uprops.find(ExternalSstFilePropertyNames::kGlobalSeqno);
if (seqno_iter == uprops.end()) {
return Status::Corruption("External file global sequence number not found");
}
*pre_seq_no = DecodeFixed64(seqno_iter->second.c_str());
uint64_t offset = props->properties_offsets.at(ExternalSstFilePropertyNames::kGlobalSeqno);
if (offset == 0) {
return Status::Corruption("Was not able to find file global seqno field");
}

if (*pre_seq_no == seq_no) {
// This file already have the correct global seqno
return Status::OK();
}

std::unique_ptr<RandomRWFile> rwfile;
auto status = env_->NewRandomRWFile(file_, &rwfile, env_options_);
if (!status.ok()) {
return status;
}

// Write the new seqno in the global sequence number field in the file
std::string seqno_val;
PutFixed64(&seqno_val, seq_no);
status = rwfile->Write(offset, seqno_val);
return status;
}

private:
Env *env_;
ColumnFamilyData *cfd_;
EnvOptions env_options_;
std::string file_;
std::unique_ptr<TableReader> table_reader_;
};

// !!! this function is dangerous because it uses rocksdb's non-public API !!!
// find the offset of external sst file's `global seq no` and modify it.
uint64_t crocksdb_set_external_sst_file_global_seq_no(
crocksdb_t *db,
crocksdb_column_family_handle_t *column_family,
const char *file,
uint64_t seq_no,
char **errptr) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family->rep);
auto db_options = db->rep->GetDBOptions();
ExternalSstFileModifier modifier(db->rep->GetEnv(), cfh->cfd(), db_options);
auto s = modifier.Open(std::string(file));
uint64_t pre_seq_no = 0;
if (!s.ok()) {
SaveError(errptr, s);
return pre_seq_no;
}
s = modifier.SetGlobalSeqNo(seq_no, &pre_seq_no);
if (!s.ok()) {
SaveError(errptr, s);
}
return pre_seq_no;
}

} // end extern "C"
9 changes: 9 additions & 0 deletions librocksdb_sys/crocksdb/crocksdb/c.h
Original file line number Diff line number Diff line change
Expand Up @@ -1561,6 +1561,15 @@ crocksdb_keyversions_seq(const crocksdb_keyversions_t *kvs, int index);
extern C_ROCKSDB_LIBRARY_API int
crocksdb_keyversions_type(const crocksdb_keyversions_t *kvs, int index);

/* Modify Sst File Seq No */
extern C_ROCKSDB_LIBRARY_API uint64_t
crocksdb_set_external_sst_file_global_seq_no(
crocksdb_t *db,
crocksdb_column_family_handle_t *column_family,
const char *file,
uint64_t seq_no,
char **errptr);

#ifdef __cplusplus
} /* end extern "C" */
#endif
Expand Down
8 changes: 8 additions & 0 deletions librocksdb_sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1496,6 +1496,14 @@ extern "C" {
pub fn crocksdb_keyversions_seq(kvs: *mut DBKeyVersions, index: usize) -> uint64_t;

pub fn crocksdb_keyversions_type(kvs: *mut DBKeyVersions, index: usize) -> c_int;

pub fn crocksdb_set_external_sst_file_global_seq_no(
db: *mut DBInstance,
handle: *mut DBCFHandle,
file: *const c_char,
seq_no: u64,
err: *mut *mut c_char,
) -> u64;
}

#[cfg(test)]
Expand Down
5 changes: 3 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ pub use librocksdb_sys::{self as crocksdb_ffi, new_bloom_filter, CompactionPrior
DBCompactionStyle, DBCompressionType, DBEntryType, DBInfoLogLevel,
DBRecoveryMode, DBStatisticsHistogramType, DBStatisticsTickerType};
pub use merge_operator::MergeOperands;
pub use rocksdb::{BackupEngine, CFHandle, DBIterator, DBVector, Env, ExternalSstFileInfo, Kv,
Range, SeekKey, SequentialFile, SstFileWriter, Writable, WriteBatch, DB};
pub use rocksdb::{set_external_sst_file_global_seq_no, BackupEngine, CFHandle, DBIterator,
DBVector, Env, ExternalSstFileInfo, Kv, Range, SeekKey, SequentialFile,
SstFileWriter, Writable, WriteBatch, DB};
pub use rocksdb_options::{BlockBasedOptions, ColumnFamilyOptions, CompactOptions, DBOptions,
EnvOptions, FifoCompactionOptions, HistogramData,
IngestExternalFileOptions, RateLimiter, ReadOptions, RestoreOptions,
Expand Down
19 changes: 18 additions & 1 deletion src/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

use crocksdb_ffi::{self, DBBackupEngine, DBCFHandle, DBCompressionType, DBEnv, DBInstance,
DBPinnableSlice, DBSequentialFile, DBStatisticsHistogramType,
Expand Down Expand Up @@ -1964,6 +1963,24 @@ impl Drop for SequentialFile {
}
}

pub fn set_external_sst_file_global_seq_no(
db: &DB,
cf: &CFHandle,
file: &str,
seq_no: u64,
) -> Result<u64, String> {
let cfile = CString::new(file).unwrap();
unsafe {
let pre_seq_no = ffi_try!(crocksdb_set_external_sst_file_global_seq_no(
db.inner,
cf.inner,
cfile.as_ptr(),
seq_no
));
Ok(pre_seq_no)
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
31 changes: 31 additions & 0 deletions tests/test_ingest_external_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,3 +432,34 @@ fn test_mem_sst_file_writer() {
assert!(env.delete_file(mem_sst_str).is_ok());
assert!(env.file_exists(mem_sst_str).is_err());
}

#[test]
fn test_set_external_sst_file_global_seq_no() {
let db_path = TempDir::new("_rust_rocksdb_set_external_sst_file_global_seq_no_db").expect("");
let db = create_default_database(&db_path);
let path = TempDir::new("_rust_rocksdb_set_external_sst_file_global_seq_no").expect("");
let file = path.path().join("sst_file");
let sstfile_str = file.to_str().unwrap();
gen_sst(
ColumnFamilyOptions::new(),
Some(db.cf_handle("default").unwrap()),
sstfile_str,
&[(b"k1", b"v1"), (b"k2", b"v2")],
);

let handle = db.cf_handle("default").unwrap();
let seq_no = 1;
// varify change seq_no
let r1 = set_external_sst_file_global_seq_no(&db, &handle, sstfile_str, seq_no);
assert!(r1.unwrap() != seq_no);
// varify that seq_no are equal
let r2 = set_external_sst_file_global_seq_no(&db, &handle, sstfile_str, seq_no);
assert!(r2.unwrap() == seq_no);

// change seq_no back to 0 so that it can be ingested
assert!(set_external_sst_file_global_seq_no(&db, &handle, sstfile_str, 0).is_ok());

db.ingest_external_file(&IngestExternalFileOptions::new(), &[sstfile_str])
.unwrap();
check_kv(&db, None, &[(b"k1", Some(b"v1")), (b"k2", Some(b"v2"))]);
}

0 comments on commit c141298

Please sign in to comment.