Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Remove Item from merger's Node trait #3371

Merged
merged 8 commits into from
Feb 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 168 additions & 55 deletions src/mito2/src/memtable/merge_tree/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::sync::Arc;

use bytes::Bytes;
use datatypes::arrow;
use datatypes::arrow::array::{RecordBatch, UInt16Array, UInt32Array};
use datatypes::arrow::array::{ArrayRef, RecordBatch, UInt16Array, UInt32Array, UInt64Array};
use datatypes::arrow::datatypes::{Field, Schema, SchemaRef};
use datatypes::data_type::DataType;
use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder, Vector, VectorRef};
Expand All @@ -42,36 +42,51 @@ use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};
use crate::error;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::merger::{DataNode, DataSource, Merger};
use crate::memtable::merge_tree::merger::{DataBatchKey, DataNode, DataSource, Merger};
use crate::memtable::merge_tree::PkIndex;

const PK_INDEX_COLUMN_NAME: &str = "__pk_index";

/// Initial capacity for the data buffer.
pub(crate) const DATA_INIT_CAP: usize = 8;

/// Data part batches returns by `DataParts::read`.
#[derive(Debug, Clone)]
pub struct DataBatch {
/// Range of a data batch.
#[derive(Debug, Clone, Copy)]
pub(crate) struct DataBatchRange {
/// Primary key index of this batch.
pub(crate) pk_index: PkIndex,
/// Start of current primary key inside record batch.
pub(crate) start: usize,
/// End of current primary key inside record batch.
pub(crate) end: usize,
}

impl DataBatchRange {
pub(crate) fn len(&self) -> usize {
(self.start..self.end).len()
}

pub(crate) fn is_empty(&self) -> bool {
(self.start..self.end).is_empty()
}
}

/// Data part batches returns by `DataParts::read`.
#[derive(Debug, Clone, Copy)]
pub struct DataBatch<'a> {
/// Record batch of data.
pub(crate) rb: RecordBatch,
rb: &'a RecordBatch,
/// Range of current primary key inside record batch
pub(crate) range: Range<usize>,
range: DataBatchRange,
}

impl DataBatch {
impl<'a> DataBatch<'a> {
pub(crate) fn pk_index(&self) -> PkIndex {
self.pk_index
}

pub(crate) fn record_batch(&self) -> &RecordBatch {
&self.rb
self.range.pk_index
}

pub(crate) fn range(&self) -> Range<usize> {
self.range.clone()
pub(crate) fn range(&self) -> DataBatchRange {
self.range
}

pub(crate) fn is_empty(&self) -> bool {
Expand All @@ -81,6 +96,73 @@ impl DataBatch {
pub(crate) fn slice_record_batch(&self) -> RecordBatch {
self.rb.slice(self.range.start, self.range.len())
}

pub(crate) fn first_row(&self) -> (i64, u64) {
let ts_values = timestamp_array_to_i64_slice(self.rb.column(1));
let sequence_values = self
.rb
.column(2)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap()
.values();
(
ts_values[self.range.start],
sequence_values[self.range.start],
)
}

pub(crate) fn last_row(&self) -> (i64, u64) {
let ts_values = timestamp_array_to_i64_slice(self.rb.column(1));
let sequence_values = self
.rb
.column(2)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap()
.values();
(
ts_values[self.range.end - 1],
sequence_values[self.range.end - 1],
)
}

pub(crate) fn first_key(&self) -> DataBatchKey {
let pk_index = self.pk_index();
let ts_array = self.rb.column(1);

// maybe safe the result somewhere.
let ts_values = timestamp_array_to_i64_slice(ts_array);
let timestamp = ts_values[self.range.start];
DataBatchKey {
pk_index,
timestamp,
}
}

pub(crate) fn search_key(&self, key: &DataBatchKey) -> Result<usize, usize> {
let DataBatchKey {
pk_index,
timestamp,
} = key;
assert_eq!(*pk_index, self.range.pk_index);
let ts_values = timestamp_array_to_i64_slice(self.rb.column(1));
let ts_values = &ts_values[self.range.start..self.range.end];
ts_values.binary_search(timestamp)
}

pub(crate) fn slice(self, offset: usize, length: usize) -> DataBatch<'a> {
let start = self.range.start + offset;
let end = start + length;
DataBatch {
rb: self.rb,
range: DataBatchRange {
pk_index: self.range.pk_index,
start,
end,
},
}
}
}

/// Buffer for the value part (pk_index, ts, sequence, op_type, field columns) in a shard.
Expand Down Expand Up @@ -307,60 +389,95 @@ fn data_buffer_to_record_batches(
RecordBatch::try_new(schema, columns).context(error::NewRecordBatchSnafu)
}

pub(crate) fn timestamp_array_to_i64_slice(arr: &ArrayRef) -> &[i64] {
use datatypes::arrow::array::{
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray,
};
use datatypes::arrow::datatypes::{DataType, TimeUnit};

match arr.data_type() {
DataType::Timestamp(t, _) => match t {
TimeUnit::Second => arr
.as_any()
.downcast_ref::<TimestampSecondArray>()
.unwrap()
.values(),
TimeUnit::Millisecond => arr
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.values(),
TimeUnit::Microsecond => arr
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap()
.values(),
TimeUnit::Nanosecond => arr
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap()
.values(),
},
_ => unreachable!(),
}
}

#[derive(Debug)]
pub(crate) struct DataBufferReader {
batch: RecordBatch,
offset: usize,
current_batch: Option<(PkIndex, Range<usize>)>,
current_range: Option<DataBatchRange>,
}

impl DataBufferReader {
pub(crate) fn new(batch: RecordBatch) -> Result<Self> {
let mut reader = Self {
batch,
offset: 0,
current_batch: None,
current_range: None,
};
reader.next()?; // fill data batch for comparison and merge.
Ok(reader)
}

pub(crate) fn is_valid(&self) -> bool {
self.current_batch.is_some()
self.current_range.is_some()
}

/// Returns current data batch.
/// # Panics
/// If Current reader is exhausted.
pub(crate) fn current_data_batch(&self) -> DataBatch {
let (pk_index, range) = self.current_batch.as_ref().unwrap();
let rb = self.batch.slice(range.start, range.len());
let range = 0..rb.num_rows();
let range = self.current_range.unwrap();
DataBatch {
pk_index: *pk_index,
rb,
rb: &self.batch,
range,
}
}

/// # Panics
/// If Current reader is exhausted.
pub(crate) fn current_pk_index(&self) -> PkIndex {
let (pk_index, _) = self.current_batch.as_ref().unwrap();
*pk_index
self.current_range.as_ref().unwrap().pk_index
}

/// Advances reader to next data batch.
pub(crate) fn next(&mut self) -> Result<()> {
if self.offset >= self.batch.num_rows() {
self.current_batch = None;
self.current_range = None;
return Ok(());
}
let pk_index_array = pk_index_array(&self.batch);
if let Some((next_pk, range)) = search_next_pk_range(pk_index_array, self.offset) {
self.offset = range.end;
self.current_batch = Some((next_pk, range))
self.current_range = Some(DataBatchRange {
pk_index: next_pk,
start: range.start,
end: range.end,
});
} else {
self.current_batch = None;
self.current_range = None;
}
Ok(())
}
Expand Down Expand Up @@ -579,16 +696,14 @@ impl DataPart {

pub struct DataPartReader {
inner: ParquetRecordBatchReader,
current_range: Range<usize>,
current_pk_index: Option<PkIndex>,
current_batch: Option<RecordBatch>,
current_range: Option<DataBatchRange>,
}

impl Debug for DataPartReader {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DataPartReader")
.field("current_range", &self.current_range)
.field("current_pk_index", &self.current_pk_index)
.finish()
}
}
Expand All @@ -603,63 +718,56 @@ impl DataPartReader {
let parquet_reader = builder.build().context(error::ReadDataPartSnafu)?;
let mut reader = Self {
inner: parquet_reader,
current_pk_index: None,
current_range: 0..0,
current_batch: None,
current_range: None,
};
reader.next()?;
Ok(reader)
}

/// Returns false if current reader is exhausted.
pub(crate) fn is_valid(&self) -> bool {
self.current_pk_index.is_some()
self.current_range.is_some()
}

/// Returns current pk index.
///
/// # Panics
/// If reader is exhausted.
pub(crate) fn current_pk_index(&self) -> PkIndex {
self.current_pk_index.expect("DataPartReader is exhausted")
self.current_range.as_ref().unwrap().pk_index
}

/// Returns current data batch of reader.
/// # Panics
/// If reader is exhausted.
pub(crate) fn current_data_batch(&self) -> DataBatch {
let pk_index = self.current_pk_index.unwrap();
let range = self.current_range.clone();
let rb = self
.current_batch
.as_ref()
.unwrap()
.slice(range.start, range.len());

let range = 0..rb.num_rows();
let range = self.current_range.unwrap();
DataBatch {
pk_index,
rb,
rb: self.current_batch.as_ref().unwrap(),
range,
}
}

pub(crate) fn next(&mut self) -> Result<()> {
if let Some((next_pk, range)) = self.search_next_pk_range() {
// first try to search next pk in current record batch.
self.current_pk_index = Some(next_pk);
self.current_range = range;
self.current_range = Some(DataBatchRange {
pk_index: next_pk,
start: range.start,
end: range.end,
});
} else {
// current record batch reaches eof, fetch next record batch from parquet reader.
if let Some(rb) = self.inner.next() {
let rb = rb.context(error::ComputeArrowSnafu)?;
self.current_range = 0..0;
self.current_batch = Some(rb);
self.current_range = None;
return self.next();
} else {
// parquet is also exhausted
self.current_pk_index = None;
self.current_batch = None;
self.current_range = None;
}
}

Expand All @@ -671,7 +779,12 @@ impl DataPartReader {
self.current_batch.as_ref().and_then(|b| {
// safety: PK_INDEX_COLUMN_NAME must present in record batch yielded by data part.
let pk_array = pk_index_array(b);
search_next_pk_range(pk_array, self.current_range.end)
let start = self
.current_range
.as_ref()
.map(|range| range.end)
.unwrap_or(0);
search_next_pk_range(pk_array, start)
})
}
}
Expand Down Expand Up @@ -741,8 +854,9 @@ pub struct DataPartsReader {
}

impl DataPartsReader {
pub(crate) fn current_data_batch(&self) -> &DataBatch {
self.merger.current_item()
pub(crate) fn current_data_batch(&self) -> DataBatch {
let batch = self.merger.current_node().current_data_batch();
batch.slice(0, self.merger.current_rows())
}

pub(crate) fn next(&mut self) -> Result<()> {
Expand All @@ -762,7 +876,6 @@ mod tests {
use parquet::data_type::AsBytes;

use super::*;
use crate::memtable::merge_tree::merger::timestamp_array_to_i64_slice;
use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};

#[test]
Expand Down Expand Up @@ -1013,7 +1126,7 @@ mod tests {
.zip(sequence.iter())
.map(|(ts, seq)| (*ts, *seq))
.collect::<Vec<_>>();
res.push((batch.pk_index, ts_and_seq));
res.push((batch.pk_index(), ts_and_seq));

reader.next().unwrap();
}
Expand Down
Loading