Skip to content

Commit

Permalink
Merge pull request #8268 from youngsofun/http
Browse files Browse the repository at this point in the history
feat(http handler): return error when download result of a running query
  • Loading branch information
mergify[bot] authored Oct 18, 2022
2 parents 19584d9 + b5dbed8 commit 1933343
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 25 deletions.
19 changes: 19 additions & 0 deletions src/query/service/src/servers/http/v1/http_query_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,25 @@ async fn result_download_handler(
let format =
OutputFormatType::from_str(&params.format.unwrap_or(default_format)).map_err(BadRequest)?;

let http_query_manager = HttpQueryManager::instance();
if let Some(query) = http_query_manager.get_query(&query_id).await {
let state = query.get_response_state_only().await.state;
match state.state {
ExecuteStateKind::Running => {
return Err(PoemError::from_string(
"running",
StatusCode::INTERNAL_SERVER_ERROR,
));
}
ExecuteStateKind::Failed => {
return Err(PoemError::from_string(
"failed",
StatusCode::INTERNAL_SERVER_ERROR,
));
}
ExecuteStateKind::Succeeded => {}
}
}
let ctx = session
.create_query_context()
.await
Expand Down
91 changes: 66 additions & 25 deletions src/query/service/tests/it/servers/http/http_query_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1107,6 +1107,39 @@ async fn test_download() -> Result<()> {
Ok(())
}

#[tokio::test(flavor = "current_thread")]
async fn test_download_running() -> Result<()> {
let _guard = TestGlobalServices::setup(ConfigBuilder::create().build()).await?;

let ep = create_endpoint().await?;

let sql = "select sleep(10)";
let (status, result) = post_sql_to_endpoint(&ep, sql, 1).await?;
assert_eq!(status, StatusCode::OK, "{:?}", result);
let query_id = &result.id;
assert_eq!(result.state, ExecuteStateKind::Running, "{:?}", result);

let uri = format!("/v1/query/{query_id}/download?limit=1");
let resp = get_uri(&ep, &uri).await;
assert_eq!(
resp.status(),
StatusCode::INTERNAL_SERVER_ERROR,
"{:?}",
resp
);
assert_eq!(resp.into_body().into_string().await.unwrap(), "running");

let response = get_uri(&ep, &result.final_uri.unwrap()).await;
assert_eq!(response.status(), StatusCode::OK, "{:?}", response);

let uri = format!("/v1/query/{query_id}/download?limit=1");
let mut resp = get_uri(&ep, &uri).await;
assert_eq!(resp.status(), StatusCode::NOT_FOUND, "{:?}", resp);
let exp = "not exists";
assert!(resp.take_body().into_string().await.unwrap().contains(exp));
Ok(())
}

#[tokio::test(flavor = "current_thread")]
async fn test_download_non_select() -> Result<()> {
let _guard = TestGlobalServices::setup(ConfigBuilder::create().build()).await?;
Expand Down Expand Up @@ -1139,6 +1172,9 @@ async fn test_download_failed() -> Result<()> {
let (status, result) = post_sql_to_endpoint_new_session(&ep, sql, 1).await?;
assert_eq!(status, StatusCode::OK, "{:?}", result);

let response = get_uri(&ep, &result.final_uri.clone().unwrap()).await;
assert_eq!(response.status(), StatusCode::OK, "{:?}", response);

let mut resp = download(&ep, &result.id).await;
let exp = "not exists";
assert!(resp.take_body().into_string().await.unwrap().contains(exp));
Expand Down Expand Up @@ -1190,31 +1226,36 @@ async fn test_download_killed() -> Result<()> {
Ok(())
}

#[tokio::test(flavor = "current_thread")]
async fn test_no_download_in_management_mode() -> Result<()> {
// Setup
let config = ConfigBuilder::create().with_management_mode().build();
let _guard = TestGlobalServices::setup(config.clone()).await?;

let session_middleware = HTTPSessionMiddleware::create(
HttpHandlerKind::Query,
AuthMgr::create(config.clone()).await?,
);

let ep = Route::new()
.nest("/v1/query", query_route())
.with(session_middleware);
let sql = "select 1";
let (status, result) = post_sql_to_endpoint(&ep, sql, 1).await?;
assert_eq!(status, StatusCode::OK, "{:?}", result);

let mut resp = download(&ep, &result.id).await;
let exp = "not exists";
assert!(resp.take_body().into_string().await.unwrap().contains(exp));
assert_eq!(resp.status(), StatusCode::NOT_FOUND, "{:?}", result);

Ok(())
}
// todo(youngsofun): implement it later
// #[tokio::test(flavor = "current_thread")]
// async fn test_no_download_in_management_mode() -> Result<()> {
// // Setup
// let config = ConfigBuilder::create().with_management_mode().build();
// let _guard = TestGlobalServices::setup(config.clone()).await?;
//
// let session_middleware = HTTPSessionMiddleware::create(
// HttpHandlerKind::Query,
// AuthMgr::create(config.clone()).await?,
// );
//
// let ep = Route::new()
// .nest("/v1/query", query_route())
// .with(session_middleware);
// let sql = "show databases";
// let (status, result) = post_sql_to_endpoint(&ep, sql, 1).await?;
// assert_eq!(status, StatusCode::OK, "{:?}", result);
//
// let response = get_uri(&ep, &result.final_uri.clone().unwrap()).await;
// assert_eq!(response.status(), StatusCode::OK, "{:?}", response);
//
// let mut resp = download(&ep, &result.id).await;
// let exp = "not exists";
// let body = resp.take_body().into_string().await.unwrap();
// assert_eq!(resp.status(), StatusCode::NOT_FOUND, "{:?} {}", result, body);
// assert!(body.contains(exp), "body = {}", body);
//
// Ok(())
// }

#[tokio::test(flavor = "current_thread")]
async fn test_func_object_keys() -> Result<()> {
Expand Down

1 comment on commit 1933343

@vercel
Copy link

@vercel vercel bot commented on 1933343 Oct 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend.vercel.app
databend-git-main-databend.vercel.app
databend.rs
databend-databend.vercel.app

Please sign in to comment.