diff --git a/query/src/servers/http/v1/http_query_handlers.rs b/query/src/servers/http/v1/http_query_handlers.rs index ed8aa4cae636..0c5f4717c823 100644 --- a/query/src/servers/http/v1/http_query_handlers.rs +++ b/query/src/servers/http/v1/http_query_handlers.rs @@ -269,6 +269,7 @@ fn query_id_not_found(query_id: String) -> PoemError { #[derive(Deserialize)] struct DownloadHandlerParams { pub format: Option, + pub limit: Option, } #[poem::handler] @@ -298,7 +299,7 @@ async fn result_download_handler( })?; let stream = result_table - .download(ctx, format) + .download(ctx, format, params.limit) .await .map_err(InternalServerError)?; diff --git a/query/src/storages/result/download.rs b/query/src/storages/result/download.rs index 8c98dc2c852d..5eed5e172077 100644 --- a/query/src/storages/result/download.rs +++ b/query/src/storages/result/download.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use async_stream::stream; use common_exception::Result; use common_formats::output_format::OutputFormatType; +use common_planners::Extras; use common_planners::ReadDataSourcePlan; use common_planners::SourceInfo; use futures::StreamExt; @@ -33,8 +34,19 @@ impl ResultTable { &self, ctx: Arc, fmt: OutputFormatType, + limit: Option, ) -> Result { - let (_, parts) = self.read_partitions(ctx.clone(), None).await?; + let push_downs = match limit { + Some(limit) if limit > 0 => Some(Extras { + limit: Some(limit), + ..Extras::default() + }), + _ => None, + }; + + let (_, parts) = self + .read_partitions(ctx.clone(), push_downs.clone()) + .await?; ctx.try_set_partitions(parts)?; let mut block_stream = self .read(ctx.clone(), &ReadDataSourcePlan { @@ -45,7 +57,7 @@ impl ResultTable { statistics: Default::default(), description: "".to_string(), tbl_args: None, - push_downs: None, + push_downs, }) .await?; let fmt_setting = ctx.get_format_settings()?; diff --git a/query/src/storages/result/result_table.rs b/query/src/storages/result/result_table.rs index e19a693a28c6..598ab34a4118 100644 --- a/query/src/storages/result/result_table.rs +++ b/query/src/storages/result/result_table.rs @@ -26,6 +26,7 @@ use common_planners::Partitions; use common_planners::ReadDataSourcePlan; use common_planners::Statistics; use common_streams::SendableDataBlockStream; +use common_streams::TakeStream; use common_tracing::tracing_futures::Instrument; use futures::StreamExt; use serde::Deserialize; @@ -140,15 +141,18 @@ impl Table for ResultTable { async fn read_partitions( &self, ctx: Arc, - _push_downs: Option, + push_downs: Option, ) -> Result<(Statistics, Partitions)> { let data_accessor = ctx.get_storage_operator()?; let meta_location = self.locations.get_meta_location(); let meta_data = data_accessor.object(&meta_location).read().await?; let meta: ResultTableMeta = serde_json::from_slice(&meta_data)?; + let limit = push_downs + .map(|e| e.limit.unwrap_or(usize::MAX)) + .unwrap_or(usize::MAX); match meta.storage { ResultStorageInfo::FuseSegment(seg) => { - Ok(FuseTable::all_columns_partitions(&seg.blocks, usize::MAX)) + Ok(FuseTable::all_columns_partitions(&seg.blocks, limit)) } } } @@ -156,7 +160,7 @@ impl Table for ResultTable { async fn read( &self, ctx: Arc, - _plan: &ReadDataSourcePlan, + plan: &ReadDataSourcePlan, ) -> Result { let block_reader = self.create_block_reader(&ctx, &None)?; let iter = std::iter::from_fn(move || match ctx.clone().try_get_partitions(1) { @@ -174,7 +178,11 @@ impl Table for ResultTable { async move { block_reader.read(part).await } }) .instrument(common_tracing::tracing::Span::current()); - + if let Some(extra) = &plan.push_downs { + if let Some(limit) = extra.limit { + return Ok(Box::pin(TakeStream::new(Box::pin(stream), limit))); + } + } Ok(Box::pin(stream)) } diff --git a/query/tests/it/servers/http/http_query_handlers.rs b/query/tests/it/servers/http/http_query_handlers.rs index cbdabcef7da3..9749e37816d7 100644 --- a/query/tests/it/servers/http/http_query_handlers.rs +++ b/query/tests/it/servers/http/http_query_handlers.rs @@ -1107,6 +1107,20 @@ async fn test_download(v2: u64) -> Result<()> { fmt ); } + + // test download with limits + let uri = format!("/v1/query/{query_id}/download?limit=1"); + let resp = get_uri(&ep, &uri).await; + assert_eq!(resp.status(), StatusCode::OK, "{:?}", resp); + let exp = "0,1\n"; + assert_eq!(resp.into_body().into_string().await.unwrap(), exp); + + let uri = format!("/v1/query/{query_id}/download?limit=0"); + let resp = get_uri(&ep, &uri).await; + assert_eq!(resp.status(), StatusCode::OK, "{:?}", resp); + let exp = "0,1\n1,2\n"; + assert_eq!(resp.into_body().into_string().await.unwrap(), exp); + Ok(()) }