[CHORE] Cleanup clippy warnings in rust/blockstore (#2859)
## Description of changes

*Summarize the changes made by this PR.*
- Clean up warnings under rust/blockstore.

## Test plan
*How are these changes tested?*
- `cargo clippy --all-targets`

- [ ] Tests pass locally with `pytest` for python, `yarn test` for js,
`cargo test` for rust

## Documentation Changes
*Are all docstrings for user-facing APIs updated if required? Do we need
to make documentation changes in the [docs
repository](https://github.com/chroma-core/docs)?*
rescrv authored Sep 26, 2024
1 parent 3e97018 commit 620f143
Showing 29 changed files with 369 additions and 403 deletions.
14 changes: 6 additions & 8 deletions rust/blockstore/src/arrow/block/delta/data_record.rs
@@ -212,7 +212,7 @@ impl DataRecordStorage {
// https://docs.rs/arrow-buffer/52.2.0/src/arrow_buffer/buffer/null.rs.html#153-155
let validity_bytes = bit_util::round_upto_multiple_of_64(bit_util::ceil(self.len(), 8)) * 2;

- let total_size = prefix_size
+ prefix_size
+ key_size
+ id_size
+ embedding_size
@@ -223,9 +223,7 @@
+ id_offset
+ metdata_offset
+ document_offset
- + validity_bytes;
-
- total_size
+ + validity_bytes
}

fn split_internal<K: ArrowWriteableKey>(&self, split_size: usize) -> SplitInformation {
@@ -295,15 +293,15 @@ impl DataRecordStorage {
}
}

- return SplitInformation {
+ SplitInformation {
split_key: split_key.expect("split key should be set"),
remaining_prefix_size: inner.prefix_size - prefix_size,
remaining_key_size: inner.key_size - key_size,
remaining_id_size: inner.id_size - id_size,
remaining_embedding_size: inner.embedding_size - embedding_size,
remaining_metadata_size: inner.metadata_size - metadata_size,
remaining_document_size: inner.document_size - document_size,
- };
+ }
}

pub(super) fn split<K: ArrowWriteableKey>(
@@ -340,7 +338,7 @@ impl DataRecordStorage {
inner.storage.len()
}

- pub(super) fn to_arrow(
+ pub(super) fn into_arrow(
self,
key_builder: BlockKeyArrowBuilder,
) -> Result<RecordBatch, arrow::error::ArrowError> {
@@ -396,7 +394,7 @@ impl DataRecordStorage {
}
}
// Build arrow key with fields.
- let (prefix_field, prefix_arr, key_field, key_arr) = key_builder.to_arrow();
+ let (prefix_field, prefix_arr, key_field, key_arr) = key_builder.as_arrow();

let id_field = Field::new("id", arrow::datatypes::DataType::Utf8, true);
let embedding_field = Field::new(
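The sizing hunk above encodes Arrow's validity-buffer rules: one validity bit per row, each buffer padded up to a 64-byte boundary, and the `* 2` presumably covering the record's two nullable columns (metadata and document) — that last point is my reading, not stated in the diff. A minimal standalone sketch of the arithmetic; the helpers mirror `arrow_buffer`'s `bit_util` but are reimplemented here for illustration:

```rust
fn ceil_div(value: usize, divisor: usize) -> usize {
    (value + divisor - 1) / divisor
}

fn round_upto_multiple_of_64(n: usize) -> usize {
    // Round up to the next 64-byte boundary, per the Arrow spec.
    (n + 63) & !63
}

fn validity_bytes(rows: usize) -> usize {
    // One validity bit per row -> ceil(rows / 8) bitmap bytes, padded to a
    // 64-byte boundary, then doubled for two nullable columns.
    round_upto_multiple_of_64(ceil_div(rows, 8)) * 2
}

fn main() {
    // 100 rows -> 13 bitmap bytes -> padded to 64 -> 128 across two columns.
    assert_eq!(validity_bytes(100), 128);
}
```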
53 changes: 35 additions & 18 deletions rust/blockstore/src/arrow/block/delta/delta.rs
@@ -26,6 +26,11 @@ impl BlockDelta {
/// Creates a new block delta from a block.
/// # Arguments
/// - id: the id of the block delta.
+ // NOTE(rescrv): K is unused, but it is very conceptually easy to think of everything as
+ // key-value pairs. I started to refactor this to remove ArrowWriteableKey, but it was not
+ // readable to tell whether I was operating on the key or value type. Keeping both but
+ // suppressing the clippy error is a reasonable alternative.
+ #[allow(clippy::extra_unused_type_parameters)]
pub fn new<K: ArrowWriteableKey, V: ArrowWriteableValue>(id: Uuid) -> Self {
BlockDelta {
builder: V::get_delta_builder(),
@@ -56,12 +61,14 @@ impl BlockDelta {
/// where applicable. The size is rounded up to the nearest 64 bytes as per
/// the arrow specification. When a block delta is converted into a block data
/// the same sizing is used to allocate the memory for the block data.
+ #[allow(clippy::extra_unused_type_parameters)]
pub(in crate::arrow) fn get_size<K: ArrowWriteableKey, V: ArrowWriteableValue>(&self) -> usize {
self.builder.get_size::<K>()
}

+ #[allow(clippy::extra_unused_type_parameters)]
pub fn finish<K: ArrowWriteableKey, V: ArrowWriteableValue>(self) -> RecordBatch {
- self.builder.to_record_batch::<K>()
+ self.builder.into_record_batch::<K>()
}

/// Splits the block delta into two block deltas. The split point is the last key
Expand All @@ -72,8 +79,8 @@ impl BlockDelta {
/// A tuple containing the the key of the split point and the new block delta.
/// The new block deltas contains all the key value pairs after, but not including the
/// split point.
- pub(crate) fn split<'referred_data, K: ArrowWriteableKey, V: ArrowWriteableValue>(
- &'referred_data self,
+ pub(crate) fn split<K: ArrowWriteableKey, V: ArrowWriteableValue>(
+ &self,
max_block_size_bytes: usize,
) -> Vec<(CompositeKey, BlockDelta)> {
let half_size = max_block_size_bytes / 2;
@@ -83,8 +90,7 @@
let mut output = Vec::new();
let mut first_iter: bool = true;
// iterate over all blocks to split until its empty
- while !blocks_to_split.is_empty() {
- let curr_block = blocks_to_split.pop().unwrap();
+ while let Some(curr_block) = blocks_to_split.pop() {
let (new_start_key, new_delta) = curr_block.builder.split::<K>(half_size);
let new_block = BlockDelta {
builder: new_delta,
@@ -110,7 +116,7 @@
}
}

- return output;
+ output
}

pub(crate) fn len(&self) -> usize {
@@ -178,6 +184,7 @@ mod test {
}
block_manager.flush(&block).await.unwrap();
let block = block_manager.get(&block.clone().id).await.unwrap().unwrap();
+ #[allow(clippy::needless_range_loop)]
for i in 0..n {
let key = format!("key{}", i);
let read = block.get::<&str, &[u32]>("prefix", &key).unwrap();
@@ -195,9 +202,10 @@
let cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {}));
let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES, cache);
let delta = block_manager.create::<&str, String>();
- let delta_id = delta.id.clone();
+ let delta_id = delta.id;

let n = 2000;
+ #[allow(clippy::needless_range_loop)]
for i in 0..n {
let prefix = "prefix";
let key = format!("key{}", i);
@@ -207,6 +215,7 @@
let size = delta.get_size::<&str, String>();
let block = block_manager.commit::<&str, String>(delta);
let mut values_before_flush = vec![];
+ #[allow(clippy::needless_range_loop)]
for i in 0..n {
let key = format!("key{}", i);
let read = block.get::<&str, &str>("prefix", &key);
@@ -217,6 +226,7 @@
let block = block_manager.get(&delta_id).await.unwrap().unwrap();

assert_eq!(size, block.get_size());
+ #[allow(clippy::needless_range_loop)]
for i in 0..n {
let key = format!("key{}", i);
let read = block.get::<&str, &str>("prefix", &key);
@@ -225,6 +235,7 @@

// test save/load
let loaded = test_save_load_size(path, &block);
+ #[allow(clippy::needless_range_loop)]
for i in 0..n {
let key = format!("key{}", i);
let read = loaded.get::<&str, &str>("prefix", &key);
@@ -233,7 +244,7 @@

// test fork
let forked_block = block_manager.fork::<&str, String>(&delta_id).await.unwrap();
- let new_id = forked_block.id.clone();
+ let new_id = forked_block.id;
let block = block_manager.commit::<&str, String>(forked_block);
block_manager.flush(&block).await.unwrap();
let forked_block = block_manager.get(&new_id).await.unwrap().unwrap();
@@ -262,7 +273,7 @@ mod test {
}

let size = delta.get_size::<f32, String>();
- let delta_id = delta.id.clone();
+ let delta_id = delta.id;
let block = block_manager.commit::<f32, String>(delta);
let mut values_before_flush = vec![];
for i in 0..n {
@@ -273,6 +284,7 @@
block_manager.flush(&block).await.unwrap();
let block = block_manager.get(&delta_id).await.unwrap().unwrap();
assert_eq!(size, block.get_size());
+ #[allow(clippy::needless_range_loop)]
for i in 0..n {
let key = i as f32;
let read = block.get::<f32, &str>("prefix", key).unwrap();
@@ -300,7 +312,7 @@ mod test {
}

let size = delta.get_size::<&str, RoaringBitmap>();
- let delta_id = delta.id.clone();
+ let delta_id = delta.id;
let block = block_manager.commit::<&str, RoaringBitmap>(delta);
block_manager.flush(&block).await.unwrap();
let block = block_manager.get(&delta_id).await.unwrap().unwrap();
@@ -325,17 +337,17 @@
let storage = Storage::Local(LocalStorage::new(path));
let cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {}));
let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES, cache);
- let ids = vec!["embedding_id_2", "embedding_id_0", "embedding_id_1"];
- let embeddings = vec![
+ let ids = ["embedding_id_2", "embedding_id_0", "embedding_id_1"];
+ let embeddings = [
vec![1.0, 2.0, 3.0],
vec![4.0, 5.0, 6.0],
vec![7.0, 8.0, 9.0],
];
let mut metadata = HashMap::new();
metadata.insert("key1".to_string(), MetadataValue::Str("value1".to_string()));
let metadata = Some(metadata);
- let metadatas = vec![None, metadata.clone(), None];
- let documents = vec![None, Some("test document"), None];
+ let metadatas = [None, metadata.clone(), None];
+ let documents = [None, Some("test document"), None];
let delta = block_manager.create::<&str, &DataRecord>();

//TODO: Option<&T> as opposed to &Option<T>
@@ -365,7 +377,7 @@ mod test {
}

let size = delta.get_size::<&str, &DataRecord>();
- let delta_id = delta.id.clone();
+ let delta_id = delta.id;
let block = block_manager.commit::<&str, &DataRecord>(delta);
block_manager.flush(&block).await.unwrap();
let block = block_manager.get(&delta_id).await.unwrap().unwrap();
@@ -400,7 +412,7 @@ mod test {
}

let size = delta.get_size::<u32, String>();
- let delta_id = delta.id.clone();
+ let delta_id = delta.id;
let block = block_manager.commit::<u32, String>(delta);
block_manager.flush(&block).await.unwrap();
let block = block_manager.get(&delta_id).await.unwrap().unwrap();
@@ -418,9 +430,10 @@
let cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {}));
let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES, cache);
let delta = block_manager.create::<u32, u32>();
- let delta_id = delta.id.clone();
+ let delta_id = delta.id;

let n = 2000;
+ #[allow(clippy::needless_range_loop)]
for i in 0..n {
let prefix = "prefix";
let key = i as u32;
@@ -430,6 +443,7 @@
let size = delta.get_size::<u32, u32>();
let block = block_manager.commit::<u32, u32>(delta);
let mut values_before_flush = vec![];
+ #[allow(clippy::needless_range_loop)]
for i in 0..n {
let key = i as u32;
let read = block.get::<u32, u32>("prefix", key);
@@ -440,6 +454,7 @@
let block = block_manager.get(&delta_id).await.unwrap().unwrap();

assert_eq!(size, block.get_size());
+ #[allow(clippy::needless_range_loop)]
for i in 0..n {
let key = i as u32;
let read = block.get::<u32, u32>("prefix", key);
@@ -448,6 +463,7 @@

// test save/load
let loaded = test_save_load_size(path, &block);
+ #[allow(clippy::needless_range_loop)]
for i in 0..n {
let key = i as u32;
let read = loaded.get::<u32, u32>("prefix", key);
@@ -456,10 +472,11 @@

// test fork
let forked_block = block_manager.fork::<u32, u32>(&delta_id).await.unwrap();
- let new_id = forked_block.id.clone();
+ let new_id = forked_block.id;
let block = block_manager.commit::<u32, u32>(forked_block);
block_manager.flush(&block).await.unwrap();
let forked_block = block_manager.get(&new_id).await.unwrap().unwrap();
+ #[allow(clippy::needless_range_loop)]
for i in 0..n {
let key = i as u32;
let read = forked_block.get::<u32, u32>("prefix", key);
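The NOTE(rescrv) comment above concerns clippy's `extra_unused_type_parameters` lint: several `BlockDelta` methods accept both `K` and `V` even where only one is used, because call sites read more clearly as key-value pairs. A minimal sketch of the pattern and its suppression (names are illustrative, not the real blockstore API):

```rust
// Keep an unused key type parameter for call-site readability and silence
// the lint explicitly rather than refactoring it away.
#[allow(clippy::extra_unused_type_parameters)]
fn new_delta<K, V: Default>() -> V {
    // Only V is used in the body; K documents intent at the call site.
    V::default()
}

fn main() {
    // Reads as "a delta keyed by &str holding String values".
    let delta = new_delta::<&str, String>();
    assert!(delta.is_empty());
}
```

The same file also replaces a manual `!blocks_to_split.is_empty()` check followed by `pop().unwrap()` with `while let Some(curr_block) = blocks_to_split.pop()`, which drops the `unwrap` without changing behavior, and removes `.clone()` on block ids, which are `Copy` (clippy's `clone_on_copy`).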
1 change: 1 addition & 0 deletions rust/blockstore/src/arrow/block/delta/mod.rs
@@ -1,4 +1,5 @@
pub(super) mod data_record;
+ #[allow(clippy::module_inception)]
mod delta;
pub(super) mod single_column_size_tracker;
pub(super) mod single_column_storage;
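The new allow addresses clippy's `module_inception` lint, triggered because the `delta` module contains a submodule also named `delta` (from `delta/delta.rs`). A toy reproduction of the shape being flagged, using inline modules for brevity:

```rust
#![allow(dead_code)]

mod delta {
    // Without this attribute clippy warns that the module has the same name
    // as its containing module (clippy::module_inception).
    #[allow(clippy::module_inception)]
    mod delta {
        pub fn noop() {}
    }
}

fn main() {}
```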
rust/blockstore/src/arrow/block/delta/single_column_size_tracker.rs
@@ -34,7 +34,7 @@ impl SingleColumnSizeTracker {

/// The raw unpadded size of the prefix data in bytes.
pub(super) fn get_prefix_size(&self) -> usize {
- return self.prefix_size;
+ self.prefix_size
}

/// The arrow padded size of the prefix data in bytes.
Expand All @@ -44,7 +44,7 @@ impl SingleColumnSizeTracker {

/// The raw unpadded size of the key data in bytes.
pub(super) fn get_key_size(&self) -> usize {
- return self.key_size;
+ self.key_size
}

/// The arrow padded size of the key data in bytes.
Expand All @@ -56,7 +56,7 @@ impl SingleColumnSizeTracker {

/// The raw unpadded size of the value data in bytes.
pub(super) fn get_value_size(&self) -> usize {
- return self.value_size;
+ self.value_size
}

/// The arrow padded size of the value data in bytes.
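Every edit in this file is the same `clippy::needless_return` fix: a trailing `return expr;` becomes a bare tail expression. Schematically, with the tracker reduced to one field:

```rust
struct SingleColumnSizeTracker {
    prefix_size: usize,
}

impl SingleColumnSizeTracker {
    // Before: `return self.prefix_size;`
    // After: the block's final expression is its return value.
    fn get_prefix_size(&self) -> usize {
        self.prefix_size
    }
}

fn main() {
    let tracker = SingleColumnSizeTracker { prefix_size: 64 };
    assert_eq!(tracker.get_prefix_size(), 64);
}
```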
16 changes: 8 additions & 8 deletions rust/blockstore/src/arrow/block/delta/single_column_storage.rs
@@ -219,7 +219,7 @@ impl<T: ArrowWriteableValue> SingleColumnStorage<T> {
}

impl SingleColumnStorage<String> {
- pub(super) fn to_arrow(
+ pub(super) fn into_arrow(
self,
key_builder: BlockKeyArrowBuilder,
) -> Result<RecordBatch, arrow::error::ArrowError> {
@@ -245,7 +245,7 @@
}
}
// Build arrow key with fields.
- let (prefix_field, prefix_arr, key_field, key_arr) = key_builder.to_arrow();
+ let (prefix_field, prefix_arr, key_field, key_arr) = key_builder.as_arrow();
// Build arrow value with fields.
let value_field = Field::new("value", arrow::datatypes::DataType::Utf8, false);
let value_arr = value_builder.finish();
Expand All @@ -260,7 +260,7 @@ impl SingleColumnStorage<String> {
}

impl SingleColumnStorage<Vec<u32>> {
- pub(super) fn to_arrow(
+ pub(super) fn into_arrow(
self,
key_builder: BlockKeyArrowBuilder,
) -> Result<RecordBatch, arrow::error::ArrowError> {
@@ -290,7 +290,7 @@ impl SingleColumnStorage<Vec<u32>> {
}
}
// Build arrow key and value with fields.
- let (prefix_field, prefix_arr, key_field, key_arr) = key_builder.to_arrow();
+ let (prefix_field, prefix_arr, key_field, key_arr) = key_builder.as_arrow();

let value_field = Field::new(
"value",
Expand All @@ -313,7 +313,7 @@ impl SingleColumnStorage<Vec<u32>> {
}

impl SingleColumnStorage<u32> {
- pub(super) fn to_arrow(
+ pub(super) fn into_arrow(
self,
key_builder: BlockKeyArrowBuilder,
) -> Result<RecordBatch, arrow::error::ArrowError> {
@@ -339,7 +339,7 @@
}
}
// Build arrow key with fields.
- let (prefix_field, prefix_arr, key_field, key_arr) = key_builder.to_arrow();
+ let (prefix_field, prefix_arr, key_field, key_arr) = key_builder.as_arrow();
let value_field = Field::new("value", arrow::datatypes::DataType::UInt32, false);
let value_arr = value_builder.finish();
let value_arr = (&value_arr as &dyn Array).slice(0, value_arr.len());
Expand All @@ -353,7 +353,7 @@ impl SingleColumnStorage<u32> {
}

impl SingleColumnStorage<RoaringBitmap> {
- pub(super) fn to_arrow(
+ pub(super) fn into_arrow(
self,
key_builder: BlockKeyArrowBuilder,
) -> Result<RecordBatch, arrow::error::ArrowError> {
@@ -389,7 +389,7 @@ impl SingleColumnStorage<RoaringBitmap> {
}
}
// Build arrow key with fields.
- let (prefix_field, prefix_arr, key_field, key_arr) = key_builder.to_arrow();
+ let (prefix_field, prefix_arr, key_field, key_arr) = key_builder.as_arrow();

let value_field = Field::new("value", arrow::datatypes::DataType::Binary, true);
let value_arr = value_builder.finish();
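The `to_arrow` → `into_arrow` renames across these impls follow the self-ownership naming convention that clippy's `wrong_self_convention` lint enforces: `into_*` for methods that consume `self`, `as_*` for cheap borrowing views (hence `key_builder.as_arrow()` at the call sites). A schematic sketch with illustrative types, not the real builder API:

```rust
struct ValueBuilder {
    values: Vec<u32>,
}

impl ValueBuilder {
    // Consumes the builder, so the name takes the into_* prefix.
    fn into_values(self) -> Vec<u32> {
        self.values
    }
}

struct KeyBuilder {
    prefix: String,
}

impl KeyBuilder {
    // Borrows without consuming, so as_* fits.
    fn as_prefix(&self) -> &str {
        &self.prefix
    }
}

fn main() {
    let values = ValueBuilder { values: vec![1, 2, 3] };
    assert_eq!(values.into_values(), vec![1, 2, 3]);

    let keys = KeyBuilder { prefix: "prefix".to_string() };
    assert_eq!(keys.as_prefix(), "prefix");
}
```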