feat: support dedup execute physical plan #1237

Merged: 1 commit, Oct 2, 2023
query_engine/src/datafusion_impl/task_context.rs (2 changes: 1 addition & 1 deletion)
@@ -122,7 +122,7 @@ impl Preprocessor {
.await
.box_err()
.with_context(|| ExecutorWithCause {
msg: format!("failed to preprocess remote plan"),
msg: Some("failed to preprocess remote plan".to_string()),
})
}

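The one-line change above simply follows the type of the error context's field: `msg` is apparently an `Option<String>`, so the literal has to be wrapped in `Some(...)`, and the interpolation-free `format!` call goes away. A minimal sketch of that shape, using a hypothetical stand-in struct rather than the crate's real error machinery:

```rust
// Hypothetical stand-in for the crate's `ExecutorWithCause` context; only the
// shape of the `msg` field matters here.
#[derive(Debug)]
struct ExecutorWithCause {
    msg: Option<String>,
}

fn main() {
    // `msg` is optional, so the literal is wrapped in `Some(...)`; the old
    // `format!` call had nothing to interpolate anyway.
    let ctx = ExecutorWithCause {
        msg: Some("failed to preprocess remote plan".to_string()),
    };
    println!("{ctx:?}");
}
```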
server/src/grpc/mod.rs (1 change: 1 addition & 0 deletions)
@@ -307,6 +307,7 @@ impl Builder {
v.enable.then(|| QueryDedup {
config: v.clone(),
request_notifiers: Arc::new(RequestNotifiers::default()),
physical_plan_notifiers: Arc::new(RequestNotifiers::default()),
})
})
.unwrap_or_default();
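The builder change above wires a second notifier registry, `physical_plan_notifiers`, into `QueryDedup` alongside the existing `request_notifiers` used for stream reads. The dedup contract is the same in both cases: the first arrival for a key executes the query, later identical in-flight requests just register a channel and wait for the result to be fanned out. The sketch below is a deliberately simplified, synchronous stand-in for that idea; it is not the project's `RequestNotifiers` type from the `notifier` crate, only an illustration of the First/Wait contract the new field reuses for physical plans.

```rust
use std::collections::HashMap;
use std::sync::{mpsc, Mutex};

enum RegisterResult {
    First,
    Wait,
}

struct Notifiers<K, T> {
    inner: Mutex<HashMap<K, Vec<mpsc::Sender<T>>>>,
}

impl<K: std::hash::Hash + Eq, T> Notifiers<K, T> {
    fn new() -> Self {
        Self {
            inner: Mutex::new(HashMap::new()),
        }
    }

    // The first registration for a key gets `First`; identical in-flight
    // requests get `Wait` and simply keep their receiver.
    fn insert_notifier(&self, key: K, tx: mpsc::Sender<T>) -> RegisterResult {
        let mut map = self.inner.lock().unwrap();
        let waiters = map.entry(key).or_default();
        let first = waiters.is_empty();
        waiters.push(tx);
        if first {
            RegisterResult::First
        } else {
            RegisterResult::Wait
        }
    }

    // Remove and return every waiter for a key so the result can be fanned out.
    fn take_notifiers(&self, key: &K) -> Option<Vec<mpsc::Sender<T>>> {
        self.inner.lock().unwrap().remove(key)
    }
}

fn main() {
    let notifiers = Notifiers::new();
    let (tx1, rx1) = mpsc::channel();
    let (tx2, rx2) = mpsc::channel();

    // The first request for this encoded plan becomes the executor...
    assert!(matches!(
        notifiers.insert_notifier(b"encoded plan".to_vec(), tx1),
        RegisterResult::First
    ));
    // ...while an identical in-flight request only waits.
    assert!(matches!(
        notifiers.insert_notifier(b"encoded plan".to_vec(), tx2),
        RegisterResult::Wait
    ));

    // The executor runs the query once, then broadcasts to all waiters,
    // including itself.
    for tx in notifiers.take_notifiers(&b"encoded plan".to_vec()).unwrap() {
        tx.send("record batch".to_string()).unwrap();
    }
    assert_eq!(rx1.recv().unwrap(), "record batch");
    assert_eq!(rx2.recv().unwrap(), "record batch");
}
```

In the real service the key for physical plans is the encoded plan bytes (`PhysicalPlanKey`), and the first request also registers its own sender, so it consumes the result through the same channel as the waiters.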
server/src/grpc/remote_engine_service/mod.rs (171 changes: 120 additions & 51 deletions)
@@ -35,7 +35,10 @@ use ceresdbproto::{
storage::{arrow_payload, ArrowPayload},
};
use common_types::{record_batch::RecordBatch, request_id::RequestId};
use futures::stream::{self, BoxStream, FuturesUnordered, StreamExt};
use futures::{
stream::{self, BoxStream, FuturesUnordered, StreamExt},
Future,
};
use generic_error::BoxError;
use log::{error, info};
use notifier::notifier::{ExecutionGuard, RequestNotifiers, RequestResult};
@@ -97,6 +100,14 @@ impl StreamReadReqKey {
pub type StreamReadRequestNotifiers =
Arc<RequestNotifiers<StreamReadReqKey, mpsc::Sender<Result<RecordBatch>>>>;

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct PhysicalPlanKey {
encoded_plan: Vec<u8>,
}

pub type PhysicalPlanNotifiers =
Arc<RequestNotifiers<PhysicalPlanKey, mpsc::Sender<Result<RecordBatch>>>>;

/// Stream metric
trait MetricCollector: 'static + Send + Unpin {
fn collect(self);
@@ -221,6 +232,7 @@ macro_rules! record_stream_to_response_stream {
pub struct QueryDedup {
pub config: QueryDedupConfig,
pub request_notifiers: StreamReadRequestNotifiers,
pub physical_plan_notifiers: PhysicalPlanNotifiers,
}

#[derive(Clone)]
@@ -286,7 +298,6 @@ impl RemoteEngineServiceImpl {
request: Request<ReadRequest>,
) -> Result<StreamWithMetric<StreamReadMetricCollector>> {
let metric = StreamReadMetricCollector(Instant::now());
let (tx, rx) = mpsc::channel(query_dedup.config.notify_queue_cap);

let request = request.into_inner();
let table_engine::remote::model::ReadRequest {
@@ -303,14 +314,25 @@
read_request.projected_schema.projection(),
);

match query_dedup
.request_notifiers
.insert_notifier(request_key.clone(), tx)
{
let QueryDedup {
config,
request_notifiers,
..
} = query_dedup;

let (tx, rx) = mpsc::channel(config.notify_queue_cap);
match request_notifiers.insert_notifier(request_key.clone(), tx) {
// The first request, need to handle it, and then notify the other requests.
RequestResult::First => {
self.read_and_send_dedupped_resps(request, request_key, query_dedup)
.await?;
let ctx = self.handler_ctx();
let query = async move { handle_stream_read(ctx, request).await };
self.read_and_send_dedupped_resps(
request_key,
query,
request_notifiers.clone(),
config.notify_timeout.0,
)
.await?;
}
// The request is waiting for the result of first request.
RequestResult::Wait => {
@@ -325,22 +347,22 @@
))
}

async fn read_and_send_dedupped_resps(
async fn read_and_send_dedupped_resps<K, F>(
&self,
request: ReadRequest,
request_key: StreamReadReqKey,
query_dedup: QueryDedup,
) -> Result<()> {
let ctx = self.handler_ctx();

request_key: K,
query: F,
notifiers: Arc<RequestNotifiers<K, mpsc::Sender<Result<RecordBatch>>>>,
notify_timeout: Duration,
) -> Result<()>
where
K: Hash + PartialEq + Eq,
F: Future<Output = Result<PartitionedStreams>> + Send + 'static,
{
// This is used to remove key when future is cancelled.
let mut guard = ExecutionGuard::new(|| {
query_dedup.request_notifiers.take_notifiers(&request_key);
notifiers.take_notifiers(&request_key);
});
let handle = self
.runtimes
.read_runtime
.spawn(async move { handle_stream_read(ctx, request).await });
let handle = self.runtimes.read_runtime.spawn(query);
let streams = handle.await.box_err().context(ErrWithCause {
code: StatusCode::Internal,
msg: "fail to join task",
@@ -379,15 +401,11 @@

// We should set cancel to guard, otherwise the key will be removed twice.
guard.cancel();
let notifiers = query_dedup
.request_notifiers
.take_notifiers(&request_key)
.unwrap();
let notifiers = notifiers.take_notifiers(&request_key).unwrap();

// Do send in background to avoid blocking the rpc procedure.
let timeout = query_dedup.config.notify_timeout.0;
self.runtimes.read_runtime.spawn(async move {
Self::send_dedupped_resps(resps, notifiers, timeout).await;
Self::send_dedupped_resps(resps, notifiers, notify_timeout).await;
});

Ok(())
@@ -564,11 +582,12 @@
let metric = ExecutePlanMetricCollect(Instant::now());
let request = request.into_inner();
let query_engine = self.instance.query_engine.clone();
let (ctx, encoded_plan) = extract_plan_from_req(request)?;

let stream = self
.runtimes
.read_runtime
.spawn(async move { handle_execute_plan(request, query_engine).await })
.spawn(async move { handle_execute_plan(ctx, encoded_plan, query_engine).await })
.await
.box_err()
.with_context(|| ErrWithCause {
@@ -585,6 +604,56 @@
Ok(StreamWithMetric::new(Box::pin(stream), metric))
}

async fn dedup_execute_physical_plan_internal(
&self,
query_dedup: QueryDedup,
request: Request<ExecutePlanRequest>,
) -> Result<StreamWithMetric<ExecutePlanMetricCollect>> {
let metric = ExecutePlanMetricCollect(Instant::now());
let request = request.into_inner();

let query_engine = self.instance.query_engine.clone();
let (ctx, encoded_plan) = extract_plan_from_req(request)?;
let key = PhysicalPlanKey {
encoded_plan: encoded_plan.clone(),
};

let QueryDedup {
config,
physical_plan_notifiers,
..
} = query_dedup;

let (tx, rx) = mpsc::channel(config.notify_queue_cap);
match physical_plan_notifiers.insert_notifier(key.clone(), tx) {
// The first request, need to handle it, and then notify the other requests.
RequestResult::First => {
let query = async move {
handle_execute_plan(ctx, encoded_plan, query_engine)
.await
.map(PartitionedStreams::one_stream)
};
self.read_and_send_dedupped_resps(
key,
query,
physical_plan_notifiers,
config.notify_timeout.0,
)
.await?;
}
// The request is waiting for the result of first request.
RequestResult::Wait => {
// TODO: add metrics to collect the time cost of waited stream
// read.
}
}

Ok(StreamWithMetric::new(
Box::pin(ReceiverStream::new(rx)),
metric,
))
}

fn handler_ctx(&self) -> HandlerContext {
HandlerContext {
catalog_manager: self.instance.catalog_manager.clone(),
Expand Down Expand Up @@ -652,17 +721,13 @@ impl RemoteEngineService for RemoteEngineServiceImpl {
&self,
request: Request<ExecutePlanRequest>,
) -> std::result::Result<Response<Self::ExecutePhysicalPlanStream>, Status> {
let record_stream_result =
self.execute_physical_plan_internal(request)
.await
.map(|stream| {
stream.map(|batch_result| {
batch_result.box_err().with_context(|| ErrWithCause {
code: StatusCode::Internal,
msg: "failed to poll record batch",
})
})
});
let record_stream_result = match self.query_dedup.clone() {
Some(query_dedup) => {
self.dedup_execute_physical_plan_internal(query_dedup, request)
.await
}
None => self.execute_physical_plan_internal(request).await,
};

record_stream_to_response_stream!(record_stream_result, ExecutePhysicalPlanStream)
}
@@ -870,22 +935,33 @@ async fn handle_get_table_info(
})
}

async fn handle_execute_plan(
request: ExecutePlanRequest,
query_engine: QueryEngineRef,
) -> Result<SendableRecordBatchStream> {
fn extract_plan_from_req(request: ExecutePlanRequest) -> Result<(ExecContext, Vec<u8>)> {
// Build execution context.
let ctx_in_req = request.context.with_context(|| ErrNoCause {
code: StatusCode::Internal,
msg: "execution context not found in physical plan request",
})?;
let typed_plan_in_req = request.physical_plan.with_context(|| ErrNoCause {
code: StatusCode::Internal,
msg: "plan not found in physical plan request",
})?;
// FIXME: return the type from query engine.
let valid_plan = check_and_extract_plan(typed_plan_in_req, QueryEngineType::Datafusion)?;

Ok((ctx_in_req, valid_plan))
}

async fn handle_execute_plan(
ctx: ExecContext,
encoded_plan: Vec<u8>,
query_engine: QueryEngineRef,
) -> Result<SendableRecordBatchStream> {
let ExecContext {
request_id,
default_catalog,
default_schema,
timeout_ms,
} = ctx_in_req;
} = ctx;

let request_id = RequestId::from(request_id);
let deadline = if timeout_ms >= 0 {
@@ -901,16 +977,9 @@ async fn handle_execute_plan(
default_schema,
};

// Build physical plan.
let typed_plan_in_req = request.physical_plan.with_context(|| ErrNoCause {
code: StatusCode::Internal,
msg: "plan not found in physical plan request",
})?;
// FIXME: return the type from query engine.
let valid_plan = check_and_extract_plan(typed_plan_in_req, QueryEngineType::Datafusion)?;
// TODO: Build remote plan in physical planner.
let physical_plan = Box::new(DataFusionPhysicalPlanAdapter::new(TypedPlan::Remote(
valid_plan,
encoded_plan,
)));

// Execute plan.
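The tail of the new flow is the broadcast step: once the first request's plan has executed, `read_and_send_dedupped_resps` takes the registered notifiers back and spawns a background task that pushes each record batch to every waiter, bounded by `notify_timeout`, so the RPC handler itself is never blocked. The snippet below is a stand-alone approximation of that fan-out under a per-send timeout; it assumes tokio, and the `fan_out` helper and its payload type are illustrative, not the crate's API.

```rust
use std::time::Duration;

use tokio::sync::mpsc;
use tokio::time::timeout;

// Push every result to every waiter, bounding each send with a timeout so a
// slow or vanished consumer cannot stall the whole broadcast.
async fn fan_out<T: Clone>(
    results: Vec<T>,
    waiters: Vec<mpsc::Sender<T>>,
    notify_timeout: Duration,
) {
    for tx in waiters {
        for item in &results {
            match timeout(notify_timeout, tx.send(item.clone())).await {
                Ok(Ok(())) => {}
                // Timed out or the receiver was dropped: give up on this waiter.
                _ => break,
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(8);

    // Run the broadcast in the background, in the spirit of the PR's
    // `runtimes.read_runtime.spawn(...)` call, while the waiter just drains
    // its receiver.
    tokio::spawn(fan_out(vec![1u32, 2, 3], vec![tx], Duration::from_secs(1)));

    while let Some(batch) = rx.recv().await {
        println!("waiter received {batch}");
    }
}
```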