Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(query): /v1/download support limit number of rows. #6546

Merged
merged 2 commits into from
Jul 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion query/src/servers/http/v1/http_query_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ fn query_id_not_found(query_id: String) -> PoemError {
#[derive(Deserialize)]
struct DownloadHandlerParams {
pub format: Option<String>,
pub limit: Option<usize>,
}

#[poem::handler]
Expand Down Expand Up @@ -298,7 +299,7 @@ async fn result_download_handler(
})?;

let stream = result_table
.download(ctx, format)
.download(ctx, format, params.limit)
.await
.map_err(InternalServerError)?;

Expand Down
16 changes: 14 additions & 2 deletions query/src/storages/result/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;
use async_stream::stream;
use common_exception::Result;
use common_formats::output_format::OutputFormatType;
use common_planners::Extras;
use common_planners::ReadDataSourcePlan;
use common_planners::SourceInfo;
use futures::StreamExt;
Expand All @@ -33,8 +34,19 @@ impl ResultTable {
&self,
ctx: Arc<QueryContext>,
fmt: OutputFormatType,
limit: Option<usize>,
) -> Result<SendableVu8Stream> {
let (_, parts) = self.read_partitions(ctx.clone(), None).await?;
let push_downs = match limit {
Some(limit) if limit > 0 => Some(Extras {
limit: Some(limit),
..Extras::default()
}),
_ => None,
};

let (_, parts) = self
.read_partitions(ctx.clone(), push_downs.clone())
.await?;
ctx.try_set_partitions(parts)?;
let mut block_stream = self
.read(ctx.clone(), &ReadDataSourcePlan {
Expand All @@ -45,7 +57,7 @@ impl ResultTable {
statistics: Default::default(),
description: "".to_string(),
tbl_args: None,
push_downs: None,
push_downs,
})
.await?;
let fmt_setting = ctx.get_format_settings()?;
Expand Down
16 changes: 12 additions & 4 deletions query/src/storages/result/result_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use common_planners::Partitions;
use common_planners::ReadDataSourcePlan;
use common_planners::Statistics;
use common_streams::SendableDataBlockStream;
use common_streams::TakeStream;
use common_tracing::tracing_futures::Instrument;
use futures::StreamExt;
use serde::Deserialize;
Expand Down Expand Up @@ -140,23 +141,26 @@ impl Table for ResultTable {
async fn read_partitions(
&self,
ctx: Arc<QueryContext>,
_push_downs: Option<Extras>,
push_downs: Option<Extras>,
) -> Result<(Statistics, Partitions)> {
let data_accessor = ctx.get_storage_operator()?;
let meta_location = self.locations.get_meta_location();
let meta_data = data_accessor.object(&meta_location).read().await?;
let meta: ResultTableMeta = serde_json::from_slice(&meta_data)?;
let limit = push_downs
.map(|e| e.limit.unwrap_or(usize::MAX))
.unwrap_or(usize::MAX);
match meta.storage {
ResultStorageInfo::FuseSegment(seg) => {
Ok(FuseTable::all_columns_partitions(&seg.blocks, usize::MAX))
Ok(FuseTable::all_columns_partitions(&seg.blocks, limit))
}
}
}

async fn read(
&self,
ctx: Arc<QueryContext>,
_plan: &ReadDataSourcePlan,
plan: &ReadDataSourcePlan,
) -> Result<SendableDataBlockStream> {
let block_reader = self.create_block_reader(&ctx, &None)?;
let iter = std::iter::from_fn(move || match ctx.clone().try_get_partitions(1) {
Expand All @@ -174,7 +178,11 @@ impl Table for ResultTable {
async move { block_reader.read(part).await }
})
.instrument(common_tracing::tracing::Span::current());

if let Some(extra) = &plan.push_downs {
if let Some(limit) = extra.limit {
return Ok(Box::pin(TakeStream::new(Box::pin(stream), limit)));
}
}
Ok(Box::pin(stream))
}

Expand Down
14 changes: 14 additions & 0 deletions query/tests/it/servers/http/http_query_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1107,6 +1107,20 @@ async fn test_download(v2: u64) -> Result<()> {
fmt
);
}

// test download with limits
let uri = format!("/v1/query/{query_id}/download?limit=1");
let resp = get_uri(&ep, &uri).await;
assert_eq!(resp.status(), StatusCode::OK, "{:?}", resp);
let exp = "0,1\n";
assert_eq!(resp.into_body().into_string().await.unwrap(), exp);

let uri = format!("/v1/query/{query_id}/download?limit=0");
let resp = get_uri(&ep, &uri).await;
assert_eq!(resp.status(), StatusCode::OK, "{:?}", resp);
let exp = "0,1\n1,2\n";
assert_eq!(resp.into_body().into_string().await.unwrap(), exp);

Ok(())
}

Expand Down