Skip to content

Commit

Permalink
Remove unnecessary reader seeks
Browse files Browse the repository at this point in the history
  • Loading branch information
YS-L committed Apr 28, 2024
1 parent 8ddc59c commit 56a71af
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 8 deletions.
36 changes: 29 additions & 7 deletions src/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::cmp::max;
use std::fs::File;
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time;

fn string_record_to_vec(record: &csv::StringRecord) -> Vec<String> {
let mut string_vec = Vec::new();
Expand Down Expand Up @@ -174,7 +175,11 @@ impl CsvLensReader {
let pos = Position::new();
self.reader.seek(pos)?;

let tic = time::Instant::now();
let pos_table = self.get_pos_table();
stats.pos_table_elapsed = Some(tic.elapsed());
stats.pos_table_entry = pos_table.len();

let mut pos_iter = pos_table.iter();
let mut indices_iter = indices.iter();

Expand All @@ -189,15 +194,19 @@ impl CsvLensReader {
}
// seek as close to the next wanted record index as possible
let index = next_wanted.unwrap();
let mut seek_pos: Option<Position> = None;
while let Some(pos) = next_pos {
if self.config.position_to_record_index(pos.record()) <= index.record_index {
self.reader.seek(pos.clone())?;
stats.log_seek();
seek_pos.replace(pos.clone());
} else {
break;
}
next_pos = pos_iter.next();
}
if let Some(pos) = seek_pos {
self.reader.seek(pos)?;
stats.log_seek();
}

// note that records() excludes header by default, but here the first entry is header
// because of the seek() above.
Expand Down Expand Up @@ -289,13 +298,17 @@ impl CsvLensReader {
pub struct GetRowsStats {
pub num_seek: u64,
pub num_parsed_record: u64,
pub pos_table_elapsed: Option<time::Duration>,
pub pos_table_entry: usize,
}

impl GetRowsStats {
fn new() -> GetRowsStats {
GetRowsStats {
num_seek: 0,
num_parsed_record: 0,
pos_table_elapsed: None,
pos_table_entry: 0,
}
}

Expand Down Expand Up @@ -467,7 +480,7 @@ mod tests {
let mut r = CsvLensReader::new(config).unwrap();
r.wait_internal();
let indices = vec![1, 3, 5, 1234, 2345, 3456, 4999];
let (rows, stats) = r.get_rows_impl(&indices).unwrap();
let (rows, mut stats) = r.get_rows_impl(&indices).unwrap();
let expected = vec![
Row::new(2, vec!["A2", "B2"]),
Row::new(4, vec!["A4", "B4"]),
Expand All @@ -478,9 +491,12 @@ mod tests {
Row::new(5000, vec!["A5000", "B5000"]),
];
assert_eq!(rows, expected);
stats.pos_table_elapsed.take();
let expected = GetRowsStats {
num_seek: 115,
num_seek: 4,
num_parsed_record: 218,
pos_table_elapsed: None,
pos_table_entry: 115,
};
assert_eq!(stats, expected);
}
Expand All @@ -491,12 +507,15 @@ mod tests {
let mut r = CsvLensReader::new(config).unwrap();
r.wait_internal();
let indices = vec![1234];
let (rows, stats) = r.get_rows_impl(&indices).unwrap();
let (rows, mut stats) = r.get_rows_impl(&indices).unwrap();
let expected = vec![Row::new(1235, vec!["A1235", "B1235"])];
assert_eq!(rows, expected);
stats.pos_table_elapsed.take();
let expected = GetRowsStats {
num_seek: 25,
num_seek: 1,
num_parsed_record: 8,
pos_table_elapsed: None,
pos_table_entry: 115,
};
assert_eq!(stats, expected);
}
Expand All @@ -507,12 +526,15 @@ mod tests {
let mut r = CsvLensReader::new(config).unwrap();
r.wait_internal();
let indices = vec![2];
let (rows, stats) = r.get_rows_impl(&indices).unwrap();
let (rows, mut stats) = r.get_rows_impl(&indices).unwrap();
let expected = vec![Row::new(3, vec!["A3", "B3"])];
assert_eq!(rows, expected);
stats.pos_table_elapsed.take();
let expected = GetRowsStats {
num_seek: 0,
num_parsed_record: 4, // 3 + 1 (including header)
pos_table_elapsed: None,
pos_table_entry: 115,
};
assert_eq!(stats, expected);
}
Expand Down
9 changes: 8 additions & 1 deletion src/ui.rs
Original file line number Diff line number Diff line change
Expand Up @@ -990,8 +990,15 @@ impl DebugStats {
let mut line = "[".to_string();
if let Some(stats) = &self.rows_view_stats {
line += format!(
"rows:{:.3}ms seek:{} parse:{}",
"rows:{:.3}ms pos:{}us npos:{} seek:{} parse:{}",
stats.elapsed.as_micros() as f64 / 1000.0,
stats
.reader_stats
.pos_table_elapsed
.as_ref()
.map(|e| e.as_micros())
.unwrap_or(0),
stats.reader_stats.pos_table_entry,
stats.reader_stats.num_seek,
stats.reader_stats.num_parsed_record
)
Expand Down

0 comments on commit 56a71af

Please sign in to comment.