Integrate flat index (#448)
eddyxu authored Jan 23, 2023
1 parent 1b057a6 commit fcdd1fc
Showing 11 changed files with 454 additions and 98 deletions.
23 changes: 23 additions & 0 deletions rust/src/arrow.rs
@@ -301,6 +301,11 @@ pub trait RecordBatchExt {
///
/// TODO: add merge nested fields support.
fn merge(&self, other: &RecordBatch) -> Result<RecordBatch>;

/// Drop the column with the given name and return a new [`RecordBatch`].
///
/// If the named column does not exist, it returns a copy of this [`RecordBatch`].
fn drop_column(&self, name: &str) -> Result<RecordBatch>;
}

impl RecordBatchExt for RecordBatch {
@@ -346,4 +351,22 @@ impl RecordBatchExt for RecordBatch {
}
Ok(Self::try_new(Arc::new(Schema::new(fields)), columns)?)
}

fn drop_column(&self, name: &str) -> Result<RecordBatch> {
let mut fields = vec![];
let mut columns = vec![];
for i in 0..self.schema().fields.len() {
if self.schema().field(i).name() != name {
fields.push(self.schema().field(i).clone());
columns.push(self.column(i).clone());
}
}
Ok(RecordBatch::try_new(
Arc::new(Schema::new_with_metadata(
fields,
self.schema().metadata().clone(),
)),
columns,
)?)
}
}
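
A minimal sketch of how the new `drop_column` extension could be used, assuming the trait is importable as `lance::arrow::RecordBatchExt` (the schema and values below are made up for illustration):

use std::sync::Arc;

use arrow_array::{Int32Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
// Assumed import path for the extension trait touched in this commit.
use lance::arrow::RecordBatchExt;

fn drop_column_example() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])),
            Arc::new(StringArray::from(vec!["a", "b", "c"])),
        ],
    )
    .unwrap();

    // Drops "name"; the schema metadata is carried over by the implementation.
    let smaller = batch.drop_column("name").unwrap();
    assert_eq!(smaller.num_columns(), 1);

    // A name that matches no column yields an equivalent batch.
    let same = batch.drop_column("no_such_column").unwrap();
    assert_eq!(same.num_columns(), 2);
}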
1 change: 1 addition & 0 deletions rust/src/arrow/record_batch.rs
@@ -24,6 +24,7 @@ use arrow_select::concat::concat_batches;

use crate::Result;

#[derive(Debug)]
pub struct RecordBatchBuffer {
pub batches: Vec<RecordBatch>,
idx: usize,
31 changes: 24 additions & 7 deletions rust/src/dataset.rs
@@ -4,9 +4,10 @@
use std::collections::BTreeMap;
use std::sync::Arc;

use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_array::cast::as_struct_array;
use arrow_array::{RecordBatch, RecordBatchReader, StructArray, UInt64Array};
use arrow_schema::Schema as ArrowSchema;
use arrow_select::concat::concat_batches;
use arrow_select::{concat::concat_batches, take::take};
use chrono::prelude::*;
use futures::stream::{self, StreamExt, TryStreamExt};
use object_store::path::Path;
@@ -34,7 +35,7 @@ fn latest_manifest_path(base: &Path) -> Path {
}

/// Lance Dataset
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Dataset {
object_store: Arc<ObjectStore>,
base: Path,
@@ -208,7 +209,7 @@ impl Dataset {

/// Create a Scanner to scan the dataset.
pub fn scan(&self) -> Scanner {
Scanner::new(&self)
Scanner::new(Arc::new(self.clone()))
}

/// Take rows by their internal row ids.
@@ -217,8 +218,11 @@
row_ids: &[u64],
projection: &Schema,
) -> Result<RecordBatch> {
let mut sorted_row_ids = Vec::from(row_ids);
sorted_row_ids.sort();

let mut row_ids_per_fragment: BTreeMap<u64, Vec<u32>> = BTreeMap::new();
row_ids.iter().for_each(|row_id| {
sorted_row_ids.iter().for_each(|row_id| {
let fragment_id = row_id >> 32;
let offset = (row_id - (fragment_id << 32)) as u32;
row_ids_per_fragment
@@ -232,7 +236,7 @@
let batches = stream::iter(self.fragments().as_ref())
.filter(|f| async { row_ids_per_fragment.contains_key(&f.id) })
.then(|fragment| async {
let path = Path::from(fragment.files[0].path.as_str());
let path = self.data_dir().child(fragment.files[0].path.as_str());
let reader = FileReader::try_new_with_fragment(
object_store,
&path,
@@ -248,7 +252,20 @@
})
.try_collect::<Vec<_>>()
.await?;
Ok(concat_batches(&schema, &batches)?)
let one_batch = concat_batches(&schema, &batches)?;

let original_indices: UInt64Array = row_ids
.iter()
.map(|o| {
sorted_row_ids
.iter()
.position(|sorted_id| sorted_id == o)
.unwrap() as u64
})
.collect();
let struct_arr: StructArray = one_batch.into();
let reordered = take(&struct_arr, &original_indices, None)?;
Ok(as_struct_array(&reordered).into())
}

fn versions_dir(&self) -> Path {
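The bit arithmetic in `Dataset::take` above packs a fragment id into the high 32 bits of each row id and the in-fragment offset into the low 32 bits. A standalone sketch of that encoding (the helper names are ours, not part of the commit):

fn encode_row_id(fragment_id: u64, offset: u32) -> u64 {
    (fragment_id << 32) | offset as u64
}

fn decode_row_id(row_id: u64) -> (u64, u32) {
    let fragment_id = row_id >> 32;
    let offset = (row_id - (fragment_id << 32)) as u32;
    (fragment_id, offset)
}

#[test]
fn row_id_round_trip() {
    // Row 17 of fragment 3 becomes 3 * 2^32 + 17.
    let row_id = encode_row_id(3, 17);
    assert_eq!(row_id, (3u64 << 32) + 17);
    assert_eq!(decode_row_id(row_id), (3, 17));
}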
85 changes: 45 additions & 40 deletions rust/src/dataset/scanner.rs
@@ -21,16 +21,14 @@ use std::task::{Context, Poll};

use arrow_array::{Float32Array, RecordBatch};
use arrow_schema::{Schema as ArrowSchema, SchemaRef};
use futures::stream::Stream;
use futures::StreamExt;
use object_store::path::Path;
use futures::stream::{Stream, StreamExt};

use super::Dataset;
use crate::datatypes::Schema;
use crate::format::{Fragment, Manifest};
use crate::format::Fragment;
use crate::index::vector::Query;
use crate::io::exec::{ExecNode, Scan};
use crate::io::ObjectStore;

use crate::io::exec::{ExecNode, KNNFlat, Scan, Take};
use crate::Result;

/// Dataset Scanner
@@ -46,8 +44,8 @@ use crate::Result;
/// .buffered(16)
/// .sum()
/// ```
pub struct Scanner<'a> {
dataset: &'a Dataset,
pub struct Scanner {
dataset: Arc<Dataset>,

projections: Schema,

@@ -63,14 +61,16 @@
with_row_id: bool,
}

impl<'a> Scanner<'a> {
pub fn new(dataset: &'a Dataset) -> Self {
impl<'a> Scanner {
pub fn new(dataset: Arc<Dataset>) -> Self {
let projection = dataset.schema().clone();
let fragments = dataset.fragments().clone();
Self {
dataset,
projections: dataset.schema().clone(),
projections: projection,
limit: None,
offset: None,
fragments: dataset.fragments().clone(),
fragments,
nearest: None,
with_row_id: false,
}
@@ -129,44 +129,49 @@
let with_row_id = self.with_row_id;
let projection = &self.projections;

ScannerStream::new(
self.dataset.object_store.clone(),
data_dir,
self.fragments.clone(),
manifest,
PREFECTH_SIZE,
projection,
with_row_id,
)
let exec_node: Box<dyn ExecNode + Unpin + Send> = if let Some(q) = self.nearest.as_ref() {
let vector_scan_projection =
Arc::new(self.dataset.schema().project(&[&q.column]).unwrap());
let scan_node = Scan::new(
self.dataset.object_store.clone(),
data_dir.clone(),
self.fragments.clone(),
&vector_scan_projection,
manifest.clone(),
PREFECTH_SIZE,
true,
);
let flat_knn_node = KNNFlat::new(scan_node, q);
Box::new(Take::new(
self.dataset.clone(),
Arc::new(projection.clone()),
flat_knn_node,
))
} else {
Box::new(Scan::new(
self.dataset.object_store.clone(),
data_dir.clone(),
self.fragments.clone(),
projection,
manifest.clone(),
PREFECTH_SIZE,
with_row_id,
))
};

ScannerStream::new(exec_node)
}
}

/// ScannerStream wraps the different types of ExecNode that a scan can produce.
#[pin_project::pin_project]
pub struct ScannerStream {
#[pin]
exec_node: Box<dyn ExecNode + Unpin + Send>,
}

impl ScannerStream {
fn new<'a>(
object_store: Arc<ObjectStore>,
data_dir: Path,
fragments: Arc<Vec<Fragment>>,
manifest: Arc<Manifest>,
prefetch_size: usize,
schema: &Schema,
with_row_id: bool,
) -> Self {
let exec_node = Box::new(Scan::new(
object_store,
data_dir,
fragments.clone(),
schema,
manifest.clone(),
prefetch_size,
with_row_id,
));

fn new<'a>(exec_node: Box<dyn ExecNode + Unpin + Send>) -> Self {
Self { exec_node }
}
}
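From the caller's side, the new code path composes Scan (vector column only, with row ids) -> KNNFlat -> Take (full projection). A usage sketch follows; the `nearest` setter and `into_stream` method are not shown in this excerpt, so their exact signatures here are assumptions:

use std::sync::Arc;

use arrow_array::Float32Array;
use futures::TryStreamExt;

async fn knn_scan(dataset: Arc<lance::dataset::Dataset>) {
    let key = Float32Array::from(vec![0.1_f32; 128]);

    // Hypothetical builder calls: the exact `nearest`/`into_stream`
    // signatures are not part of this diff excerpt.
    let stream = dataset
        .scan()
        .nearest("embedding", &key, 10)
        .into_stream();

    // Internally this runs Scan(vector column, with_row_id = true)
    // -> KNNFlat(query) -> Take(full projection), per the code above.
    let batches: Vec<_> = stream.try_collect().await.unwrap();
    assert!(!batches.is_empty());
}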
33 changes: 31 additions & 2 deletions rust/src/format/metadata.rs
@@ -56,6 +56,7 @@ impl From<pb::Metadata> for Metadata {
}
}

#[derive(Debug, PartialEq)]
pub(crate) struct BatchOffsets {
pub batch_id: i32,
pub offsets: Vec<u32>,
@@ -97,6 +98,10 @@ impl Metadata {
let mut batch_id: i32 = 0;
let num_batches = self.num_batches() as i32;
let mut indices_per_batch: BTreeMap<i32, Vec<u32>> = BTreeMap::new();

let mut indices = Vec::from(indices);
indices.sort();

for idx in indices.iter() {
while batch_id < num_batches && *idx >= self.batch_offsets[batch_id as usize + 1] as u32
{
@@ -112,7 +117,6 @@
.iter()
.map(|(batch_id, indices)| {
let batch_offset = self.batch_offsets[*batch_id as usize];

// Adjust indices to be the in-batch offsets.
let in_batch_offsets = indices
.iter()
@@ -128,4 +132,29 @@
}

#[cfg(test)]
mod tests {}
mod tests {
use super::*;

#[test]
fn test_group_indices_to_batch() {
let mut metadata = Metadata::default();
metadata.push_batch_length(20);
metadata.push_batch_length(20);

let batches = metadata.group_indices_to_batches(&[6, 24]);
assert_eq!(batches.len(), 2);
assert_eq!(
batches,
vec![
BatchOffsets {
batch_id: 0,
offsets: vec![6]
},
BatchOffsets {
batch_id: 1,
offsets: vec![4]
}
]
);
}
}
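
Because `group_indices_to_batches` now sorts its input first, unsorted indices should produce the same grouping; a small extra test in the same vein (ours, not part of the commit):

#[test]
fn test_group_unsorted_indices() {
    let mut metadata = Metadata::default();
    metadata.push_batch_length(20);
    metadata.push_batch_length(20);

    // Same indices as the test above, passed out of order; the method
    // sorts internally, so the grouping is identical.
    let batches = metadata.group_indices_to_batches(&[24, 6]);
    assert_eq!(batches.len(), 2);
    assert_eq!(batches[0].batch_id, 0);
    assert_eq!(batches[0].offsets, vec![6]);
    assert_eq!(batches[1].batch_id, 1);
    assert_eq!(batches[1].offsets, vec![4]); // 24 - 20 = 4
}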
1 change: 1 addition & 0 deletions rust/src/index/vector.rs
@@ -28,6 +28,7 @@ pub mod flat;
use crate::Result;

/// Query parameters for the vector indices
#[derive(Debug, Clone)]
pub struct Query {
pub column: String,
/// The vector to be searched.
Expand Down
(Diffs for the remaining 5 changed files were not loaded.)