Skip to content

Commit

Permalink
feat: avoid prefetching all sst streams at once (#1069)
Browse files Browse the repository at this point in the history
## Rationale
Close #959

Currently, fetching sst data starts immediately for every stream, which
may lead to high memory consumption when a large number of concurrent
queries arrive.

## Detailed Changes
Introduce a prefetchable stream to replace the normal stream. It provides
a way to trigger the data fetching explicitly, so that the caller can
decide when to trigger it.

## Test Plan
Tested locally with massive concurrent queries. Memory usage grows much more slowly after this patch.
  • Loading branch information
ShiKaiWi authored Jul 18, 2023
1 parent dcebd17 commit a26988b
Show file tree
Hide file tree
Showing 22 changed files with 348 additions and 78 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions analytic_engine/src/instance/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ impl Instance {
let scan_options_for_compaction = ScanOptions {
background_read_parallelism: 1,
max_record_batches_in_flight: MAX_RECORD_BATCHES_IN_FLIGHT_WHEN_COMPACTION_READ,
num_streams_to_prefetch: ctx.config.num_streams_to_prefetch,
};
let compaction_runtime = ctx.runtimes.compact_runtime.clone();
let compaction_scheduler = Arc::new(SchedulerImpl::new(
Expand All @@ -101,6 +102,7 @@ impl Instance {
let scan_options = ScanOptions {
background_read_parallelism: ctx.config.sst_background_read_parallelism,
max_record_batches_in_flight: ctx.config.scan_max_record_batches_in_flight,
num_streams_to_prefetch: ctx.config.num_streams_to_prefetch,
};

let iter_options = ctx
Expand Down
1 change: 1 addition & 0 deletions analytic_engine/src/instance/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ impl Instance {
request_id: request.request_id,
metrics_collector: Some(metrics_collector),
deadline: request.opts.deadline,
num_streams_to_prefetch: self.scan_options.num_streams_to_prefetch,
space_id: table_data.space_id,
table_id: table_data.id,
projected_schema: projected_schema.clone(),
Expand Down
4 changes: 4 additions & 0 deletions analytic_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod instance;
mod manifest;
pub mod memtable;
mod payload;
pub mod prefetchable_stream;
pub mod row_iter;
mod sampler;
pub mod setup;
Expand Down Expand Up @@ -81,6 +82,8 @@ pub struct Config {
pub scan_max_record_batches_in_flight: usize,
/// Sst background reading parallelism
pub sst_background_read_parallelism: usize,
/// Number of streams to prefetch
pub num_streams_to_prefetch: usize,
/// Max buffer size for writing sst
pub write_sst_max_buffer_size: ReadableSize,
/// Max retry limit After flush failed
Expand Down Expand Up @@ -134,6 +137,7 @@ impl Default for Config {
preflush_write_buffer_size_ratio: 0.75,
scan_batch_size: None,
sst_background_read_parallelism: 8,
num_streams_to_prefetch: 2,
scan_max_record_batches_in_flight: 1024,
write_sst_max_buffer_size: ReadableSize::mb(10),
max_retry_flush_limit: 0,
Expand Down
172 changes: 172 additions & 0 deletions analytic_engine/src/prefetchable_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

// A stream abstraction that supports prefetching.

use async_stream::stream;
use async_trait::async_trait;
use futures::{Stream, StreamExt};

/// A heap-allocated, type-erased [`Stream`] yielding `T` that is both `Send`
/// and `Unpin`.
pub type BoxedStream<T> = Box<dyn Stream<Item = T> + Send + Unpin>;

/// A stream-like source of items whose fetching can be triggered explicitly,
/// ahead of the first read, via [`PrefetchableStream::start_prefetch`].
#[async_trait]
pub trait PrefetchableStream: Send {
/// The type of the values yielded by this stream.
type Item;

/// Start the prefetch procedure in the background. In most implementations,
/// this method should not block the caller, that is to say, the prefetching
/// procedure should run in the background.
async fn start_prefetch(&mut self);

/// Fetch the next item (e.g. a record batch).
///
/// If `None` is returned, all the following calls will return `None` too.
async fn fetch_next(&mut self) -> Option<Self::Item>;
}

/// Adapter methods for any [`PrefetchableStream`].
pub trait PrefetchableStreamExt: PrefetchableStream {
    /// Converts this prefetchable stream into a [`BoxedStream`].
    ///
    /// The returned stream yields every value produced by `fetch_next` and
    /// ends at the first `None`. This adapter never calls `start_prefetch`.
    fn into_boxed_stream(mut self) -> BoxedStream<Self::Item>
    where
        Self: 'static + Sized,
        Self::Item: Send,
    {
        // FIXME: Will this conversion to a stream introduce overhead?
        let items = stream! {
            while let Some(item) = self.fetch_next().await {
                yield item;
            }
        };

        // Pin the generated stream on the heap so that it is `Unpin`, which
        // `BoxedStream` requires, then erase its type.
        let pinned = Box::pin(items);
        Box::new(pinned)
    }

    /// Wraps this stream into a [`FilterMap`] which applies `f` to the fetched
    /// items.
    fn filter_map<F, O>(self, f: F) -> FilterMap<Self, F>
    where
        F: FnMut(Self::Item) -> Option<O>,
        Self: Sized,
    {
        let stream = self;
        FilterMap { stream, f }
    }

    /// Wraps this stream into a [`Map`] which applies `f` to the fetched
    /// items.
    fn map<F, O>(self, f: F) -> Map<Self, F>
    where
        F: FnMut(Self::Item) -> O,
        Self: Sized,
    {
        let stream = self;
        Map { stream, f }
    }
}

// Blanket implementation: every `PrefetchableStream` (sized or not) gets the
// adapter methods of `PrefetchableStreamExt`.
impl<T: ?Sized> PrefetchableStreamExt for T where T: PrefetchableStream {}

/// Lets a boxed trait object be used wherever a [`PrefetchableStream`] is
/// expected by forwarding every call to the boxed stream.
#[async_trait]
impl<T> PrefetchableStream for Box<dyn PrefetchableStream<Item = T>> {
    type Item = T;

    /// Forwards to the boxed stream's `start_prefetch`.
    async fn start_prefetch(&mut self) {
        // Reborrow the trait object itself so the call is dispatched to the
        // boxed stream rather than back to this impl.
        let inner = &mut **self;
        inner.start_prefetch().await;
    }

    /// Forwards to the boxed stream's `fetch_next`.
    async fn fetch_next(&mut self) -> Option<T> {
        let inner = &mut **self;
        inner.fetch_next().await
    }
}

/// The implementation for the `filter_map` operator on the PrefetchableStream.
///
/// Items fetched from `stream` are passed to `f`, and only the `Some` results
/// are yielded.
pub struct FilterMap<St, F> {
/// The underlying stream the items are fetched from.
stream: St,
/// The function applied to every fetched item.
f: F,
}

#[async_trait]
impl<St, F, O> PrefetchableStream for FilterMap<St, F>
where
    St: PrefetchableStream,
    F: FnMut(St::Item) -> Option<O> + Send,
    O: Send,
{
    type Item = O;

    /// Delegates the prefetch to the underlying stream.
    async fn start_prefetch(&mut self) {
        self.stream.start_prefetch().await;
    }

    /// Returns the first `Some` produced by applying `f` to the items of the
    /// underlying stream, or `None` once that stream is exhausted.
    async fn fetch_next(&mut self) -> Option<O> {
        while let Some(item) = self.stream.fetch_next().await {
            // Items mapped to `None` are dropped; keep fetching and filtering
            // until something passes or the underlying stream ends.
            if let Some(kept) = (self.f)(item) {
                return Some(kept);
            }
        }

        None
    }
}

/// The implementation for the `map` operator on the PrefetchableStream.
///
/// Every item fetched from `stream` is transformed by `f` before being
/// yielded.
pub struct Map<St, F> {
/// The underlying stream the items are fetched from.
stream: St,
/// The function applied to every fetched item.
f: F,
}

#[async_trait]
impl<St, F, O> PrefetchableStream for Map<St, F>
where
    St: PrefetchableStream,
    F: FnMut(St::Item) -> O + Send,
    O: Send,
{
    type Item = O;

    /// Delegates the prefetch to the underlying stream.
    async fn start_prefetch(&mut self) {
        self.stream.start_prefetch().await;
    }

    /// Fetches the next item of the underlying stream and applies `f` to it;
    /// returns `None` once the underlying stream is exhausted.
    async fn fetch_next(&mut self) -> Option<O> {
        let item = self.stream.fetch_next().await?;
        Some((self.f)(item))
    }
}

/// A noop prefetcher.
///
/// A wrapper around an underlying stream that has no prefetch logic.
pub struct NoopPrefetcher<T>(pub BoxedStream<T>);

#[async_trait]
impl<T> PrefetchableStream for NoopPrefetcher<T> {
type Item = T;

async fn start_prefetch(&mut self) {
// It's just a noop operation.
}

async fn fetch_next(&mut self) -> Option<T> {
self.0.next().await
}
}

#[cfg(test)]
mod tests {
    use futures::stream;

    use super::*;

    /// A boxed `dyn PrefetchableStream` must yield the items of the wrapped
    /// stream, in order, until it is exhausted.
    #[tokio::test]
    async fn test_trait_object_prefetchable_stream() {
        let expected = vec![1, 2, 3];
        let prefetcher = NoopPrefetcher(Box::new(stream::iter(expected.clone())));
        let mut boxed: Box<dyn PrefetchableStream<Item = i32>> = Box::new(prefetcher);

        let mut actual = Vec::with_capacity(expected.len());
        while let Some(number) = boxed.fetch_next().await {
            actual.push(number);
        }

        assert_eq!(expected, actual);
    }
}
34 changes: 28 additions & 6 deletions analytic_engine/src/row_iter/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use common_types::{
projected_schema::ProjectedSchema, record_batch::RecordBatchWithKey, request_id::RequestId,
schema::RecordSchemaWithKey,
};
use futures::StreamExt;
use generic_error::GenericError;
use log::debug;
use macros::define_result;
Expand All @@ -20,7 +19,7 @@ use trace_metric::{MetricsCollector, TraceMetricWhenDrop};

use crate::{
row_iter::{
record_batch_stream, record_batch_stream::SequencedRecordBatchStream,
record_batch_stream, record_batch_stream::BoxedPrefetchableRecordBatchStream,
RecordBatchWithKeyIterator,
},
space::SpaceId,
Expand Down Expand Up @@ -61,6 +60,7 @@ pub struct ChainConfig<'a> {
pub projected_schema: ProjectedSchema,
/// Predicate of the query.
pub predicate: PredicateRef,
pub num_streams_to_prefetch: usize,

pub sst_read_options: SstReadOptions,
/// Sst factory
Expand Down Expand Up @@ -172,8 +172,10 @@ impl<'a> Builder<'a> {
request_id: self.config.request_id,
schema: self.config.projected_schema.to_record_schema_with_key(),
streams,
num_streams_to_prefetch: self.config.num_streams_to_prefetch,
ssts: self.ssts,
next_stream_idx: 0,
next_prefetch_stream_idx: 0,
inited_at: None,
created_at: Instant::now(),
metrics: Metrics::new(
Expand Down Expand Up @@ -248,17 +250,19 @@ pub struct ChainIterator {
table_id: TableId,
request_id: RequestId,
schema: RecordSchemaWithKey,
streams: Vec<SequencedRecordBatchStream>,
streams: Vec<BoxedPrefetchableRecordBatchStream>,
num_streams_to_prefetch: usize,
/// ssts are kept here to avoid them from being purged.
#[allow(dead_code)]
ssts: Vec<Vec<FileHandle>>,
/// The range of the index is [0, streams.len()] and the iterator is
/// exhausted if it reaches `streams.len()`.
next_stream_idx: usize,
next_prefetch_stream_idx: usize,

inited_at: Option<Instant>,
created_at: Instant,
// metrics for the iterator.
/// metrics for the iterator.
metrics: Metrics,
}

Expand All @@ -273,6 +277,18 @@ impl ChainIterator {
self.space_id, self.table_id, self.request_id, self.streams.len(), self.schema
);
}

/// Maybe prefetch the necessary streams for future reading.
///
/// Starts the prefetch of every not-yet-prefetched stream whose index lies
/// in `[next_stream_idx, next_stream_idx + num_streams_to_prefetch)`,
/// bounded by the number of streams. Streams before `next_stream_idx` have
/// already been read past and are never prefetched here, so nothing is
/// prefetched when `num_streams_to_prefetch` is 0.
async fn maybe_prefetch(&mut self) {
    // `next_stream_idx` does not change while prefetching, so the exclusive
    // end of the window can be computed once. `saturating_add` keeps a very
    // large configured `num_streams_to_prefetch` from overflowing.
    let window_end = self
        .next_stream_idx
        .saturating_add(self.num_streams_to_prefetch)
        .min(self.streams.len());

    // Skip the streams which the reader has already moved past: triggering
    // a prefetch on them would be useless. This only matters when
    // `num_streams_to_prefetch` is 0; otherwise the prefetch cursor is
    // never behind the read cursor.
    self.next_prefetch_stream_idx = self.next_prefetch_stream_idx.max(self.next_stream_idx);

    // Advance the cursor one stream at a time so that it always reflects
    // the streams whose prefetch has actually been started.
    while self.next_prefetch_stream_idx < window_end {
        self.streams[self.next_prefetch_stream_idx]
            .start_prefetch()
            .await;
        self.next_prefetch_stream_idx += 1;
    }
}
}

impl Drop for ChainIterator {
Expand All @@ -294,11 +310,12 @@ impl RecordBatchWithKeyIterator for ChainIterator {

async fn next_batch(&mut self) -> Result<Option<RecordBatchWithKey>> {
self.init_if_necessary();
self.maybe_prefetch().await;

while self.next_stream_idx < self.streams.len() {
let read_stream = &mut self.streams[self.next_stream_idx];
let sequenced_record_batch = read_stream
.next()
.fetch_next()
.await
.transpose()
.context(PollNextRecordBatch)?;
Expand All @@ -313,7 +330,10 @@ impl RecordBatchWithKeyIterator for ChainIterator {
}
}
// Fetch next stream only if the current sequence_record_batch is None.
None => self.next_stream_idx += 1,
None => {
self.next_stream_idx += 1;
self.maybe_prefetch().await;
}
}
}

Expand Down Expand Up @@ -357,8 +377,10 @@ mod tests {
request_id: RequestId::next_id(),
schema: schema.to_record_schema_with_key(),
streams,
num_streams_to_prefetch: 2,
ssts: Vec::new(),
next_stream_idx: 0,
next_prefetch_stream_idx: 0,
inited_at: None,
created_at: Instant::now(),
metrics: Metrics::new(0, 0, None),
Expand Down
Loading

0 comments on commit a26988b

Please sign in to comment.