Skip to content

Commit

Permalink
Merge pull request #6546 from youngsofun/ch
Browse files Browse the repository at this point in the history
feat(query): /v1/download support limit number of rows.
  • Loading branch information
BohuTANG authored Jul 8, 2022
2 parents 68f4053 + d2841ab commit aa35e7c
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 7 deletions.
3 changes: 2 additions & 1 deletion query/src/servers/http/v1/http_query_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ fn query_id_not_found(query_id: String) -> PoemError {
#[derive(Deserialize)]
struct DownloadHandlerParams {
pub format: Option<String>,
pub limit: Option<usize>,
}

#[poem::handler]
Expand Down Expand Up @@ -298,7 +299,7 @@ async fn result_download_handler(
})?;

let stream = result_table
.download(ctx, format)
.download(ctx, format, params.limit)
.await
.map_err(InternalServerError)?;

Expand Down
16 changes: 14 additions & 2 deletions query/src/storages/result/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;
use async_stream::stream;
use common_exception::Result;
use common_formats::output_format::OutputFormatType;
use common_planners::Extras;
use common_planners::ReadDataSourcePlan;
use common_planners::SourceInfo;
use futures::StreamExt;
Expand All @@ -33,8 +34,19 @@ impl ResultTable {
&self,
ctx: Arc<QueryContext>,
fmt: OutputFormatType,
limit: Option<usize>,
) -> Result<SendableVu8Stream> {
let (_, parts) = self.read_partitions(ctx.clone(), None).await?;
let push_downs = match limit {
Some(limit) if limit > 0 => Some(Extras {
limit: Some(limit),
..Extras::default()
}),
_ => None,
};

let (_, parts) = self
.read_partitions(ctx.clone(), push_downs.clone())
.await?;
ctx.try_set_partitions(parts)?;
let mut block_stream = self
.read(ctx.clone(), &ReadDataSourcePlan {
Expand All @@ -45,7 +57,7 @@ impl ResultTable {
statistics: Default::default(),
description: "".to_string(),
tbl_args: None,
push_downs: None,
push_downs,
})
.await?;
let fmt_setting = ctx.get_format_settings()?;
Expand Down
16 changes: 12 additions & 4 deletions query/src/storages/result/result_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use common_planners::Partitions;
use common_planners::ReadDataSourcePlan;
use common_planners::Statistics;
use common_streams::SendableDataBlockStream;
use common_streams::TakeStream;
use common_tracing::tracing_futures::Instrument;
use futures::StreamExt;
use serde::Deserialize;
Expand Down Expand Up @@ -140,23 +141,26 @@ impl Table for ResultTable {
async fn read_partitions(
&self,
ctx: Arc<QueryContext>,
_push_downs: Option<Extras>,
push_downs: Option<Extras>,
) -> Result<(Statistics, Partitions)> {
let data_accessor = ctx.get_storage_operator()?;
let meta_location = self.locations.get_meta_location();
let meta_data = data_accessor.object(&meta_location).read().await?;
let meta: ResultTableMeta = serde_json::from_slice(&meta_data)?;
let limit = push_downs
.map(|e| e.limit.unwrap_or(usize::MAX))
.unwrap_or(usize::MAX);
match meta.storage {
ResultStorageInfo::FuseSegment(seg) => {
Ok(FuseTable::all_columns_partitions(&seg.blocks, usize::MAX))
Ok(FuseTable::all_columns_partitions(&seg.blocks, limit))
}
}
}

async fn read(
&self,
ctx: Arc<QueryContext>,
_plan: &ReadDataSourcePlan,
plan: &ReadDataSourcePlan,
) -> Result<SendableDataBlockStream> {
let block_reader = self.create_block_reader(&ctx, &None)?;
let iter = std::iter::from_fn(move || match ctx.clone().try_get_partitions(1) {
Expand All @@ -174,7 +178,11 @@ impl Table for ResultTable {
async move { block_reader.read(part).await }
})
.instrument(common_tracing::tracing::Span::current());

if let Some(extra) = &plan.push_downs {
if let Some(limit) = extra.limit {
return Ok(Box::pin(TakeStream::new(Box::pin(stream), limit)));
}
}
Ok(Box::pin(stream))
}

Expand Down
14 changes: 14 additions & 0 deletions query/tests/it/servers/http/http_query_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1107,6 +1107,20 @@ async fn test_download(v2: u64) -> Result<()> {
fmt
);
}

// test download with limits
let uri = format!("/v1/query/{query_id}/download?limit=1");
let resp = get_uri(&ep, &uri).await;
assert_eq!(resp.status(), StatusCode::OK, "{:?}", resp);
let exp = "0,1\n";
assert_eq!(resp.into_body().into_string().await.unwrap(), exp);

let uri = format!("/v1/query/{query_id}/download?limit=0");
let resp = get_uri(&ep, &uri).await;
assert_eq!(resp.status(), StatusCode::OK, "{:?}", resp);
let exp = "0,1\n1,2\n";
assert_eq!(resp.into_body().into_string().await.unwrap(), exp);

Ok(())
}

Expand Down

0 comments on commit aa35e7c

Please sign in to comment.