feat: support dedup execute physical plan (#1237)
ShiKaiWi authored Oct 2, 2023
1 parent c8aecbd commit 81e189f
Showing 3 changed files with 122 additions and 52 deletions.
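
The change generalizes the existing stream-read deduplication so that identical physical-plan requests can also share a single execution: the first request for a given key actually runs the query, while every concurrent duplicate only registers a channel and waits for the fanned-out result. The sketch below illustrates that registry pattern under simplified assumptions; the names mirror RequestNotifiers / RequestResult from the diff and it assumes tokio's mpsc channels, but it is an illustration, not the notifier crate's actual implementation.

use std::{
    collections::{hash_map::Entry, HashMap},
    sync::Mutex,
};

use tokio::sync::mpsc;

/// Outcome of registering a waiter for a key (mirrors RequestResult below).
enum RegisterResult {
    /// No in-flight execution for this key; the caller must run the query.
    First,
    /// An identical request is already running; the caller only waits.
    Wait,
}

/// Simplified stand-in for the notifier registry: all concurrent requests
/// with the same key share one execution, and each waiter receives the
/// result through its own channel sender.
struct Notifiers<K, T> {
    inner: Mutex<HashMap<K, Vec<mpsc::Sender<T>>>>,
}

impl<K: std::hash::Hash + Eq, T> Notifiers<K, T> {
    fn new() -> Self {
        Self {
            inner: Mutex::new(HashMap::new()),
        }
    }

    /// The first registration for a key wins the duty to execute the query.
    fn insert_notifier(&self, key: K, tx: mpsc::Sender<T>) -> RegisterResult {
        match self.inner.lock().unwrap().entry(key) {
            Entry::Vacant(v) => {
                v.insert(vec![tx]);
                RegisterResult::First
            }
            Entry::Occupied(mut o) => {
                o.get_mut().push(tx);
                RegisterResult::Wait
            }
        }
    }

    /// Drain all waiters for a key so the winner can fan out the result,
    /// or release the key if the execution is cancelled.
    fn take_notifiers(&self, key: &K) -> Option<Vec<mpsc::Sender<T>>> {
        self.inner.lock().unwrap().remove(key)
    }
}

Both paths in this commit, the existing stream read (keyed by StreamReadReqKey) and the new physical-plan execution (keyed by PhysicalPlanKey, i.e. the encoded plan bytes), go through this first-wins / fan-out flow via the now-generic read_and_send_dedupped_resps.
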
2 changes: 1 addition & 1 deletion query_engine/src/datafusion_impl/task_context.rs
@@ -122,7 +122,7 @@ impl Preprocessor {
.await
.box_err()
.with_context(|| ExecutorWithCause {
msg: format!("failed to preprocess remote plan"),
msg: Some("failed to preprocess remote plan".to_string()),
})
}

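
The one-line fix above follows from the field type of the context selector: msg is an Option<String>, so the closure has to produce Some("...".to_string()) rather than a bare String from format!(..). A hypothetical, reduced shape of that selector (assuming a snafu-style error with an optional message; the real ExecutorWithCause is defined elsewhere in the codebase):

// Hypothetical, reduced shape implied by the fix above.
#[derive(Debug)]
struct ExecutorWithCause {
    msg: Option<String>,
}
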
1 change: 1 addition & 0 deletions server/src/grpc/mod.rs
@@ -307,6 +307,7 @@ impl Builder {
v.enable.then(|| QueryDedup {
config: v.clone(),
request_notifiers: Arc::new(RequestNotifiers::default()),
physical_plan_notifiers: Arc::new(RequestNotifiers::default()),
})
})
.unwrap_or_default();
171 changes: 120 additions & 51 deletions server/src/grpc/remote_engine_service/mod.rs
@@ -35,7 +35,10 @@ use ceresdbproto::{
storage::{arrow_payload, ArrowPayload},
};
use common_types::{record_batch::RecordBatch, request_id::RequestId};
use futures::stream::{self, BoxStream, FuturesUnordered, StreamExt};
use futures::{
stream::{self, BoxStream, FuturesUnordered, StreamExt},
Future,
};
use generic_error::BoxError;
use log::{error, info};
use notifier::notifier::{ExecutionGuard, RequestNotifiers, RequestResult};
@@ -97,6 +100,14 @@ impl StreamReadReqKey {
pub type StreamReadRequestNotifiers =
Arc<RequestNotifiers<StreamReadReqKey, mpsc::Sender<Result<RecordBatch>>>>;

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct PhysicalPlanKey {
encoded_plan: Vec<u8>,
}

pub type PhysicalPlanNotifiers =
Arc<RequestNotifiers<PhysicalPlanKey, mpsc::Sender<Result<RecordBatch>>>>;

/// Stream metric
trait MetricCollector: 'static + Send + Unpin {
fn collect(self);
@@ -221,6 +232,7 @@ macro_rules! record_stream_to_response_stream {
pub struct QueryDedup {
pub config: QueryDedupConfig,
pub request_notifiers: StreamReadRequestNotifiers,
pub physical_plan_notifiers: PhysicalPlanNotifiers,
}

#[derive(Clone)]
@@ -286,7 +298,6 @@ impl RemoteEngineServiceImpl {
request: Request<ReadRequest>,
) -> Result<StreamWithMetric<StreamReadMetricCollector>> {
let metric = StreamReadMetricCollector(Instant::now());
let (tx, rx) = mpsc::channel(query_dedup.config.notify_queue_cap);

let request = request.into_inner();
let table_engine::remote::model::ReadRequest {
@@ -303,14 +314,25 @@
read_request.projected_schema.projection(),
);

match query_dedup
.request_notifiers
.insert_notifier(request_key.clone(), tx)
{
let QueryDedup {
config,
request_notifiers,
..
} = query_dedup;

let (tx, rx) = mpsc::channel(config.notify_queue_cap);
match request_notifiers.insert_notifier(request_key.clone(), tx) {
// The first request, need to handle it, and then notify the other requests.
RequestResult::First => {
self.read_and_send_dedupped_resps(request, request_key, query_dedup)
.await?;
let ctx = self.handler_ctx();
let query = async move { handle_stream_read(ctx, request).await };
self.read_and_send_dedupped_resps(
request_key,
query,
request_notifiers.clone(),
config.notify_timeout.0,
)
.await?;
}
// The request is waiting for the result of first request.
RequestResult::Wait => {
@@ -325,22 +347,22 @@
))
}

async fn read_and_send_dedupped_resps(
async fn read_and_send_dedupped_resps<K, F>(
&self,
request: ReadRequest,
request_key: StreamReadReqKey,
query_dedup: QueryDedup,
) -> Result<()> {
let ctx = self.handler_ctx();

request_key: K,
query: F,
notifiers: Arc<RequestNotifiers<K, mpsc::Sender<Result<RecordBatch>>>>,
notify_timeout: Duration,
) -> Result<()>
where
K: Hash + PartialEq + Eq,
F: Future<Output = Result<PartitionedStreams>> + Send + 'static,
{
// This is used to remove key when future is cancelled.
let mut guard = ExecutionGuard::new(|| {
query_dedup.request_notifiers.take_notifiers(&request_key);
notifiers.take_notifiers(&request_key);
});
let handle = self
.runtimes
.read_runtime
.spawn(async move { handle_stream_read(ctx, request).await });
let handle = self.runtimes.read_runtime.spawn(query);
let streams = handle.await.box_err().context(ErrWithCause {
code: StatusCode::Internal,
msg: "fail to join task",
@@ -379,15 +401,11 @@ impl RemoteEngineServiceImpl {

// We should set cancel to guard, otherwise the key will be removed twice.
guard.cancel();
let notifiers = query_dedup
.request_notifiers
.take_notifiers(&request_key)
.unwrap();
let notifiers = notifiers.take_notifiers(&request_key).unwrap();

// Do send in background to avoid blocking the rpc procedure.
let timeout = query_dedup.config.notify_timeout.0;
self.runtimes.read_runtime.spawn(async move {
Self::send_dedupped_resps(resps, notifiers, timeout).await;
Self::send_dedupped_resps(resps, notifiers, notify_timeout).await;
});

Ok(())
@@ -564,11 +582,12 @@ impl RemoteEngineServiceImpl {
let metric = ExecutePlanMetricCollect(Instant::now());
let request = request.into_inner();
let query_engine = self.instance.query_engine.clone();
let (ctx, encoded_plan) = extract_plan_from_req(request)?;

let stream = self
.runtimes
.read_runtime
.spawn(async move { handle_execute_plan(request, query_engine).await })
.spawn(async move { handle_execute_plan(ctx, encoded_plan, query_engine).await })
.await
.box_err()
.with_context(|| ErrWithCause {
@@ -585,6 +604,56 @@
Ok(StreamWithMetric::new(Box::pin(stream), metric))
}

async fn dedup_execute_physical_plan_internal(
&self,
query_dedup: QueryDedup,
request: Request<ExecutePlanRequest>,
) -> Result<StreamWithMetric<ExecutePlanMetricCollect>> {
let metric = ExecutePlanMetricCollect(Instant::now());
let request = request.into_inner();

let query_engine = self.instance.query_engine.clone();
let (ctx, encoded_plan) = extract_plan_from_req(request)?;
let key = PhysicalPlanKey {
encoded_plan: encoded_plan.clone(),
};

let QueryDedup {
config,
physical_plan_notifiers,
..
} = query_dedup;

let (tx, rx) = mpsc::channel(config.notify_queue_cap);
match physical_plan_notifiers.insert_notifier(key.clone(), tx) {
// The first request, need to handle it, and then notify the other requests.
RequestResult::First => {
let query = async move {
handle_execute_plan(ctx, encoded_plan, query_engine)
.await
.map(PartitionedStreams::one_stream)
};
self.read_and_send_dedupped_resps(
key,
query,
physical_plan_notifiers,
config.notify_timeout.0,
)
.await?;
}
// The request is waiting for the result of first request.
RequestResult::Wait => {
// TODO: add metrics to collect the time cost of waited stream
// read.
}
}

Ok(StreamWithMetric::new(
Box::pin(ReceiverStream::new(rx)),
metric,
))
}

fn handler_ctx(&self) -> HandlerContext {
HandlerContext {
catalog_manager: self.instance.catalog_manager.clone(),
@@ -652,17 +721,13 @@ impl RemoteEngineService for RemoteEngineServiceImpl {
&self,
request: Request<ExecutePlanRequest>,
) -> std::result::Result<Response<Self::ExecutePhysicalPlanStream>, Status> {
let record_stream_result =
self.execute_physical_plan_internal(request)
.await
.map(|stream| {
stream.map(|batch_result| {
batch_result.box_err().with_context(|| ErrWithCause {
code: StatusCode::Internal,
msg: "failed to poll record batch",
})
})
});
let record_stream_result = match self.query_dedup.clone() {
Some(query_dedup) => {
self.dedup_execute_physical_plan_internal(query_dedup, request)
.await
}
None => self.execute_physical_plan_internal(request).await,
};

record_stream_to_response_stream!(record_stream_result, ExecutePhysicalPlanStream)
}
@@ -870,22 +935,33 @@ async fn handle_get_table_info(
})
}

async fn handle_execute_plan(
request: ExecutePlanRequest,
query_engine: QueryEngineRef,
) -> Result<SendableRecordBatchStream> {
fn extract_plan_from_req(request: ExecutePlanRequest) -> Result<(ExecContext, Vec<u8>)> {
// Build execution context.
let ctx_in_req = request.context.with_context(|| ErrNoCause {
code: StatusCode::Internal,
msg: "execution context not found in physical plan request",
})?;
let typed_plan_in_req = request.physical_plan.with_context(|| ErrNoCause {
code: StatusCode::Internal,
msg: "plan not found in physical plan request",
})?;
// FIXME: return the type from query engine.
let valid_plan = check_and_extract_plan(typed_plan_in_req, QueryEngineType::Datafusion)?;

Ok((ctx_in_req, valid_plan))
}

async fn handle_execute_plan(
ctx: ExecContext,
encoded_plan: Vec<u8>,
query_engine: QueryEngineRef,
) -> Result<SendableRecordBatchStream> {
let ExecContext {
request_id,
default_catalog,
default_schema,
timeout_ms,
} = ctx_in_req;
} = ctx;

let request_id = RequestId::from(request_id);
let deadline = if timeout_ms >= 0 {
@@ -901,16 +977,9 @@
default_schema,
};

// Build physical plan.
let typed_plan_in_req = request.physical_plan.with_context(|| ErrNoCause {
code: StatusCode::Internal,
msg: "plan not found in physical plan request",
})?;
// FIXME: return the type from query engine.
let valid_plan = check_and_extract_plan(typed_plan_in_req, QueryEngineType::Datafusion)?;
// TODO: Build remote plan in physical planner.
let physical_plan = Box::new(DataFusionPhysicalPlanAdapter::new(TypedPlan::Remote(
valid_plan,
encoded_plan,
)));

// Execute plan.
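
One detail worth noting in the now-generic read_and_send_dedupped_resps above is the cancel-safety handling around ExecutionGuard: the guard's closure removes the dedup key if the surrounding future is dropped before completion, and guard.cancel() disarms it once the normal path has taken the notifiers, so the key is never removed twice. A minimal, self-contained sketch of that pattern (CleanupGuard here is illustrative, not the notifier crate's ExecutionGuard):

/// Runs a cleanup closure on drop unless it has been explicitly disarmed.
struct CleanupGuard<F: FnMut()> {
    cleanup: F,
    armed: bool,
}

impl<F: FnMut()> CleanupGuard<F> {
    fn new(cleanup: F) -> Self {
        Self { cleanup, armed: true }
    }

    /// Disarm the guard once the happy path has taken over cleanup,
    /// so the key is not removed twice.
    fn cancel(&mut self) {
        self.armed = false;
    }
}

impl<F: FnMut()> Drop for CleanupGuard<F> {
    fn drop(&mut self) {
        if self.armed {
            (self.cleanup)();
        }
    }
}

fn main() {
    use std::{cell::Cell, rc::Rc};

    // Cancelled execution: the guard is dropped while still armed, so the
    // cleanup runs and the dedup key is released for later requests.
    let removed = Rc::new(Cell::new(false));
    {
        let removed = removed.clone();
        let _guard = CleanupGuard::new(move || removed.set(true));
    }
    assert!(removed.get());

    // Successful path: cancel() disarms the guard before it is dropped, so
    // cleanup is left to the code that explicitly took the notifiers.
    let removed = Rc::new(Cell::new(false));
    {
        let removed_in_guard = removed.clone();
        let mut guard = CleanupGuard::new(move || removed_in_guard.set(true));
        guard.cancel();
    }
    assert!(!removed.get());
}

In the diff, the guard is created before the query future is spawned and is only cancelled after the result streams have been collected, so dropping the RPC future mid-query still frees the key for subsequent identical requests.
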
