Commit

Started index refactor

tomfran committed Jan 5, 2024
1 parent 58819bb commit fcb5aba
Showing 11 changed files with 343 additions and 194 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
@@ -12,3 +12,4 @@ rand = "0.8"
tokenizers = { version = "0.15.0", features = ["http"] }
rust-stemmers = "1.2.0"
rayon = "1.8.0"
indicatif = {version = "0.17.0", features = ["rayon", "improved_unicode"]}
25 changes: 25 additions & 0 deletions src/disk/bits_reader.rs
@@ -62,6 +62,21 @@ impl BitsReader {
res as u32 - 1
}

pub fn read_vbyte_gamma_gap_vector(&mut self) -> Vec<u32> {
let mut prefix = 0;
(0..self.read_vbyte())
.map(|_| {
let gap = self.read_gamma();
prefix += gap;
prefix
})
.collect()
}

pub fn read_vbyte_gamma_vector(&mut self) -> Vec<u32> {
(0..self.read_vbyte()).map(|_| self.read_gamma()).collect()
}

fn read_internal(&mut self, len: u32) -> u128 {
let mask = (1 << len) - 1;

@@ -125,12 +140,22 @@ mod test {
w.write_gamma(i);
});

for _ in 0..2 {
w.write_vbyte(3);
(1..4).for_each(|i| {
w.write_gamma(i);
});
}

w.flush();

let mut r = BitsReader::new("data/test/writer_unit.bin");

(1..100).for_each(|i| assert_eq!(i, r.read_vbyte()));
(1..100).for_each(|i| assert_eq!(i, r.read_gamma()));

assert_eq!(r.read_vbyte_gamma_vector(), [1, 2, 3]);
assert_eq!(r.read_vbyte_gamma_gap_vector(), [1, 3, 6]);
}

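The new read_vbyte_gamma_gap_vector helper decodes a gap-encoded list: a vbyte gives the length, each element is a gamma-coded delta, and a running prefix sum restores the absolute values (the unit test above decodes gaps 1, 2, 3 into 1, 3, 6). A minimal standalone sketch of that prefix-sum step, independent of the crate's BitsReader; decode_gaps and its input slice are illustrative names, not part of the commit:

// Standalone sketch: `gaps` stands in for the gamma-decoded deltas that
// BitsReader would produce from the bit stream.
fn decode_gaps(gaps: &[u32]) -> Vec<u32> {
    let mut prefix = 0;
    gaps.iter()
        .map(|gap| {
            prefix += gap;
            prefix // running prefix sum = absolute value
        })
        .collect()
}

fn main() {
    // Mirrors the unit test: deltas 1, 2, 3 decode to 1, 3, 6.
    assert_eq!(decode_gaps(&[1, 2, 3]), vec![1, 3, 6]);
}
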
157 changes: 57 additions & 100 deletions src/index/builder.rs
@@ -1,26 +1,28 @@
use std::{
collections::{BTreeMap, HashMap},
fs,
sync::Mutex,
};

use indicatif::{ParallelProgressIterator, ProgressStyle};
use rayon::prelude::*;
use rust_stemmers::Stemmer;
use std::{collections::BTreeMap, fs, sync::Mutex};
use tokenizers::Tokenizer;

use crate::disk::{bits_writer::BitsWriter, terms_writer::TermsWriter};

use super::{
text_utils,
{
DOCUMENT_LENGHTS_EXTENSION, OFFSETS_EXTENSION, POSTINGS_EXTENSION,
VOCABULARY_ALPHA_EXTENSION, VOCABULARY_LENGHTS_EXTENSION,
},
documents::write_doc_lentghts,
postings::{write_postings, PostingEntry, PostingList},
text,
vocabulary::write_vocabulary,
InMemoryIndex,
};

struct InMemoryIndex {
term_index_map: BTreeMap<String, usize>,
postings: Vec<BTreeMap<u32, u32>>,
document_lengths: Vec<u32>,
}
const PROGRESS_STYLE: &str =
" Documents per second: {per_sec:<3}\n\n [{elapsed_precise}] [{bar:50}] {pos}/{len} ({eta})";
const PROGRESS_CHARS: &str = "=> ";

pub fn build_index(input_dir: &str, output_path: &str, tokenizer: &Tokenizer, stemmer: &Stemmer) {
let index = build_in_memory(input_dir, tokenizer, stemmer);
let index: InMemoryIndex = build_in_memory(input_dir, tokenizer, stemmer);
write_postings(&index, output_path);
write_vocabulary(&index.term_index_map, output_path);
write_doc_lentghts(&index.document_lengths, output_path);
@@ -34,107 +36,62 @@ fn build_in_memory(input_dir: &str, tokenizer: &Tokenizer, stemmer: &Stemmer) ->

let doc_id_mutex = Mutex::new(0);
let term_index_map = Mutex::new(BTreeMap::new());
let postings = Mutex::new(Vec::new());
let document_lengths = Mutex::new(Vec::new());

documents.into_par_iter().for_each(|d| {
let file_content = fs::read_to_string(d.path()).expect("error while reading file");
let tokens = text_utils::tokenize_and_stem(tokenizer, stemmer, &file_content);
let postings: Mutex<Vec<PostingList>> = Mutex::new(Vec::new());
let term_doc_map: Mutex<Vec<HashMap<u32, usize>>> = Mutex::new(Vec::new());

let mut doc_id = doc_id_mutex.lock().unwrap();
let document_lengths = Mutex::new(Vec::new());

if *doc_id % 5000 == 0 && *doc_id > 0 {
println!("Document num: {}", *doc_id);
}
documents
.into_par_iter()
.progress_with_style(
ProgressStyle::with_template(PROGRESS_STYLE)
.unwrap()
.progress_chars(PROGRESS_CHARS),
)
.for_each(|d| {
let file_content = fs::read_to_string(d.path()).expect("error while reading file");
let tokens = text::tokenize_and_stem(tokenizer, stemmer, &file_content);

document_lengths.lock().unwrap().push(tokens.len() as u32);
let mut doc_id = doc_id_mutex.lock().unwrap();

let mut l_term_index_map = term_index_map.lock().unwrap();
let mut l_postings = postings.lock().unwrap();
document_lengths.lock().unwrap().push(tokens.len() as u32);

for t in tokens.iter() {
let value = l_term_index_map.get(t);
let mut l_term_index_map = term_index_map.lock().unwrap();
let mut l_postings = postings.lock().unwrap();
let mut l_term_doc_map = term_doc_map.lock().unwrap();

let postings_counter = match value {
Some(idx) => &mut l_postings[*idx],
None => {
for (word_pos, t) in tokens.iter().enumerate() {
if !l_term_index_map.contains_key(t) {
let idx = l_term_index_map.len();
l_term_index_map.insert(t.clone(), idx);
l_postings.push(BTreeMap::new());
&mut l_postings[idx]
l_postings.push(PostingList::default());
l_term_doc_map.push(HashMap::new());
}
};
let term_index = *l_term_index_map.get(t).unwrap();

postings_counter
.entry(*doc_id)
.and_modify(|count| *count += 1)
.or_insert(1);
}
*doc_id += 1;
});
let postings_list = &mut l_postings[term_index];
postings_list.collection_frequency += 1;

if !l_term_doc_map[term_index].contains_key(&doc_id) {
let idx = postings_list.documents.len();
l_term_doc_map[term_index].insert(*doc_id, idx);
postings_list.documents.push(PostingEntry::default());
}
let posting_entry_index = *l_term_doc_map[term_index].get(&doc_id).unwrap();

let posting_entry = &mut postings_list.documents[posting_entry_index];

posting_entry.document_frequency += 1;
posting_entry.document_id = *doc_id;
posting_entry.positions.push(word_pos as u32);
}
*doc_id += 1;
});

InMemoryIndex {
term_index_map: term_index_map.into_inner().unwrap(),
postings: postings.into_inner().unwrap(),
document_lengths: document_lengths.into_inner().unwrap(),
}
}

fn write_postings(index: &InMemoryIndex, output_path: &str) {
let postings_path = output_path.to_string() + POSTINGS_EXTENSION;
let mut postings_writer = BitsWriter::new(&postings_path);

let offsets_path = output_path.to_string() + OFFSETS_EXTENSION;
let mut offsets_writer = BitsWriter::new(&offsets_path);

let mut offset: u64 = 0;
let mut prev_offset = 0;

offsets_writer.write_vbyte(index.term_index_map.len() as u32);

for (_, idx) in index.term_index_map.iter() {
offsets_writer.write_gamma(offset as u32 - prev_offset);
prev_offset = offset as u32;

let postings: &BTreeMap<u32, u32> = &index.postings[*idx];
offset += postings_writer.write_vbyte(postings.len() as u32);

let mut prev = 0;
for (doc_id, frequency) in postings.iter() {
offset += postings_writer.write_gamma(doc_id - prev);
offset += postings_writer.write_gamma(*frequency);
prev = *doc_id;
}
}

postings_writer.flush();
offsets_writer.flush();
}

fn write_vocabulary(vocab: &BTreeMap<String, usize>, output_path: &str) {
let terms_path = output_path.to_string() + VOCABULARY_ALPHA_EXTENSION;
let mut terms_writer = TermsWriter::new(&terms_path);

let lenghts_path = output_path.to_string() + VOCABULARY_LENGHTS_EXTENSION;
let mut lenghts_writer = BitsWriter::new(&lenghts_path);

for term in vocab.keys() {
lenghts_writer.write_gamma(term.len() as u32);
terms_writer.write_term(term);
}

lenghts_writer.flush();
terms_writer.flush();
}

fn write_doc_lentghts(document_lenghts: &Vec<u32>, output_path: &str) {
let doc_path = output_path.to_string() + DOCUMENT_LENGHTS_EXTENSION;
let mut doc_writer = BitsWriter::new(&doc_path);

doc_writer.write_vbyte(document_lenghts.len() as u32);
document_lenghts.iter().for_each(|l| {
doc_writer.write_gamma(*l);
});

doc_writer.flush();
}
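The refactored builder now records positional postings: each token either gets a fresh term slot or reuses an existing one, the term's collection frequency is bumped, and the token position is appended to that document's entry. Below is a simplified single-threaded sketch of this accumulation step, with stand-in copies of the PostingList and PostingEntry structs this commit moves into src/index/postings.rs; the real code shares these structures across rayon workers behind Mutexes, and index_document is an illustrative name, not a function from the repository.

use std::collections::{BTreeMap, HashMap};

// Stand-ins for the structs defined in src/index/postings.rs.
#[derive(Default, Debug)]
struct PostingEntry {
    document_id: u32,
    document_frequency: u32,
    positions: Vec<u32>,
}

#[derive(Default, Debug)]
struct PostingList {
    documents: Vec<PostingEntry>,
    collection_frequency: u32,
}

// Accumulate one tokenized document into the in-memory structures.
fn index_document(
    doc_id: u32,
    tokens: &[&str],
    term_index_map: &mut BTreeMap<String, usize>,
    postings: &mut Vec<PostingList>,
    term_doc_map: &mut Vec<HashMap<u32, usize>>,
) {
    for (word_pos, token) in tokens.iter().enumerate() {
        // Unseen terms get the next free slot.
        if !term_index_map.contains_key(*token) {
            let idx = term_index_map.len();
            term_index_map.insert(token.to_string(), idx);
            postings.push(PostingList::default());
            term_doc_map.push(HashMap::new());
        }
        let term_index = term_index_map[*token];

        let list = &mut postings[term_index];
        list.collection_frequency += 1;

        // One PostingEntry per (term, document) pair.
        if !term_doc_map[term_index].contains_key(&doc_id) {
            term_doc_map[term_index].insert(doc_id, list.documents.len());
            list.documents.push(PostingEntry::default());
        }
        let entry_index = term_doc_map[term_index][&doc_id];

        let entry = &mut list.documents[entry_index];
        entry.document_id = doc_id;
        entry.document_frequency += 1;
        entry.positions.push(word_pos as u32);
    }
}

fn main() {
    let mut term_index_map = BTreeMap::new();
    let mut postings = Vec::new();
    let mut term_doc_map = Vec::new();

    index_document(
        0,
        &["hello", "world", "hello"],
        &mut term_index_map,
        &mut postings,
        &mut term_doc_map,
    );

    let hello = &postings[term_index_map["hello"]];
    assert_eq!(hello.collection_frequency, 2);
    assert_eq!(hello.documents[0].positions, vec![0, 2]);
}
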
20 changes: 20 additions & 0 deletions src/index/documents.rs
@@ -0,0 +1,20 @@
use crate::disk::{bits_reader::BitsReader, bits_writer::BitsWriter};

use super::DOCUMENT_LENGHTS_EXTENSION;

pub fn write_doc_lentghts(document_lenghts: &Vec<u32>, output_path: &str) {
let doc_path = output_path.to_string() + DOCUMENT_LENGHTS_EXTENSION;
let mut doc_writer = BitsWriter::new(&doc_path);

doc_writer.write_vbyte(document_lenghts.len() as u32);
document_lenghts.iter().for_each(|l| {
doc_writer.write_gamma(*l);
});

doc_writer.flush();
}

pub fn load_document_lenghts(input_path: &str) -> Vec<u32> {
let mut reader = BitsReader::new(&(input_path.to_string() + DOCUMENT_LENGHTS_EXTENSION));
reader.read_vbyte_gamma_vector()
}
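The document-lengths serialization moved here from builder.rs, and the loading side now reuses the new read_vbyte_gamma_vector primitive. A possible round-trip unit test for this module, in the same style as the bits_reader tests; the data/test/doc_lengths_unit path is hypothetical and assumes the writable data/test directory those tests already use.

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_write_and_load_round_trip() {
        // Hypothetical output path, not part of the commit.
        let path = "data/test/doc_lengths_unit";
        let lengths = vec![3, 10, 7];

        write_doc_lentghts(&lengths, path);
        assert_eq!(load_document_lenghts(path), lengths);
    }
}
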
68 changes: 23 additions & 45 deletions src/index/mod.rs
@@ -1,6 +1,8 @@
mod builder;
mod loader;
mod text_utils;
mod documents;
mod postings;
mod text;
mod vocabulary;

use rust_stemmers::Stemmer;
use std::collections::BTreeMap;
@@ -9,6 +11,8 @@ use tokenizers::Tokenizer;

use crate::disk::bits_reader::BitsReader;

use self::postings::PostingList;

pub const POSTINGS_EXTENSION: &str = ".postings";
pub const OFFSETS_EXTENSION: &str = ".offsets";
pub const DOCUMENT_LENGHTS_EXTENSION: &str = ".doc_lengths";
@@ -23,69 +27,40 @@ pub struct Index {
stemmer: Stemmer,
}

#[derive(Debug)]
pub struct PostingList {
pub documents: Vec<PostingEntry>,
pub collection_frequency: u32,
}

#[derive(Debug)]
pub struct PostingEntry {
pub document_id: u32,
pub document_frequency: u32,
pub struct InMemoryIndex {
term_index_map: BTreeMap<String, usize>,
postings: Vec<PostingList>,
document_lengths: Vec<u32>,
}

impl Index {
pub fn build_index(input_path: &str, output_path: &str, tokenizer_path: &str) {
let tokenizer = text_utils::load_tokenizer(tokenizer_path, false);
let stemmer = text_utils::load_stemmer();
let tokenizer = text::load_tokenizer(tokenizer_path, false);
let stemmer = text::load_stemmer();
builder::build_index(input_path, output_path, &tokenizer, &stemmer);
}

pub fn load_index(input_path: &str, tokenizer_path: &str) -> Index {
Index {
postings: loader::build_postings_reader(input_path),
term_offset_map: loader::load_terms_to_offsets_map(input_path),
doc_lenghts: loader::load_document_lenghts(input_path),
tokenizer: text_utils::load_tokenizer(tokenizer_path, false),
stemmer: text_utils::load_stemmer(),
postings: postings::build_postings_reader(input_path),
term_offset_map: vocabulary::load_vocabulary(input_path),
doc_lenghts: documents::load_document_lenghts(input_path),
tokenizer: text::load_tokenizer(tokenizer_path, false),
stemmer: text::load_stemmer(),
}
}

pub fn get_num_documents(&self) -> u32 {
self.doc_lenghts.len() as u32
}

pub fn get_term(&mut self, term: &str) -> Option<PostingList> {
pub fn get_term(&mut self, term: &str) -> Option<postings::PostingList> {
let offset = self.term_offset_map.get(term)?;

self.postings.seek(*offset);
let mut document_id = 0;

let documents: Vec<PostingEntry> = (0..self.postings.read_vbyte())
.map(|_| {
let doc_id_delta = self.postings.read_gamma();
let document_frequency = self.postings.read_gamma();

document_id += doc_id_delta;

PostingEntry {
document_id,
document_frequency,
}
})
.collect();

let collection_frequency = documents.len() as u32;

Some(PostingList {
documents,
collection_frequency,
})
Some(postings::load_postings_list(&mut self.postings, *offset))
}

pub fn tokenize_and_stem_query(&self, query: &str) -> Vec<String> {
text_utils::tokenize_and_stem(&self.tokenizer, &self.stemmer, query)
text::tokenize_and_stem(&self.tokenizer, &self.stemmer, query)
}
}

@@ -131,5 +106,8 @@ mod test {
);

assert_eq!(pl.collection_frequency, 2);

let pl = idx.get_term("world").unwrap();
assert_eq!(pl.documents[0].positions, [1]);
}
}
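After the refactor, the public surface of the index module stays essentially the same: build an index from a directory, load it back, and fetch per-term posting lists. A hedged usage sketch follows; the crate path search::index::Index, the file paths, and the tokenizer name are placeholders, and it assumes the index module and its posting types are exported from the crate root.

use search::index::Index; // assumed crate and module path

fn main() {
    // Build the on-disk index, then load it back together with its tokenizer.
    Index::build_index("data/docs", "data/index/wiki", "bert-base-uncased");
    let mut index = Index::load_index("data/index/wiki", "bert-base-uncased");

    println!("indexed documents: {}", index.get_num_documents());

    // Query a single term and inspect its posting list.
    if let Some(postings) = index.get_term("hello") {
        println!(
            "collection frequency: {}, first document: {}",
            postings.collection_frequency,
            postings.documents[0].document_id
        );
    }
}
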