Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve backup checksum feature #5683

Merged
merged 15 commits into from
Oct 23, 2019
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion components/backup/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl BackupRange {
}
debug!("backup scan entries"; "len" => batch.len());
// Build sst files.
if let Err(e) = writer.write(batch.drain()) {
if let Err(e) = writer.write(batch.drain(), true) {
error!("backup build sst failed"; "error" => ?e);
return Err(e);
}
Expand Down
159 changes: 111 additions & 48 deletions components/backup/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,90 @@ use engine::rocks::{SstWriter, SstWriterBuilder};
use engine::{CF_DEFAULT, CF_WRITE, DB};
use external_storage::ExternalStorage;
use kvproto::backup::File;
use tikv::coprocessor::checksum_crc64_xor;
use tikv::raftstore::store::keys;
use tikv::storage::txn::TxnEntry;
use tikv_util;
use tikv_util::{self, box_err};

use crate::metrics::*;
use crate::{Error, Result};

struct Writer {
writer: SstWriter,
total_kvs: u64,
total_bytes: u64,
checksum: u64,
}

impl Writer {
fn new(writer: SstWriter) -> Self {
Writer {
writer,
total_kvs: 0,
total_bytes: 0,
checksum: 0,
}
}

fn write(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
// HACK: The actual key stored in TiKV is called
// data_key and always prefix a `z`. But iterator strips
// it, we need to add the prefix manually.
let data_key_write = keys::data_key(key);
self.writer.put(&data_key_write, value)?;
Ok(())
}

fn update_with(&mut self, entry: TxnEntry, need_checksum: bool) -> Result<()> {
self.total_kvs += 1;
if need_checksum {
let (k, v) = entry
.into_kvpair()
.map_err(|err| Error::Other(box_err!("Decode error: {:?}", err)))?;
self.total_bytes += (k.len() + v.len()) as u64;
self.checksum = checksum_crc64_xor(self.checksum, &[], &k, &v);
}
Ok(())
}

fn save_and_build_file(
mut self,
name: &str,
cf: &'static str,
buf: &mut Vec<u8>,
limiter: Option<Arc<IOLimiter>>,
storage: &dyn ExternalStorage,
) -> Result<File> {
buf.reserve(self.writer.file_size() as _);
self.writer.finish_into(buf)?;
BACKUP_RANGE_SIZE_HISTOGRAM_VEC
.with_label_values(&[cf])
.observe(buf.len() as _);
let file_name = format!("{}_{}.sst", name, cf);
let sha256 = tikv_util::file::sha256(&buf)
.map_err(|e| Error::Other(box_err!("Sha256 error: {:?}", e)))?;
let mut contents = buf as &[u8];
let mut limit_reader = LimitReader::new(limiter, &mut contents);
storage.write(&file_name, &mut limit_reader)?;
let mut file = File::new();
file.set_name(file_name);
file.set_sha256(sha256);
file.set_crc64xor(self.checksum);
file.set_total_kvs(self.total_kvs);
file.set_total_bytes(self.total_bytes);
Ok(file)
}

fn is_empty(&self) -> bool {
self.total_kvs == 0
}
}

/// A writer writes txn entries into SST files.
pub struct BackupWriter {
name: String,
default: SstWriter,
default_written: bool,
write: SstWriter,
write_written: bool,

default: Writer,
write: Writer,
limiter: Option<Arc<IOLimiter>>,
}

Expand All @@ -42,77 +111,69 @@ impl BackupWriter {
let name = name.to_owned();
Ok(BackupWriter {
name,
default,
default_written: false,
write,
write_written: false,
default: Writer::new(default),
write: Writer::new(write),
limiter,
})
}

/// Wrtie entries to buffered SST files.
pub fn write<I>(&mut self, entries: I) -> Result<()>
/// Write entries to buffered SST files.
pub fn write<I>(&mut self, entries: I, need_checksum: bool) -> Result<()>
where
I: Iterator<Item = TxnEntry>,
{
for e in entries {
match e {
let mut value_in_default = false;
match &e {
TxnEntry::Commit { default, write } => {
// Default may be empty if value is small.
if !default.0.is_empty() {
// HACK: The actual key stored in TiKV is called
// data_key and always prefix a `z`. But iterator strips
// it, we need to add the prefix manually.
let data_key_default = keys::data_key(&default.0);
self.default.put(&data_key_default, &default.1)?;
self.default_written = true;
self.default.write(&default.0, &default.1)?;
value_in_default = true;
}
assert!(!write.0.is_empty());
let data_key_write = keys::data_key(&write.0);
self.write.put(&data_key_write, &write.1)?;
self.write_written = true;
self.write.write(&write.0, &write.1)?;
}
TxnEntry::Prewrite { .. } => {
return Err(Error::Other("prewrite is not supported".into()))
return Err(Error::Other("prewrite is not supported".into()));
}
}
if value_in_default {
self.default.update_with(e, need_checksum)?;
} else {
self.write.update_with(e, need_checksum)?;
}
}
Ok(())
}

/// Save buffered SST files to the given external storage.
pub fn save(mut self, storage: &dyn ExternalStorage) -> Result<Vec<File>> {
let name = self.name;
let save_and_build_file =
|cf, mut contents: &[u8], limiter: Option<Arc<IOLimiter>>| -> Result<File> {
BACKUP_RANGE_SIZE_HISTOGRAM_VEC
.with_label_values(&[cf])
.observe(contents.len() as _);
let name = format!("{}_{}.sst", name, cf);
let checksum = tikv_util::file::calc_crc32_bytes(contents);
let mut limit_reader = LimitReader::new(limiter, &mut contents);
storage.write(&name, &mut limit_reader)?;
let mut file = File::new();
file.set_crc32(checksum);
file.set_name(name);
Ok(file)
};
pub fn save(self, storage: &dyn ExternalStorage) -> Result<Vec<File>> {
let start = Instant::now();
let mut files = Vec::with_capacity(2);
let mut buf = Vec::new();
if self.default_written {
let write_written = !self.write.is_empty() || !self.default.is_empty();
if !self.default.is_empty() {
// Save default cf contents.
buf.reserve(self.default.file_size() as _);
self.default.finish_into(&mut buf)?;
let default = save_and_build_file(CF_DEFAULT, &mut buf, self.limiter.clone())?;
let default = self.default.save_and_build_file(
&self.name,
CF_DEFAULT,
&mut buf,
self.limiter.clone(),
storage,
)?;
files.push(default);
buf.clear();
}
if self.write_written {
if write_written {
// Save write cf contents.
buf.reserve(self.write.file_size() as _);
self.write.finish_into(&mut buf)?;
let write = save_and_build_file(CF_WRITE, &mut buf, self.limiter)?;
let write = self.write.save_and_build_file(
&self.name,
CF_WRITE,
&mut buf,
self.limiter.clone(),
storage,
)?;
files.push(write);
}
BACKUP_RANGE_HISTOGRAM_VEC
Expand Down Expand Up @@ -183,7 +244,7 @@ mod tests {

// Test empty file.
let mut writer = BackupWriter::new(db.clone(), "foo", None).unwrap();
writer.write(vec![].into_iter()).unwrap();
writer.write(vec![].into_iter(), false).unwrap();
assert!(writer.save(&storage).unwrap().is_empty());

// Test write only txn.
Expand All @@ -195,6 +256,7 @@ mod tests {
write: (vec![b'a'], vec![b'a']),
}]
.into_iter(),
false,
)
.unwrap();
let files = writer.save(&storage).unwrap();
Expand All @@ -213,6 +275,7 @@ mod tests {
write: (vec![b'a'], vec![b'a']),
}]
.into_iter(),
false,
)
.unwrap();
let files = writer.save(&storage).unwrap();
Expand Down
90 changes: 89 additions & 1 deletion components/backup/tests/integrations/test_backup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,14 @@ use kvproto::raft_cmdpb::{CmdType, RaftCmdRequest, RaftRequestHeader, Request};
use kvproto::tikvpb_grpc::TikvClient;
use tempfile::Builder;
use test_raftstore::*;
use tidb_query::storage::scanner::{RangesScanner, RangesScannerOptions};
use tidb_query::storage::{IntervalRange, Range};
use tikv::coprocessor::checksum_crc64_xor;
use tikv::coprocessor::dag::TiKVStorage;
use tikv::storage::kv::Engine;
use tikv::storage::SnapshotStore;
use tikv_util::collections::HashMap;
use tikv_util::file::calc_crc32_bytes;
use tikv_util::worker::Worker;
use tikv_util::HandyRwLock;

Expand Down Expand Up @@ -179,6 +186,29 @@ impl TestSuite {
}
rx
}

fn admin_checksum(&self, backup_ts: u64, start: String, end: String) -> (u64, u64, u64) {
let mut checksum = 0;
let mut total_kvs = 0;
let mut total_bytes = 0;
let sim = self.cluster.sim.rl();
let engine = sim.storages[&self.context.get_peer().get_store_id()].clone();
let snapshot = engine.snapshot(&self.context.clone()).unwrap();
let snap_store = SnapshotStore::new(snapshot, backup_ts, IsolationLevel::Si, false);
let mut scanner = RangesScanner::new(RangesScannerOptions {
storage: TiKVStorage::from(snap_store),
ranges: vec![Range::Interval(IntervalRange::from((start, end)))],
scan_backward_in_range: false,
is_key_only: false,
is_scanned_range_aware: false,
});
while let Some((k, v)) = scanner.next().unwrap() {
NingLin-P marked this conversation as resolved.
Show resolved Hide resolved
checksum = checksum_crc64_xor(checksum, &[], &k, &v);
total_kvs += 1;
total_bytes += (k.len() + v.len()) as u64;
}
(checksum, total_kvs, total_bytes)
}
}

// Extrat CF name from sst name.
Expand Down Expand Up @@ -264,7 +294,7 @@ fn test_backup_and_import() {
let mut content = vec![];
reader.read_to_end(&mut content).unwrap();
let mut m = sst_meta.clone();
m.crc32 = f.crc32;
m.crc32 = calc_crc32_bytes(&content);
m.length = content.len() as _;
m.cf_name = name_to_cf(&f.name).to_owned();
metas.push((m, content));
Expand Down Expand Up @@ -312,3 +342,61 @@ fn test_backup_and_import() {

suite.stop();
}

#[test]
fn test_backup_meta() {
let mut suite = TestSuite::new(3);
let key_count = 60;

// 3 version for each key.
for _ in 0..3 {
for i in 0..key_count {
let (k, v) = (format!("key_{}", i), format!("value_{}", i));
// Prewrite
let start_ts = suite.alloc_ts();
let mut mutation = Mutation::default();
mutation.op = Op::Put;
mutation.key = k.clone().into_bytes();
mutation.value = v.clone().into_bytes();
suite.must_kv_prewrite(vec![mutation], k.clone().into_bytes(), start_ts);
// Commit
let commit_ts = suite.alloc_ts();
suite.must_kv_commit(vec![k.clone().into_bytes()], start_ts, commit_ts);
}
}
let backup_ts = suite.alloc_ts();
// key are order by lexicographical order, 'a'-'z' will cover all
let (admin_checksum, admin_total_kvs, admin_total_bytes) =
suite.admin_checksum(backup_ts, "a".to_owned(), "z".to_owned());

// Push down backup request.
let tmp = Builder::new().tempdir().unwrap();
let storage_path = format!(
"local://{}",
tmp.path().join(format!("{}", backup_ts)).display()
);
let rx = suite.backup(
vec![], // start
vec![], // end
backup_ts,
storage_path.clone(),
);
let resps1 = rx.collect().wait().unwrap();
// Only leader can handle backup.
assert_eq!(resps1.len(), 1);
let files: Vec<_> = resps1[0].files.clone().into_iter().collect();
// Short value is piggybacked in write cf, so we get 1 sst at least.
assert!(!files.is_empty());
let mut checksum = 0;
let mut total_kvs = 0;
let mut total_bytes = 0;
for f in files {
checksum ^= f.get_crc64xor();
total_kvs += f.get_total_kvs();
total_bytes += f.get_total_bytes();
}
assert_eq!(total_kvs, key_count);
assert_eq!(total_kvs, admin_total_kvs);
assert_eq!(total_bytes, admin_total_bytes);
assert_eq!(checksum, admin_checksum);
}
3 changes: 2 additions & 1 deletion components/tidb_query/src/expr/scalar_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,8 @@ impl ScalarFunc {
| ScalarFuncSig::JsonValidJsonSig
| ScalarFuncSig::JsonContainsSig
| ScalarFuncSig::JsonKeys2ArgsSig
| ScalarFuncSig::JsonValidStringSig => return Err(Error::UnknownSignature(sig)),
| ScalarFuncSig::JsonValidStringSig
| ScalarFuncSig::JsonValidOthersSig => return Err(Error::UnknownSignature(sig)),
};
if args < min_args || args > max_args {
return Err(box_err!(
Expand Down
Loading