Feature: batch size hint of stream consumption #16363

Closed · dantengsky opened this issue Sep 2, 2024 · 2 comments · Fixed by #16372

Labels: A-query Area: databend query · C-feature Category: feature

Comments

dantengsky (Member) commented Sep 2, 2024

Summary

When consuming streams, there are cases where the size of the change set needs to be constrained.

For example:

SELECT * FROM stream_s WITH CONSUME MAX_BATCH_SIZE_HINT 100;

While consuming stream_s, the query should aim to return a change set of at most 100 rows if possible; any remaining data is left for the next round of consumption.

@dantengsky dantengsky added the C-feature Category: feature label Sep 2, 2024
@dosubot dosubot bot added the A-query Area: databend query label Sep 2, 2024
sundy-li (Member) commented Sep 2, 2024

@dosu how to implement this feature?

dosubot (bot) commented Sep 2, 2024

Hi @sundy-li! I'm here to assist you with any bugs, questions, or contributions while you wait for a human maintainer. Let me know how I can help!

To implement the feature for batch size hint of stream consumption in SQL, such as SELECT * FROM stream_s WITH CONSUME MAX_BATCH_SIZE_HINT 100;, you need to modify the SelectInterpreter to handle the MAX_BATCH_SIZE_HINT parameter. This involves parsing the hint from the SQL query and then using it to configure the stream consumption.

Here is a high-level approach to implement this feature:

  1. Parse the MAX_BATCH_SIZE_HINT from the SQL query: Modify the SQL parser to recognize and extract the MAX_BATCH_SIZE_HINT parameter.

  2. Store the parsed hint in the SelectInterpreter: Add a field to the SelectInterpreter to store the batch size hint.

  3. Use the batch size hint during stream consumption: Modify the stream consumption logic to use the batch size hint.

Below is a simplified example of how you might implement this:

Step 1: Modify the SQL Parser

You need to modify the SQL parser to recognize the MAX_BATCH_SIZE_HINT parameter. This part is not shown in the provided code, but you would typically add this to your SQL grammar and parsing logic.
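To make the parsing step concrete, here is a minimal, self-contained sketch of the shape this change might take. The WithConsume type and the token-matching helper below are illustrative stand-ins for this discussion, not Databend's actual AST types or parser combinators:

// Hypothetical sketch: how a parsed `WITH CONSUME` clause might carry the hint.
// These names are illustrative stand-ins, not Databend's real AST.

#[derive(Debug, Clone)]
pub struct WithConsume {
    /// Populated when `MAX_BATCH_SIZE_HINT <n>` follows `WITH CONSUME`.
    pub max_batch_size_hint: Option<usize>,
}

/// Parse the tail of `... WITH CONSUME [MAX_BATCH_SIZE_HINT <n>]`
/// from a slice of already-lexed tokens.
fn parse_with_consume(tokens: &[&str]) -> Option<WithConsume> {
    match tokens {
        ["WITH", "CONSUME", "MAX_BATCH_SIZE_HINT", n, ..] => Some(WithConsume {
            max_batch_size_hint: n.parse().ok(),
        }),
        ["WITH", "CONSUME", ..] => Some(WithConsume {
            max_batch_size_hint: None,
        }),
        _ => None,
    }
}

fn main() {
    let clause = parse_with_consume(&["WITH", "CONSUME", "MAX_BATCH_SIZE_HINT", "100"]);
    assert_eq!(clause.unwrap().max_batch_size_hint, Some(100));
}

In the real parser this would be expressed in the existing grammar and combinator framework; the sketch only shows the intended mapping from tokens to an optional hint on the AST.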

Step 2: Store the Batch Size Hint in SelectInterpreter

Add a field to the SelectInterpreter to store the batch size hint.

pub struct SelectInterpreter {
    ctx: Arc<QueryContext>,
    s_expr: SExpr,
    bind_context: BindContext,
    metadata: MetadataRef,
    formatted_ast: Option<String>,
    ignore_result: bool,
    max_batch_size_hint: Option<usize>, // Add this field
}

impl SelectInterpreter {
    pub fn try_create(
        ctx: Arc<QueryContext>,
        bind_context: BindContext,
        s_expr: SExpr,
        metadata: MetadataRef,
        formatted_ast: Option<String>,
        ignore_result: bool,
        max_batch_size_hint: Option<usize>, // Add this parameter
    ) -> Result<Self> {
        Ok(SelectInterpreter {
            ctx,
            s_expr,
            bind_context,
            metadata,
            formatted_ast,
            ignore_result,
            max_batch_size_hint, // Initialize the field
        })
    }
}

Step 3: Use the Batch Size Hint During Stream Consumption

Modify the stream consumption logic to use the batch size hint.

#[async_backtrace::framed]
pub async fn build_pipeline(
    &self,
    mut physical_plan: PhysicalPlan,
) -> Result<PipelineBuildResult> {
    if let PhysicalPlan::Exchange(exchange) = &mut physical_plan {
        if exchange.kind == FragmentKind::Merge && self.ignore_result {
            exchange.ignore_exchange = self.ignore_result;
        }
    }

    let mut build_res = build_query_pipeline(
        &self.ctx,
        &self.bind_context.columns,
        &physical_plan,
        self.ignore_result,
    )
    .await?;

    // consume stream
    let update_stream_metas = query_build_update_stream_req(&self.ctx, &self.metadata).await?;

    let catalog = self.ctx.get_default_catalog()?;
    build_res
        .main_pipeline
        .set_on_finished(move |info: &ExecutionInfo| match &info.res {
            Ok(_) => GlobalIORuntime::instance().block_on(async move {
                match update_stream_metas {
                    Some(streams) => {
                        let r = UpdateMultiTableMetaReq {
                            update_table_metas: streams.update_table_metas,
                            ..Default::default()
                        };
                        info!("Updating the stream meta to consume data");
                        catalog.update_multi_table_meta(r).await.map(|_| ())
                    }
                    None => Ok(()),
                }
            }),
            Err(error_code) => Err(error_code.clone()),
        });

    // Use the batch size hint if provided
    if let Some(max_batch_size) = self.max_batch_size_hint {
        build_res.main_pipeline.set_max_batch_size(max_batch_size);
    }

    Ok(build_res)
}

In this example, set_max_batch_size is a hypothetical method that you would need to implement on the Pipeline struct to configure the batch size for stream consumption.
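If it helps to make that concrete, below is a minimal, self-contained sketch of what such a method and its enforcement could look like. Everything here — Pipeline, DataBlock, apply_batch_size_hint — is an illustrative stand-in for this discussion, not Databend's real API; in particular, a real implementation would slice a block's columns rather than just adjust a row counter.

// Hypothetical sketch of the pipeline-side change; names are assumptions.

#[derive(Default)]
pub struct Pipeline {
    /// Soft row budget for one round of stream consumption; None means unlimited.
    max_batch_size: Option<usize>,
}

impl Pipeline {
    /// Record the hint so that source operators can consult it.
    pub fn set_max_batch_size(&mut self, max_batch_size: usize) {
        self.max_batch_size = Some(max_batch_size);
    }

    pub fn max_batch_size(&self) -> Option<usize> {
        self.max_batch_size
    }
}

pub struct DataBlock {
    pub num_rows: usize,
}

/// Trim a sequence of blocks so the emitted rows stay within the budget;
/// anything beyond it is left for the next consumption round.
pub fn apply_batch_size_hint(blocks: Vec<DataBlock>, budget: usize) -> Vec<DataBlock> {
    let mut remaining = budget;
    let mut out = Vec::new();
    for mut block in blocks {
        if remaining == 0 {
            break; // leftover blocks stay unconsumed
        }
        if block.num_rows > remaining {
            block.num_rows = remaining; // a real impl would slice the block's columns here
        }
        remaining -= block.num_rows;
        out.push(block);
    }
    out
}

fn main() {
    let mut pipeline = Pipeline::default();
    pipeline.set_max_batch_size(100);

    let blocks = vec![DataBlock { num_rows: 60 }, DataBlock { num_rows: 60 }];
    let consumed = apply_batch_size_hint(blocks, pipeline.max_batch_size().unwrap());
    let total: usize = consumed.iter().map(|b| b.num_rows).sum();
    assert_eq!(total, 100); // the remaining 20 rows are left for the next round
}

Note that the hint is deliberately soft: it bounds how much of the stream is consumed in one round rather than guaranteeing an exact batch size.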

This is a high-level overview, and the actual implementation details may vary based on your specific codebase and requirements [1].

