feat: use get_batch to implement get_range (apache#971)
## Rationale
The current implementation of `get_range` in the `OBKV`-based `ObjectStore`
may cause extra IO operations: each covered data part is fetched with a
separate `get` call. A better way is to let `table_kv` provide a `get_batch`
API so that all parts can be read in a single request.

## Detailed Changes
* Add a `get_batch` API to the `TableKv` trait in `table_kv`.
* Use `get_batch` to implement `get_range` in the `OBKV`-based `ObjectStore` (a minimal sketch of the batched flow follows this list).
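
The snippet below is a minimal, hedged sketch of the idea, not the crate's actual code: `MemoryKv`, the table name, and the part keys are hypothetical stand-ins for a `TableKv` implementation, used only to contrast the per-key `get` loop with a single `get_batch` call.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for a `TableKv` implementation.
struct MemoryKv {
    tables: HashMap<String, HashMap<Vec<u8>, Vec<u8>>>,
}

impl MemoryKv {
    /// Single-key lookup, analogous to `TableKv::get`.
    fn get(&self, table_name: &str, key: &[u8]) -> Option<Vec<u8>> {
        self.tables.get(table_name)?.get(key).cloned()
    }

    /// Batched lookup, analogous to the new `TableKv::get_batch`:
    /// one call returns a value (or `None`) per key, in key order.
    fn get_batch(&self, table_name: &str, keys: Vec<&[u8]>) -> Vec<Option<Vec<u8>>> {
        keys.into_iter()
            .map(|key| self.get(table_name, key))
            .collect()
    }
}

fn main() {
    let mut table = HashMap::new();
    table.insert(b"part-0".to_vec(), b"hello ".to_vec());
    table.insert(b"part-1".to_vec(), b"world".to_vec());
    let kv = MemoryKv {
        tables: HashMap::from([("shard_0".to_string(), table)]),
    };

    let part_keys: Vec<&[u8]> = vec![&b"part-0"[..], &b"part-1"[..]];

    // Before: one `get` call (and potentially one IO round trip) per part key.
    let _per_key: Vec<Option<Vec<u8>>> =
        part_keys.iter().map(|key| kv.get("shard_0", key)).collect();

    // After: a single `get_batch` call covers every part key of the range.
    let values = kv.get_batch("shard_0", part_keys);
    assert_eq!(values.len(), 2);
}
```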

## Test Plan
Covered by unit tests; a hedged sketch of one such test is shown below.
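
As a sketch only — reusing the hypothetical `MemoryKv` from the example above rather than the crate's real `MemoryImpl` — a unit test for the batched lookup could check that `get_batch` returns one entry per requested key, in request order, with `None` for missing keys, which mirrors how the `MemoryImpl::get_batch` added in this commit behaves.

```rust
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    #[test]
    fn get_batch_preserves_key_order_and_reports_missing_keys() {
        let mut table = HashMap::new();
        table.insert(b"k1".to_vec(), b"v1".to_vec());
        table.insert(b"k3".to_vec(), b"v3".to_vec());
        let kv = MemoryKv {
            tables: HashMap::from([("test_table".to_string(), table)]),
        };

        // Request three keys; "k2" was never inserted.
        let keys: Vec<&[u8]> = vec![&b"k1"[..], &b"k2"[..], &b"k3"[..]];
        let values = kv.get_batch("test_table", keys);

        // One result per key, in request order, `None` for the missing key.
        assert_eq!(
            values,
            vec![Some(b"v1".to_vec()), None, Some(b"v3".to_vec())]
        );
    }
}
```
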
MichaelLeeHZ authored and dust1 committed Aug 9, 2023
1 parent 5b2f9b7 commit ef3967f
Showing 4 changed files with 98 additions and 7 deletions.
40 changes: 33 additions & 7 deletions components/object_store/src/obkv/mod.rs
@@ -94,6 +94,15 @@ pub enum Error {
timestamp: i64,
backtrace: Backtrace,
},

#[snafu(display(
"The length of data parts is inconsistent with the length of values, parts length:{part_len}, values length:{value_len} \nBacktrace:\n{backtrace}"
))]
DataPartsLength {
part_len: usize,
value_len: usize,
backtrace: Backtrace,
},
}

impl<T: TableKv> MetaManager<T> {
@@ -275,11 +284,11 @@ impl<T: TableKv> ObkvObjectStore<T> {
let table_name = self.pick_shard_table(location);
// TODO: Let table_kv provide a api `get_batch` to avoid extra IO operations.
let mut futures = FuturesOrdered::new();
- for path in meta.parts {
+ for part_key in meta.parts {
let client = self.client.clone();
let table_name = table_name.to_string();
let future = async move {
- match client.get(&table_name, path.as_bytes()) {
+ match client.get(&table_name, part_key.as_bytes()) {
Ok(res) => Ok(Bytes::from(res.unwrap())),
Err(err) => Err(StoreError::Generic {
store: OBKV,
@@ -459,16 +468,33 @@ impl<T: TableKv> ObjectStore for ObkvObjectStore<T> {
source,
})?;

- for (index, key) in covered_parts.part_keys.iter().enumerate() {
-     let part_bytes = self
-         .client
-         .get(table_name, key.as_bytes())
+ let keys: Vec<&[u8]> = covered_parts
+     .part_keys
+     .iter()
+     .map(|key| key.as_bytes())
+     .collect();
+ let values =
+     self.client
+         .get_batch(table_name, keys)
.map_err(|source| StoreError::NotFound {
path: location.to_string(),
source: Box::new(source),
})?;

- if let Some(bytes) = part_bytes {
+ if covered_parts.part_keys.len() != values.len() {
+     DataPartsLength {
+         part_len: covered_parts.part_keys.len(),
+         value_len: values.len(),
+     }
+     .fail()
+     .map_err(|source| StoreError::Generic {
+         store: OBKV,
+         source: Box::new(source),
+     })?
+ }
+
+ for (index, (key, value)) in covered_parts.part_keys.iter().zip(values).enumerate() {
+     if let Some(bytes) = value {
let mut begin = 0;
let mut end = bytes.len();
if index == 0 {
7 changes: 7 additions & 0 deletions components/table_kv/src/lib.rs
@@ -186,6 +186,13 @@ pub trait TableKv: Clone + Send + Sync + fmt::Debug + 'static {
/// Get value by key from table with `table_name`.
fn get(&self, table_name: &str, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;

/// Get a batch of values by keys from table with `table_name`.
fn get_batch(
&self,
table_name: &str,
keys: Vec<&[u8]>,
) -> Result<Vec<Option<Vec<u8>>>, Self::Error>;

/// Delete data by key from table with `table_name`.
fn delete(&self, table_name: &str, key: &[u8]) -> Result<(), Self::Error>;

17 changes: 17 additions & 0 deletions components/table_kv/src/memory.rs
@@ -278,6 +278,23 @@ impl TableKv for MemoryImpl {
Ok(table.get(key))
}

fn get_batch(
&self,
table_name: &str,
keys: Vec<&[u8]>,
) -> std::result::Result<Vec<Option<Vec<u8>>>, Self::Error> {
let table = self
.find_table(table_name)
.context(TableNotFound { table_name })?;

let mut result = Vec::with_capacity(keys.len());
for key in keys {
result.push(table.get(key));
}

Ok(result)
}

fn delete(&self, table_name: &str, key: &[u8]) -> std::result::Result<(), Self::Error> {
let table = self
.find_table(table_name)
41 changes: 41 additions & 0 deletions components/table_kv/src/obkv.rs
@@ -157,6 +157,23 @@ pub enum Error {
backtrace: Backtrace,
},

#[snafu(display(
"Failed to get batch value from table, table:{table_name}, err:{source}.\nBacktrace:\n{backtrace}"
))]
GetBatchValue {
table_name: String,
source: obkv::error::Error,
backtrace: Backtrace,
},

#[snafu(display(
"Unexpected batch result found, table:{table_name}.\nBacktrace:\n{backtrace}"
))]
UnexpectedBatchResult {
table_name: String,
backtrace: Backtrace,
},

#[snafu(display(
"Failed to delete data from table, table:{}, err:{}.\nBacktrace:\n{}",
table_name,
@@ -501,6 +518,30 @@ impl TableKv for ObkvImpl {
Ok(values.remove(VALUE_COLUMN_NAME).map(Value::as_bytes))
}

fn get_batch(&self, table_name: &str, keys: Vec<&[u8]>) -> Result<Vec<Option<Vec<u8>>>> {
let mut batch_ops = ObTableBatchOperation::with_ops_num_raw(keys.len());
let mut batch_res = Vec::with_capacity(keys.len());

for key in keys {
batch_ops.get(bytes_to_values(key), vec![VALUE_COLUMN_NAME.to_string()]);
}

let result = self
.client
.execute_batch(table_name, batch_ops)
.context(GetBatchValue { table_name })?;

for table_ops_result in result {
match table_ops_result {
TableOpResult::RetrieveRows(mut values) => {
batch_res.push(values.remove(VALUE_COLUMN_NAME).map(Value::as_bytes))
}
TableOpResult::AffectedRows(_) => UnexpectedBatchResult { table_name }.fail()?,
}
}
Ok(batch_res)
}

fn delete(&self, table_name: &str, key: &[u8]) -> std::result::Result<(), Self::Error> {
self.client
.delete(table_name, bytes_to_values(key))
