Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(http handler): return error when download result of a running query #8268

Merged
merged 3 commits into from
Oct 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/query/service/src/servers/http/v1/http_query_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,25 @@ async fn result_download_handler(
let format =
OutputFormatType::from_str(&params.format.unwrap_or(default_format)).map_err(BadRequest)?;

let http_query_manager = HttpQueryManager::instance();
if let Some(query) = http_query_manager.get_query(&query_id).await {
let state = query.get_response_state_only().await.state;
match state.state {
ExecuteStateKind::Running => {
return Err(PoemError::from_string(
"running",
StatusCode::INTERNAL_SERVER_ERROR,
));
}
ExecuteStateKind::Failed => {
return Err(PoemError::from_string(
"failed",
StatusCode::INTERNAL_SERVER_ERROR,
));
}
ExecuteStateKind::Succeeded => {}
}
}
let ctx = session
.create_query_context()
.await
Expand Down
91 changes: 66 additions & 25 deletions src/query/service/tests/it/servers/http/http_query_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1107,6 +1107,39 @@ async fn test_download() -> Result<()> {
Ok(())
}

#[tokio::test(flavor = "current_thread")]
async fn test_download_running() -> Result<()> {
let _guard = TestGlobalServices::setup(ConfigBuilder::create().build()).await?;

let ep = create_endpoint().await?;

let sql = "select sleep(10)";
let (status, result) = post_sql_to_endpoint(&ep, sql, 1).await?;
assert_eq!(status, StatusCode::OK, "{:?}", result);
let query_id = &result.id;
assert_eq!(result.state, ExecuteStateKind::Running, "{:?}", result);

let uri = format!("/v1/query/{query_id}/download?limit=1");
let resp = get_uri(&ep, &uri).await;
assert_eq!(
resp.status(),
StatusCode::INTERNAL_SERVER_ERROR,
"{:?}",
resp
);
assert_eq!(resp.into_body().into_string().await.unwrap(), "running");

let response = get_uri(&ep, &result.final_uri.unwrap()).await;
assert_eq!(response.status(), StatusCode::OK, "{:?}", response);

let uri = format!("/v1/query/{query_id}/download?limit=1");
let mut resp = get_uri(&ep, &uri).await;
assert_eq!(resp.status(), StatusCode::NOT_FOUND, "{:?}", resp);
let exp = "not exists";
assert!(resp.take_body().into_string().await.unwrap().contains(exp));
Ok(())
}

#[tokio::test(flavor = "current_thread")]
async fn test_download_non_select() -> Result<()> {
let _guard = TestGlobalServices::setup(ConfigBuilder::create().build()).await?;
Expand Down Expand Up @@ -1139,6 +1172,9 @@ async fn test_download_failed() -> Result<()> {
let (status, result) = post_sql_to_endpoint_new_session(&ep, sql, 1).await?;
assert_eq!(status, StatusCode::OK, "{:?}", result);

let response = get_uri(&ep, &result.final_uri.clone().unwrap()).await;
assert_eq!(response.status(), StatusCode::OK, "{:?}", response);

let mut resp = download(&ep, &result.id).await;
let exp = "not exists";
assert!(resp.take_body().into_string().await.unwrap().contains(exp));
Expand Down Expand Up @@ -1190,31 +1226,36 @@ async fn test_download_killed() -> Result<()> {
Ok(())
}

#[tokio::test(flavor = "current_thread")]
async fn test_no_download_in_management_mode() -> Result<()> {
// Setup
let config = ConfigBuilder::create().with_management_mode().build();
let _guard = TestGlobalServices::setup(config.clone()).await?;

let session_middleware = HTTPSessionMiddleware::create(
HttpHandlerKind::Query,
AuthMgr::create(config.clone()).await?,
);

let ep = Route::new()
.nest("/v1/query", query_route())
.with(session_middleware);
let sql = "select 1";
let (status, result) = post_sql_to_endpoint(&ep, sql, 1).await?;
assert_eq!(status, StatusCode::OK, "{:?}", result);

let mut resp = download(&ep, &result.id).await;
let exp = "not exists";
assert!(resp.take_body().into_string().await.unwrap().contains(exp));
assert_eq!(resp.status(), StatusCode::NOT_FOUND, "{:?}", result);

Ok(())
}
// todo(youngsofun): implement it later
// #[tokio::test(flavor = "current_thread")]
// async fn test_no_download_in_management_mode() -> Result<()> {
// // Setup
// let config = ConfigBuilder::create().with_management_mode().build();
// let _guard = TestGlobalServices::setup(config.clone()).await?;
//
// let session_middleware = HTTPSessionMiddleware::create(
// HttpHandlerKind::Query,
// AuthMgr::create(config.clone()).await?,
// );
//
// let ep = Route::new()
// .nest("/v1/query", query_route())
// .with(session_middleware);
// let sql = "show databases";
// let (status, result) = post_sql_to_endpoint(&ep, sql, 1).await?;
// assert_eq!(status, StatusCode::OK, "{:?}", result);
//
// let response = get_uri(&ep, &result.final_uri.clone().unwrap()).await;
// assert_eq!(response.status(), StatusCode::OK, "{:?}", response);
//
// let mut resp = download(&ep, &result.id).await;
// let exp = "not exists";
// let body = resp.take_body().into_string().await.unwrap();
// assert_eq!(resp.status(), StatusCode::NOT_FOUND, "{:?} {}", result, body);
// assert!(body.contains(exp), "body = {}", body);
//
// Ok(())
// }

#[tokio::test(flavor = "current_thread")]
async fn test_func_object_keys() -> Result<()> {
Expand Down