Skip to content

Commit

Permalink
feat: auto-migrate old index metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
wjones127 committed Jan 31, 2025
1 parent d34fa95 commit fbfed9d
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 15 deletions.
13 changes: 6 additions & 7 deletions rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4395,10 +4395,13 @@ mod tests {
let validate_res = dataset.validate().await;
assert!(validate_res.is_err());
assert_eq!(dataset.load_indices().await.unwrap()[0].name, "vector_idx");
assert!(dataset.index_statistics("vector_idx").await.is_err());

// Force a migration
dataset.delete("false").await.unwrap();
// Calling index statistics will force a migration
let stats = dataset.index_statistics("vector_idx").await.unwrap();
let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
assert_eq!(stats["num_indexed_fragments"], 2);

dataset.checkout_latest().await.unwrap();
dataset.validate().await.unwrap();

let indices = dataset.load_indices().await.unwrap();
Expand All @@ -4408,10 +4411,6 @@ mod tests {
}
assert_eq!(get_bitmap(&indices[0]), vec![0]);
assert_eq!(get_bitmap(&indices[1]), vec![1]);

let stats = dataset.index_statistics("vector_idx").await.unwrap();
let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
assert_eq!(stats["num_indexed_fragments"], 2);
}

#[rstest]
Expand Down
66 changes: 58 additions & 8 deletions rust/lance/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//!
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::sync::{Arc, OnceLock};

use arrow_schema::DataType;
use async_trait::async_trait;
Expand Down Expand Up @@ -68,6 +68,24 @@ use self::append::merge_indices;
use self::scalar::build_scalar_index;
use self::vector::{build_vector_index, VectorIndexParams, LANCE_VECTOR_INDEX};

/// Reports whether a dataset should be auto-migrated when corruption is encountered.
///
/// The answer is taken from the `LANCE_AUTO_MIGRATION` environment variable. The
/// variable is read on the first call only; the result is cached in a process-wide
/// `OnceLock`, so later changes to the environment are not observed.
///
/// Returns `true` when the variable cannot be read (unset or not valid Unicode).
/// Otherwise returns `true` only if the value is, ignoring ASCII case, one of
/// `1`, `true`, `on`, `yes` or `y`; any other value yields `false`.
fn auto_migrate_corruption() -> bool {
    // Cached decision; initialised lazily from the environment on first use.
    static AUTO_MIGRATE_CORRUPTION: OnceLock<bool> = OnceLock::new();

    /// Case-insensitive match against the accepted "enabled" spellings.
    fn str_is_truthy(val: &str) -> bool {
        ["1", "true", "on", "yes", "y"]
            .iter()
            .any(|truthy| val.eq_ignore_ascii_case(truthy))
    }

    *AUTO_MIGRATE_CORRUPTION.get_or_init(|| {
        // Migration is opt-out: an unreadable variable means "enabled".
        std::env::var("LANCE_AUTO_MIGRATION").map_or(true, |raw| str_is_truthy(&raw))
    })
}

/// Builds index.
#[async_trait]
pub trait IndexBuilder {
Expand Down Expand Up @@ -590,7 +608,8 @@ impl DatasetIndexExt for Dataset {
let index_type = indices[0].index_type().to_string();

let indexed_fragments_per_delta = self.indexed_fragments(index_name).await?;
let num_indexed_rows_per_delta = indexed_fragments_per_delta

let res = indexed_fragments_per_delta
.iter()
.map(|frags| {
let mut sum = 0;
Expand All @@ -604,18 +623,49 @@ impl DatasetIndexExt for Dataset {
}
Ok(sum)
})
.collect::<Result<Vec<_>>>()?;
.collect::<Result<Vec<_>>>();

async fn migrate_and_recompute(ds: &Dataset, index_name: &str) -> Result<String> {
let mut ds = ds.clone();
log::warn!(
"Detecting out-dated fragment metadata, migrating dataset. \
To disable migration, set LANCE_AUTO_MIGRATION=false"
);
ds.delete("false").await.map_err(|err| {
Error::Execution {
message: format!("Failed to migrate dataset while calculating index statistics. \
To disable migration, set LANCE_AUTO_MIGRATION=false. Original error: {}", err),
location: location!(),
}
})?;
ds.index_statistics(index_name).await
}

let num_indexed_rows_per_delta = match res {
Ok(rows) => rows,
Err(Error::Internal { message, .. })
if auto_migrate_corruption() && message.contains("trigger a single write") =>
{
return migrate_and_recompute(self, index_name).await;
}
Err(e) => return Err(e),
};

let mut fragment_ids = HashSet::new();
for frags in indexed_fragments_per_delta.iter() {
for frag in frags.iter() {
if !fragment_ids.insert(frag.id) {
return Err(Error::Internal {
message: "Overlap in indexed fragments. Please upgrade to lance >= 0.23.0 \
if auto_migrate_corruption() {
return migrate_and_recompute(self, index_name).await;
} else {
return Err(Error::Internal {
message:
"Overlap in indexed fragments. Please upgrade to lance >= 0.23.0 \
and trigger a single write to fix this"
.to_string(),
location: location!(),
});
.to_string(),
location: location!(),
});
}
}
}
}
Expand Down

0 comments on commit fbfed9d

Please sign in to comment.