This repository has been archived by the owner on Apr 4, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 81
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
324: Implement documents API r=Kerollmops a=MarinPostma This PR implements the intermediary document representation for milli. The JSON, JSONL and CSV formats are replaced with this format instead, to push the serialization duty onto the client side. The `documents` module contains the interface to the new document format: - The `DocumentsBuilder` allows the creation of a writer-backed document addition, where documents are added either one by one, or as arrays of depth 1. This is made possible by the fact that the serializer used by the `add_documents` methods only accepts `[Object]` and `Object`. The related serialization logic is located in the `serde.rs` file. - The `DocumentsReader` allows iterating over the documents created by a `DocumentsBuilder`. A call to `next_document_with_index` returns the next obkv reader in the document addition, along with a reference to the index used to map the field ids in the obkv reader to the field names. All references to json, jsonl or csv in the tests have been replaced with the `documents!` macro, which works exactly like the `serde_json::json` macro, as a convenient way to create a `DocumentsReader`. Rewrote the search cli into the `cli` crate, to also allow index manipulation. This only offers basic functionalities for now, but is meant to be easier to extend than the http ui. Blocked by #308. Co-authored-by: mpostma <postma.marin@protonmail.com>
- Loading branch information
Showing
25 changed files
with
5,118 additions
and
717 deletions.
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
[package] | ||
name = "cli" | ||
version = "0.1.0" | ||
edition = "2018" | ||
description = "A CLI to interact with a milli index" | ||
|
||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | ||
|
||
[dependencies] | ||
indicatif = "0.16.2" | ||
serde = "1.0.129" | ||
serde_json = "1.0.66" | ||
structopt = "0.3.22" | ||
milli = { path = "../milli" } | ||
eyre = "0.6.5" | ||
color-eyre = "0.5.11" | ||
heed = { git = "https://github.com/Kerollmops/heed", tag = "v0.12.1", default-features = false, features = ["lmdb", "sync-read-txn"] } | ||
byte-unit = { version = "4.0.12", features = ["serde"] } | ||
bimap = "0.6.1" | ||
csv = "1.1.6" | ||
stderrlog = "0.5.1" | ||
|
||
[target.'cfg(target_os = "linux")'.dependencies] | ||
jemallocator = "0.3.2" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,335 @@ | ||
use std::fs::File; | ||
use std::io::{stdin, Cursor, Read}; | ||
use std::path::PathBuf; | ||
use std::str::FromStr; | ||
|
||
use byte_unit::Byte; | ||
use eyre::Result; | ||
use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; | ||
use milli::update::UpdateIndexingStep::{ | ||
ComputeIdsAndMergeDocuments, IndexDocuments, MergeDataIntoFinalDatabase, RemapDocumentAddition, | ||
}; | ||
use serde_json::{Map, Value}; | ||
use structopt::StructOpt; | ||
|
||
#[cfg(target_os = "linux")] | ||
#[global_allocator] | ||
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; | ||
|
||
#[derive(Debug, StructOpt)] | ||
#[structopt(name = "Milli CLI", about = "A simple CLI to manipulate a milli index.")] | ||
struct Cli { | ||
#[structopt(short, long)] | ||
index_path: PathBuf, | ||
#[structopt(short = "s", long, default_value = "100GiB")] | ||
index_size: Byte, | ||
/// Verbose mode (-v, -vv, -vvv, etc.) | ||
#[structopt(short, long, parse(from_occurrences))] | ||
verbose: usize, | ||
#[structopt(subcommand)] | ||
subcommand: Command, | ||
} | ||
|
||
#[derive(Debug, StructOpt)] | ||
enum Command { | ||
DocumentAddition(DocumentAddition), | ||
Search(Search), | ||
SettingsUpdate(SettingsUpdate), | ||
} | ||
|
||
fn setup(opt: &Cli) -> Result<()> { | ||
color_eyre::install()?; | ||
stderrlog::new() | ||
.verbosity(opt.verbose) | ||
.show_level(false) | ||
.timestamp(stderrlog::Timestamp::Off) | ||
.init()?; | ||
Ok(()) | ||
} | ||
|
||
fn main() -> Result<()> { | ||
let command = Cli::from_args(); | ||
|
||
setup(&command)?; | ||
|
||
let mut options = heed::EnvOpenOptions::new(); | ||
options.map_size(command.index_size.get_bytes() as usize); | ||
let index = milli::Index::new(options, command.index_path)?; | ||
|
||
match command.subcommand { | ||
Command::DocumentAddition(addition) => addition.perform(index)?, | ||
Command::Search(search) => search.perform(index)?, | ||
Command::SettingsUpdate(update) => update.perform(index)?, | ||
} | ||
|
||
Ok(()) | ||
} | ||
|
||
/// Input formats accepted by the document-addition subcommand.
#[derive(Debug)]
enum DocumentAdditionFormat {
    /// Parsed by `documents_from_csv`.
    Csv,
    /// Parsed by `documents_from_json`.
    Json,
    /// Parsed by `documents_from_jsonl`.
    Jsonl,
}
|
||
impl FromStr for DocumentAdditionFormat { | ||
type Err = eyre::Error; | ||
|
||
fn from_str(s: &str) -> Result<Self, Self::Err> { | ||
match s { | ||
"csv" => Ok(Self::Csv), | ||
"jsonl" => Ok(Self::Jsonl), | ||
"json" => Ok(Self::Json), | ||
other => eyre::bail!("invalid format: {}", other), | ||
} | ||
} | ||
} | ||
|
||
#[derive(Debug, StructOpt)] | ||
struct DocumentAddition { | ||
#[structopt(short, long, default_value = "json", possible_values = &["csv", "jsonl", "json"])] | ||
format: DocumentAdditionFormat, | ||
/// Path to the update file, if not present, will read from stdin. | ||
#[structopt(short, long)] | ||
path: Option<PathBuf>, | ||
/// Whether to generate missing document ids. | ||
#[structopt(short, long)] | ||
autogen_docids: bool, | ||
/// Whether to update or replace the documents if they already exist. | ||
#[structopt(short, long)] | ||
update_documents: bool, | ||
} | ||
|
||
impl DocumentAddition { | ||
fn perform(&self, index: milli::Index) -> Result<()> { | ||
let reader: Box<dyn Read> = match self.path { | ||
Some(ref path) => { | ||
let file = File::open(path)?; | ||
Box::new(file) | ||
} | ||
None => Box::new(stdin()), | ||
}; | ||
|
||
println!("parsing documents..."); | ||
|
||
let documents = match self.format { | ||
DocumentAdditionFormat::Csv => documents_from_csv(reader)?, | ||
DocumentAdditionFormat::Json => documents_from_json(reader)?, | ||
DocumentAdditionFormat::Jsonl => documents_from_jsonl(reader)?, | ||
}; | ||
|
||
let reader = milli::documents::DocumentBatchReader::from_reader(Cursor::new(documents))?; | ||
|
||
println!("Adding {} documents to the index.", reader.len()); | ||
|
||
let mut txn = index.env.write_txn()?; | ||
let mut addition = milli::update::IndexDocuments::new(&mut txn, &index, 0); | ||
|
||
if self.update_documents { | ||
addition.index_documents_method(milli::update::IndexDocumentsMethod::UpdateDocuments); | ||
} | ||
|
||
addition.log_every_n(100); | ||
|
||
if self.autogen_docids { | ||
addition.enable_autogenerate_docids() | ||
} | ||
|
||
let mut bars = Vec::new(); | ||
let progesses = MultiProgress::new(); | ||
for _ in 0..4 { | ||
let bar = ProgressBar::hidden(); | ||
let bar = progesses.add(bar); | ||
bars.push(bar); | ||
} | ||
|
||
std::thread::spawn(move || { | ||
progesses.join().unwrap(); | ||
}); | ||
|
||
let result = addition.execute(reader, |step, _| indexing_callback(step, &bars))?; | ||
|
||
txn.commit()?; | ||
|
||
println!("{:?}", result); | ||
Ok(()) | ||
} | ||
} | ||
|
||
fn indexing_callback(step: milli::update::UpdateIndexingStep, bars: &[ProgressBar]) { | ||
let step_index = step.step(); | ||
let bar = &bars[step_index]; | ||
if step_index > 0 { | ||
let prev = &bars[step_index - 1]; | ||
if !prev.is_finished() { | ||
prev.disable_steady_tick(); | ||
prev.finish_at_current_pos(); | ||
} | ||
} | ||
|
||
let style = ProgressStyle::default_bar() | ||
.template("[eta: {eta_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}") | ||
.progress_chars("##-"); | ||
|
||
match step { | ||
RemapDocumentAddition { documents_seen } => { | ||
bar.set_style(ProgressStyle::default_spinner()); | ||
bar.set_message(format!("remaped {} documents so far.", documents_seen)); | ||
} | ||
ComputeIdsAndMergeDocuments { documents_seen, total_documents } => { | ||
bar.set_style(style); | ||
bar.set_length(total_documents as u64); | ||
bar.set_message("Merging documents..."); | ||
bar.set_position(documents_seen as u64); | ||
} | ||
IndexDocuments { documents_seen, total_documents } => { | ||
bar.set_style(style); | ||
bar.set_length(total_documents as u64); | ||
bar.set_message("Indexing documents..."); | ||
bar.set_position(documents_seen as u64); | ||
} | ||
MergeDataIntoFinalDatabase { databases_seen, total_databases } => { | ||
bar.set_style(style); | ||
bar.set_length(total_databases as u64); | ||
bar.set_message("Merging databases..."); | ||
bar.set_position(databases_seen as u64); | ||
} | ||
} | ||
bar.enable_steady_tick(200); | ||
} | ||
|
||
fn documents_from_jsonl(reader: impl Read) -> Result<Vec<u8>> { | ||
let mut writer = Cursor::new(Vec::new()); | ||
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?; | ||
|
||
let values = serde_json::Deserializer::from_reader(reader) | ||
.into_iter::<serde_json::Map<String, serde_json::Value>>(); | ||
for document in values { | ||
let document = document?; | ||
documents.add_documents(document)?; | ||
} | ||
documents.finish()?; | ||
|
||
Ok(writer.into_inner()) | ||
} | ||
|
||
fn documents_from_json(reader: impl Read) -> Result<Vec<u8>> { | ||
let mut writer = Cursor::new(Vec::new()); | ||
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?; | ||
|
||
let json: serde_json::Value = serde_json::from_reader(reader)?; | ||
documents.add_documents(json)?; | ||
documents.finish()?; | ||
|
||
Ok(writer.into_inner()) | ||
} | ||
|
||
fn documents_from_csv(reader: impl Read) -> Result<Vec<u8>> { | ||
let mut writer = Cursor::new(Vec::new()); | ||
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?; | ||
|
||
let mut records = csv::Reader::from_reader(reader); | ||
let iter = records.deserialize::<Map<String, Value>>(); | ||
|
||
for doc in iter { | ||
let doc = doc?; | ||
documents.add_documents(doc)?; | ||
} | ||
|
||
documents.finish()?; | ||
|
||
Ok(writer.into_inner()) | ||
} | ||
|
||
#[derive(Debug, StructOpt)] | ||
struct Search { | ||
query: Option<String>, | ||
#[structopt(short, long)] | ||
filter: Option<String>, | ||
#[structopt(short, long)] | ||
offset: Option<usize>, | ||
#[structopt(short, long)] | ||
limit: Option<usize>, | ||
} | ||
|
||
impl Search { | ||
fn perform(&self, index: milli::Index) -> Result<()> { | ||
let txn = index.env.read_txn()?; | ||
let mut search = index.search(&txn); | ||
|
||
if let Some(ref query) = self.query { | ||
search.query(query); | ||
} | ||
|
||
if let Some(ref filter) = self.filter { | ||
let condition = milli::FilterCondition::from_str(&txn, &index, filter)?; | ||
search.filter(condition); | ||
} | ||
|
||
if let Some(offset) = self.offset { | ||
search.offset(offset); | ||
} | ||
|
||
if let Some(limit) = self.limit { | ||
search.limit(limit); | ||
} | ||
|
||
let result = search.execute()?; | ||
|
||
let fields_ids_map = index.fields_ids_map(&txn)?; | ||
let displayed_fields = | ||
index.displayed_fields_ids(&txn)?.unwrap_or_else(|| fields_ids_map.ids().collect()); | ||
let documents = index.documents(&txn, result.documents_ids)?; | ||
let mut jsons = Vec::new(); | ||
for (_, obkv) in documents { | ||
let json = milli::obkv_to_json(&displayed_fields, &fields_ids_map, obkv)?; | ||
jsons.push(json); | ||
} | ||
|
||
let hits = serde_json::to_string_pretty(&jsons)?; | ||
|
||
println!("{}", hits); | ||
|
||
Ok(()) | ||
} | ||
} | ||
|
||
#[derive(Debug, StructOpt)] | ||
struct SettingsUpdate { | ||
#[structopt(short, long)] | ||
filterable_attributes: Option<Vec<String>>, | ||
} | ||
|
||
impl SettingsUpdate { | ||
fn perform(&self, index: milli::Index) -> Result<()> { | ||
let mut txn = index.env.write_txn()?; | ||
|
||
let mut update = milli::update::Settings::new(&mut txn, &index, 0); | ||
update.log_every_n(100); | ||
|
||
if let Some(ref filterable_attributes) = self.filterable_attributes { | ||
if !filterable_attributes.is_empty() { | ||
update.set_filterable_fields(filterable_attributes.iter().cloned().collect()); | ||
} else { | ||
update.reset_filterable_fields(); | ||
} | ||
} | ||
|
||
let mut bars = Vec::new(); | ||
let progesses = MultiProgress::new(); | ||
for _ in 0..4 { | ||
let bar = ProgressBar::hidden(); | ||
let bar = progesses.add(bar); | ||
bars.push(bar); | ||
} | ||
|
||
std::thread::spawn(move || { | ||
progesses.join().unwrap(); | ||
}); | ||
|
||
update.execute(|step, _| indexing_callback(step, &bars))?; | ||
|
||
txn.commit()?; | ||
Ok(()) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.