Integrate flat index #448

Merged · 28 commits · Jan 23, 2023
23 changes: 23 additions & 0 deletions rust/src/arrow.rs
@@ -301,6 +301,11 @@ pub trait RecordBatchExt {
     ///
     /// TODO: add merge nested fields support.
     fn merge(&self, other: &RecordBatch) -> Result<RecordBatch>;
+
+    /// Drop the column with the given name and return a new [`RecordBatch`].
+    ///
+    /// If the named column does not exist, it returns a copy of this [`RecordBatch`].
+    fn drop_column(&self, name: &str) -> Result<RecordBatch>;
 }
 
 impl RecordBatchExt for RecordBatch {
@@ -346,4 +351,22 @@ impl RecordBatchExt for RecordBatch {
         }
         Ok(Self::try_new(Arc::new(Schema::new(fields)), columns)?)
     }
+
+    fn drop_column(&self, name: &str) -> Result<RecordBatch> {
+        let mut fields = vec![];
+        let mut columns = vec![];
+        for i in 0..self.schema().fields.len() {
+            if self.schema().field(i).name() != name {
+                fields.push(self.schema().field(i).clone());
+                columns.push(self.column(i).clone());
+            }
+        }
+        Ok(RecordBatch::try_new(
+            Arc::new(Schema::new_with_metadata(
+                fields,
+                self.schema().metadata().clone(),
+            )),
+            columns,
+        )?)
+    }
 }

> Review comment (Contributor), on `Ok(RecordBatch::try_new(`: should we create `drop_field` in `Schema`?
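A quick usage sketch for the new `drop_column` extension method. The schema, data, and the `lance::arrow::RecordBatchExt` import path below are illustrative assumptions, not part of this diff:

```rust
use std::sync::Arc;

use arrow_array::{Int32Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
// Assumed public path to the trait defined in rust/src/arrow.rs.
use lance::arrow::RecordBatchExt;

fn demo() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])),
            Arc::new(StringArray::from(vec!["a", "b", "c"])),
        ],
    )?;

    // Dropping an existing column returns a batch without it.
    let without_name = batch.drop_column("name")?;
    assert_eq!(without_name.num_columns(), 1);

    // Dropping a column that does not exist returns an equivalent copy.
    let unchanged = batch.drop_column("not_a_column")?;
    assert_eq!(unchanged.num_columns(), 2);
    Ok(())
}
```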
1 change: 1 addition & 0 deletions rust/src/arrow/record_batch.rs
@@ -24,6 +24,7 @@ use arrow_select::concat::concat_batches;
 
 use crate::Result;
 
+#[derive(Debug)]
 pub struct RecordBatchBuffer {
     pub batches: Vec<RecordBatch>,
     idx: usize,
31 changes: 24 additions & 7 deletions rust/src/dataset.rs
@@ -4,9 +4,10 @@
 use std::collections::BTreeMap;
 use std::sync::Arc;
 
-use arrow_array::{RecordBatch, RecordBatchReader};
+use arrow_array::cast::as_struct_array;
+use arrow_array::{RecordBatch, RecordBatchReader, StructArray, UInt64Array};
 use arrow_schema::Schema as ArrowSchema;
-use arrow_select::concat::concat_batches;
+use arrow_select::{concat::concat_batches, take::take};
 use chrono::prelude::*;
 use futures::stream::{self, StreamExt, TryStreamExt};
 use object_store::path::Path;
@@ -34,7 +35,7 @@ fn latest_manifest_path(base: &Path) -> Path {
 }
 
 /// Lance Dataset
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct Dataset {
     object_store: Arc<ObjectStore>,
     base: Path,
@@ -208,7 +209,7 @@ impl Dataset {
 
     /// Create a Scanner to scan the dataset.
     pub fn scan(&self) -> Scanner {
-        Scanner::new(&self)
+        Scanner::new(Arc::new(self.clone()))
     }
 
     /// Take rows by the internal ROW ids.
@@ -217,8 +218,11 @@
         row_ids: &[u64],
         projection: &Schema,
     ) -> Result<RecordBatch> {
+        let mut sorted_row_ids = Vec::from(row_ids);
+        sorted_row_ids.sort();
+
         let mut row_ids_per_fragment: BTreeMap<u64, Vec<u32>> = BTreeMap::new();
-        row_ids.iter().for_each(|row_id| {
+        sorted_row_ids.iter().for_each(|row_id| {
             let fragment_id = row_id >> 32;
             let offset = (row_id - (fragment_id << 32)) as u32;
             row_ids_per_fragment
@@ -232,7 +236,7 @@
         let batches = stream::iter(self.fragments().as_ref())
             .filter(|f| async { row_ids_per_fragment.contains_key(&f.id) })
             .then(|fragment| async {
-                let path = Path::from(fragment.files[0].path.as_str());
+                let path = self.data_dir().child(fragment.files[0].path.as_str());
                 let reader = FileReader::try_new_with_fragment(
                     object_store,
                     &path,
@@ -248,7 +252,20 @@
             })
             .try_collect::<Vec<_>>()
             .await?;
-        Ok(concat_batches(&schema, &batches)?)
+        let one_batch = concat_batches(&schema, &batches)?;
+
+        let original_indices: UInt64Array = row_ids
+            .iter()
+            .map(|o| {
+                sorted_row_ids
+                    .iter()
+                    .position(|sorted_id| sorted_id == o)
+                    .unwrap() as u64
+            })
+            .collect();
+        let struct_arr: StructArray = one_batch.into();
+        let reordered = take(&struct_arr, &original_indices, None)?;
+        Ok(as_struct_array(&reordered).into())
     }
 
     fn versions_dir(&self) -> Path {
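The fragment decomposition in `take` above relies on row ids packing a fragment id in the high 32 bits and an in-fragment offset in the low 32 bits. A minimal sketch of that round trip, using the same arithmetic as the diff (the helper names are hypothetical, not from this PR):

```rust
/// Pack a fragment id and an in-fragment offset into one u64 row id.
/// Hypothetical helper; mirrors the layout assumed by `Dataset::take`.
fn encode_row_id(fragment_id: u64, offset: u32) -> u64 {
    (fragment_id << 32) | offset as u64
}

/// Split a row id back into (fragment_id, offset), exactly as in the diff:
/// fragment_id = row_id >> 32; offset = row_id - (fragment_id << 32).
fn decode_row_id(row_id: u64) -> (u64, u32) {
    let fragment_id = row_id >> 32;
    let offset = (row_id - (fragment_id << 32)) as u32;
    (fragment_id, offset)
}

fn main() {
    let row_id = encode_row_id(3, 17);
    assert_eq!(row_id, (3u64 << 32) + 17);
    assert_eq!(decode_row_id(row_id), (3, 17));
}
```

This also explains the sort-then-restore dance in `take`: fragments are read in sorted row-id order for sequential I/O, and the final `take(&struct_arr, &original_indices, None)` restores the caller's requested order.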
85 changes: 45 additions & 40 deletions rust/src/dataset/scanner.rs
@@ -21,16 +21,14 @@ use std::task::{Context, Poll};
 
 use arrow_array::{Float32Array, RecordBatch};
 use arrow_schema::{Schema as ArrowSchema, SchemaRef};
-use futures::stream::Stream;
-use futures::StreamExt;
-use object_store::path::Path;
+use futures::stream::{Stream, StreamExt};
 
 use super::Dataset;
 use crate::datatypes::Schema;
-use crate::format::{Fragment, Manifest};
+use crate::format::Fragment;
 use crate::index::vector::Query;
-use crate::io::exec::{ExecNode, Scan};
-use crate::io::ObjectStore;
-
+use crate::io::exec::{ExecNode, KNNFlat, Scan, Take};
 use crate::Result;
 
 /// Dataset Scanner
@@ -46,8 +44,8 @@ use crate::Result;
 /// .buffered(16)
 /// .sum()
 /// ```
-pub struct Scanner<'a> {
-    dataset: &'a Dataset,
+pub struct Scanner {
+    dataset: Arc<Dataset>,
 
     projections: Schema,
 
@@ -63,14 +61,16 @@ pub struct Scanner<'a> {
     with_row_id: bool,
 }
 
-impl<'a> Scanner<'a> {
-    pub fn new(dataset: &'a Dataset) -> Self {
+impl<'a> Scanner {
+    pub fn new(dataset: Arc<Dataset>) -> Self {
+        let projection = dataset.schema().clone();
+        let fragments = dataset.fragments().clone();
         Self {
             dataset,
-            projections: dataset.schema().clone(),
+            projections: projection,
             limit: None,
             offset: None,
-            fragments: dataset.fragments().clone(),
+            fragments,
             nearest: None,
             with_row_id: false,
         }
@@ -129,44 +129,49 @@
         let with_row_id = self.with_row_id;
         let projection = &self.projections;
 
-        ScannerStream::new(
-            self.dataset.object_store.clone(),
-            data_dir,
-            self.fragments.clone(),
-            manifest,
-            PREFECTH_SIZE,
-            projection,
-            with_row_id,
-        )
+        let exec_node: Box<dyn ExecNode + Unpin + Send> = if let Some(q) = self.nearest.as_ref() {
+            let vector_scan_projection =
+                Arc::new(self.dataset.schema().project(&[&q.column]).unwrap());
+            let scan_node = Scan::new(
+                self.dataset.object_store.clone(),
+                data_dir.clone(),
+                self.fragments.clone(),
+                &vector_scan_projection,
+                manifest.clone(),
+                PREFECTH_SIZE,
+                true,
+            );
+            let flat_knn_node = KNNFlat::new(scan_node, q);
+            Box::new(Take::new(
+                self.dataset.clone(),
+                Arc::new(projection.clone()),
+                flat_knn_node,
+            ))
+        } else {
+            Box::new(Scan::new(
+                self.dataset.object_store.clone(),
+                data_dir.clone(),
+                self.fragments.clone(),
+                projection,
+                manifest.clone(),
+                PREFECTH_SIZE,
+                with_row_id,
+            ))
+        };
+
+        ScannerStream::new(exec_node)
     }
 }
 
+/// ScannerStream is a container to wrap different types of ExecNode.
 #[pin_project::pin_project]
 pub struct ScannerStream {
     #[pin]
     exec_node: Box<dyn ExecNode + Unpin + Send>,
 }
 
 impl ScannerStream {
-    fn new<'a>(
-        object_store: Arc<ObjectStore>,
-        data_dir: Path,
-        fragments: Arc<Vec<Fragment>>,
-        manifest: Arc<Manifest>,
-        prefetch_size: usize,
-        schema: &Schema,
-        with_row_id: bool,
-    ) -> Self {
-        let exec_node = Box::new(Scan::new(
-            object_store,
-            data_dir,
-            fragments.clone(),
-            schema,
-            manifest.clone(),
-            prefetch_size,
-            with_row_id,
-        ));
-
+    fn new<'a>(exec_node: Box<dyn ExecNode + Unpin + Send>) -> Self {
         Self { exec_node }
     }
 }
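With this change, `Scanner` either scans directly or, when a nearest-neighbor query is set, composes a `Scan -> KNNFlat -> Take` pipeline and wraps it in `ScannerStream`. A hedged usage sketch — the `nearest(...)` builder and `into_stream()` names are assumptions, since this diff only shows the `nearest: Option<Query>` field and the stream construction:

```rust
use std::sync::Arc;

use arrow_array::RecordBatch;
use futures::TryStreamExt;

// Hypothetical driver based on this PR's types; the real API may differ.
async fn flat_knn(dataset: Arc<lance::dataset::Dataset>) -> Result<(), Box<dyn std::error::Error>> {
    // Query vector; the dimension is assumed to match the indexed column.
    let key = vec![0.1_f32; 128];
    let mut scanner = dataset.scan();
    // Assumed builder that fills Scanner::nearest with a Query { column, key, k }.
    scanner.nearest("embeddings", &key, 10);
    // When `nearest` is set, the stream runs Scan -> KNNFlat -> Take under the hood.
    let batches: Vec<RecordBatch> = scanner.into_stream().try_collect().await?;
    println!("top-k rows returned in {} batch(es)", batches.len());
    Ok(())
}
```

The switch from `Scanner<'a>` borrowing the dataset to holding an `Arc<Dataset>` is what lets the `Take` node keep its own handle to the dataset while the stream is in flight.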
33 changes: 31 additions & 2 deletions rust/src/format/metadata.rs
@@ -56,6 +56,7 @@ impl From<pb::Metadata> for Metadata {
     }
 }
 
+#[derive(Debug, PartialEq)]
 pub(crate) struct BatchOffsets {
     pub batch_id: i32,
     pub offsets: Vec<u32>,
@@ -97,6 +98,10 @@ impl Metadata {
         let mut batch_id: i32 = 0;
         let num_batches = self.num_batches() as i32;
         let mut indices_per_batch: BTreeMap<i32, Vec<u32>> = BTreeMap::new();
+
+        let mut indices = Vec::from(indices);
+        indices.sort();
+
         for idx in indices.iter() {
             while batch_id < num_batches && *idx >= self.batch_offsets[batch_id as usize + 1] as u32
             {
@@ -112,7 +117,6 @@
             .iter()
             .map(|(batch_id, indices)| {
                 let batch_offset = self.batch_offsets[*batch_id as usize];
-
                 // Adjust indices to be the in-batch offsets.
                 let in_batch_offsets = indices
                     .iter()
@@ -128,4 +132,29 @@
 }
 
 #[cfg(test)]
-mod tests {}
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_group_indices_to_batch() {
+        let mut metadata = Metadata::default();
+        metadata.push_batch_length(20);
+        metadata.push_batch_length(20);
+
+        let batches = metadata.group_indices_to_batches(&[6, 24]);
+        assert_eq!(batches.len(), 2);
+        assert_eq!(
+            batches,
+            vec![
+                BatchOffsets {
+                    batch_id: 0,
+                    offsets: vec![6]
+                },
+                BatchOffsets {
+                    batch_id: 1,
+                    offsets: vec![4]
+                }
+            ]
+        );
+    }
+}
1 change: 1 addition & 0 deletions rust/src/index/vector.rs
@@ -28,6 +28,7 @@ pub mod flat;
 use crate::Result;
 
 /// Query parameters for the vector indices
+#[derive(Debug, Clone)]
 pub struct Query {
     pub column: String,
     /// The vector to be searched.