diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 7a2abf33cf09..f01daa1c37f6 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -49,6 +49,7 @@ use tower_http::decompression::RequestDecompressionLayer; use tower_http::trace::TraceLayer; use self::authorize::AuthState; +use self::table_result::TableResponse; use crate::configurator::ConfiguratorRef; use crate::error::{AlreadyStartedSnafu, Error, HyperSnafu, Result, ToJsonSnafu}; use crate::http::arrow_result::ArrowResponse; @@ -90,6 +91,7 @@ mod dashboard; pub mod error_result; pub mod greptime_result_v1; pub mod influxdb_result_v1; +pub mod table_result; pub const HTTP_API_VERSION: &str = "v1"; pub const HTTP_API_PREFIX: &str = "/v1/"; @@ -254,6 +256,7 @@ pub enum GreptimeQueryOutput { pub enum ResponseFormat { Arrow, Csv, + Table, #[default] GreptimedbV1, InfluxdbV1, @@ -264,6 +267,7 @@ impl ResponseFormat { match s { "arrow" => Some(ResponseFormat::Arrow), "csv" => Some(ResponseFormat::Csv), + "table" => Some(ResponseFormat::Table), "greptimedb_v1" => Some(ResponseFormat::GreptimedbV1), "influxdb_v1" => Some(ResponseFormat::InfluxdbV1), _ => None, @@ -274,6 +278,7 @@ impl ResponseFormat { match self { ResponseFormat::Arrow => "arrow", ResponseFormat::Csv => "csv", + ResponseFormat::Table => "table", ResponseFormat::GreptimedbV1 => "greptimedb_v1", ResponseFormat::InfluxdbV1 => "influxdb_v1", } @@ -328,6 +333,7 @@ impl Display for Epoch { pub enum HttpResponse { Arrow(ArrowResponse), Csv(CsvResponse), + Table(TableResponse), Error(ErrorResponse), GreptimedbV1(GreptimedbV1Response), InfluxdbV1(InfluxdbV1Response), @@ -338,6 +344,7 @@ impl HttpResponse { match self { HttpResponse::Arrow(resp) => resp.with_execution_time(execution_time).into(), HttpResponse::Csv(resp) => resp.with_execution_time(execution_time).into(), + HttpResponse::Table(resp) => resp.with_execution_time(execution_time).into(), HttpResponse::GreptimedbV1(resp) => resp.with_execution_time(execution_time).into(), HttpResponse::InfluxdbV1(resp) => resp.with_execution_time(execution_time).into(), HttpResponse::Error(resp) => resp.with_execution_time(execution_time).into(), @@ -350,6 +357,7 @@ impl IntoResponse for HttpResponse { match self { HttpResponse::Arrow(resp) => resp.into_response(), HttpResponse::Csv(resp) => resp.into_response(), + HttpResponse::Table(resp) => resp.into_response(), HttpResponse::GreptimedbV1(resp) => resp.into_response(), HttpResponse::InfluxdbV1(resp) => resp.into_response(), HttpResponse::Error(resp) => resp.into_response(), @@ -373,6 +381,12 @@ impl From for HttpResponse { } } +impl From for HttpResponse { + fn from(value: TableResponse) -> Self { + HttpResponse::Table(value) + } +} + impl From for HttpResponse { fn from(value: ErrorResponse) -> Self { HttpResponse::Error(value) @@ -971,6 +985,7 @@ mod test { ResponseFormat::GreptimedbV1, ResponseFormat::InfluxdbV1, ResponseFormat::Csv, + ResponseFormat::Table, ResponseFormat::Arrow, ] { let recordbatches = @@ -979,6 +994,7 @@ mod test { let json_resp = match format { ResponseFormat::Arrow => ArrowResponse::from_output(outputs).await, ResponseFormat::Csv => CsvResponse::from_output(outputs).await, + ResponseFormat::Table => TableResponse::from_output(outputs).await, ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await, ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, None).await, }; @@ -1021,6 +1037,21 @@ mod test { panic!("invalid output type"); } } + + HttpResponse::Table(resp) => { + let output = &resp.output()[0]; + if let GreptimeQueryOutput::Records(r) = output { + assert_eq!(r.num_rows(), 4); + assert_eq!(r.num_cols(), 2); + assert_eq!(r.schema.column_schemas[0].name, "numbers"); + assert_eq!(r.schema.column_schemas[0].data_type, "UInt32"); + assert_eq!(r.rows[0][0], serde_json::Value::from(1)); + assert_eq!(r.rows[0][1], serde_json::Value::Null); + } else { + panic!("invalid output type"); + } + } + HttpResponse::Arrow(resp) => { let output = resp.data; let mut reader = diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index 7506ebbddbd0..614b69b3d9c9 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -38,6 +38,7 @@ use crate::http::csv_result::CsvResponse; use crate::http::error_result::ErrorResponse; use crate::http::greptime_result_v1::GreptimedbV1Response; use crate::http::influxdb_result_v1::InfluxdbV1Response; +use crate::http::table_result::TableResponse; use crate::http::{ ApiState, Epoch, GreptimeOptionsConfigState, GreptimeQueryOutput, HttpRecordsOutput, HttpResponse, ResponseFormat, @@ -119,6 +120,7 @@ pub async fn sql( let resp = match format { ResponseFormat::Arrow => ArrowResponse::from_output(outputs).await, ResponseFormat::Csv => CsvResponse::from_output(outputs).await, + ResponseFormat::Table => TableResponse::from_output(outputs).await, ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await, ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await, }; diff --git a/src/servers/src/http/table_result.rs b/src/servers/src/http/table_result.rs new file mode 100644 index 000000000000..005eec479a8d --- /dev/null +++ b/src/servers/src/http/table_result.rs @@ -0,0 +1,176 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp::max; +use std::fmt::{Display, Write}; + +use axum::http::{header, HeaderValue}; +use axum::response::{IntoResponse, Response}; +use common_error::status_code::StatusCode; +use common_query::Output; +use itertools::Itertools; +use mime_guess::mime; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use crate::http::error_result::ErrorResponse; +use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT}; +use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat}; + +#[derive(Serialize, Deserialize, Debug, JsonSchema)] +pub struct TableResponse { + output: Vec, + execution_time_ms: u64, +} + +impl TableResponse { + pub async fn from_output(outputs: Vec>) -> HttpResponse { + match handler::from_output(ResponseFormat::Csv, outputs).await { + Err(err) => HttpResponse::Error(err), + Ok((output, _)) => { + if output.len() > 1 { + HttpResponse::Error(ErrorResponse::from_error_message( + ResponseFormat::Table, + StatusCode::InvalidArguments, + "Multi-statements are not allowed".to_string(), + )) + } else { + HttpResponse::Table(TableResponse { + output, + execution_time_ms: 0, + }) + } + } + } + } + + pub fn output(&self) -> &[GreptimeQueryOutput] { + &self.output + } + + pub fn with_execution_time(mut self, execution_time: u64) -> Self { + self.execution_time_ms = execution_time; + self + } + + pub fn execution_time_ms(&self) -> u64 { + self.execution_time_ms + } +} + +impl Display for TableResponse { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let payload = match self.output.first() { + None => String::default(), + Some(GreptimeQueryOutput::AffectedRows(n)) => { + format!("{n}\n") + } + Some(GreptimeQueryOutput::Records(records)) => { + let mut max_width = vec![0; records.num_cols()]; + let mut result = String::new(); + // Determine maximum width for each column + for (i, column) in records.schema.column_schemas.iter().enumerate() { + max_width[i] = max(max_width[i], column.name.len()); + } + for row in &records.rows { + for (i, v) in row.iter().enumerate() { + let s = v.to_string(); + max_width[i] = max(max_width[i], s.len()); + } + } + + // Construct the header + let head: String = records + .schema + .column_schemas + .iter() + .enumerate() + .map(|(i, column)| format!("─{:─<1$}─", column.name, max_width[i])) + .join("┬"); + writeln!(result, "┌{}┐", head).unwrap(); + + // Construct rows + for row in &records.rows { + let row = row + .iter() + .enumerate() + .map(|(i, v)| { + let s = v.to_string(); + format!(" {:1$} ", s, max_width[i]) + }) + .join("│"); + writeln!(result, "│{row}│").unwrap(); + } + + // Construct the footer + let footer: String = max_width.iter().map(|v| "─".repeat(*v + 2)).join("┴"); + writeln!(result, "└{}┘", footer).unwrap(); + result + } + }; + write!(f, "{}", payload) + } +} + +impl IntoResponse for TableResponse { + fn into_response(self) -> Response { + debug_assert!( + self.output.len() <= 1, + "self.output has extra elements: {}", + self.output.len() + ); + + let execution_time = self.execution_time_ms; + + let mut resp = ( + [( + header::CONTENT_TYPE, + HeaderValue::from_static(mime::PLAIN.as_ref()), + )], + self.to_string(), + ) + .into_response(); + resp.headers_mut().insert( + &GREPTIME_DB_HEADER_FORMAT, + HeaderValue::from_static("TABLE"), + ); + resp.headers_mut().insert( + &GREPTIME_DB_HEADER_EXECUTION_TIME, + HeaderValue::from(execution_time), + ); + resp + } +} +#[cfg(test)] +mod test { + + use super::TableResponse; + + #[tokio::test] + async fn test_table_format() { + let data = r#"{"output":[{"records":{"schema":{"column_schemas":[{"name":"host","data_type":"String"},{"name":"ts","data_type":"TimestampMillisecond"},{"name":"cpu","data_type":"Float64"},{"name":"memory","data_type":"Float64"}]},"rows":[["127.0.0.1",1702433141000,0.5,0.2],["127.0.0.1",1702433146000,0.3,0.2],["127.0.0.1",1702433151000,0.4,0.3],["127.0.0.2",1702433141000,0.3,0.1],["127.0.0.2",1702433146000,0.2,0.4],["127.0.0.2",1702433151000,0.2,0.4]]}}],"execution_time_ms":13}"#; + let table_response: TableResponse = serde_json::from_str(data).unwrap(); + let payload = table_response.to_string(); + let expected_payload = r#"┌─host────────┬─ts────────────┬─cpu─┬─memory─┐ +│ "127.0.0.1" │ 1702433141000 │ 0.5 │ 0.2 │ +│ "127.0.0.1" │ 1702433146000 │ 0.3 │ 0.2 │ +│ "127.0.0.1" │ 1702433151000 │ 0.4 │ 0.3 │ +│ "127.0.0.2" │ 1702433141000 │ 0.3 │ 0.1 │ +│ "127.0.0.2" │ 1702433146000 │ 0.2 │ 0.4 │ +│ "127.0.0.2" │ 1702433151000 │ 0.2 │ 0.4 │ +└─────────────┴───────────────┴─────┴────────┘ +"#; + assert_eq!(payload, expected_payload); + } +}