From a94fdf20c074d5fbd40c8beff917e0c9bea18e99 Mon Sep 17 00:00:00 2001 From: paomian Date: Mon, 28 Oct 2024 15:06:35 +0800 Subject: [PATCH 1/8] chore: minor update --- src/servers/src/otlp.rs | 1 + src/servers/src/otlp/logs.rs | 39 +----------------------- src/servers/src/otlp/trace.rs | 12 ++++---- src/servers/src/otlp/trace/attributes.rs | 14 +++++++++ src/servers/src/otlp/trace/span.rs | 15 ++++----- 5 files changed, 28 insertions(+), 53 deletions(-) diff --git a/src/servers/src/otlp.rs b/src/servers/src/otlp.rs index cc1321ac7020..7810e0ebc169 100644 --- a/src/servers/src/otlp.rs +++ b/src/servers/src/otlp.rs @@ -16,3 +16,4 @@ pub mod logs; pub mod metrics; pub mod plugin; pub mod trace; +mod utils; diff --git a/src/servers/src/otlp/logs.rs b/src/servers/src/otlp/logs.rs index 8f31a1db064b..364292839803 100644 --- a/src/servers/src/otlp/logs.rs +++ b/src/servers/src/otlp/logs.rs @@ -29,10 +29,10 @@ use pipeline::{Array, Map, PipelineWay, SchemaInfo, SelectInfo, Value as Pipelin use snafu::{ensure, ResultExt}; use super::trace::attributes::OtlpAnyValue; +use super::utils::{bytes_to_hex_string, key_value_to_jsonb}; use crate::error::{ IncompatibleSchemaSnafu, OpenTelemetryLogSnafu, Result, UnsupportedJsonDataTypeForTagSnafu, }; -use crate::otlp::trace::span::bytes_to_hex_string; /// Convert OpenTelemetry metrics to GreptimeDB insert requests /// @@ -772,43 +772,6 @@ fn key_value_to_map(key_values: Vec) -> BTreeMap JsonbValue<'static> { - match value { - any_value::Value::StringValue(s) => JsonbValue::String(s.into()), - any_value::Value::IntValue(i) => JsonbValue::Number(JsonbNumber::Int64(i)), - any_value::Value::DoubleValue(d) => JsonbValue::Number(JsonbNumber::Float64(d)), - any_value::Value::BoolValue(b) => JsonbValue::Bool(b), - any_value::Value::ArrayValue(a) => { - let values = a - .values - .into_iter() - .map(|v| match v.value { - Some(value) => any_value_to_jsonb(value), - None => JsonbValue::Null, - }) - .collect(); - JsonbValue::Array(values) - } - any_value::Value::KvlistValue(kv) => key_value_to_jsonb(kv.values), - any_value::Value::BytesValue(b) => JsonbValue::String(bytes_to_hex_string(&b).into()), - } -} - -fn key_value_to_jsonb(key_values: Vec) -> JsonbValue<'static> { - let mut map = BTreeMap::new(); - for kv in key_values { - let value = match kv.value { - Some(value) => match value.value { - Some(value) => any_value_to_jsonb(value), - None => JsonbValue::Null, - }, - None => JsonbValue::Null, - }; - map.insert(kv.key.clone(), value); - } - JsonbValue::Object(map) -} - fn log_body_to_string(body: &AnyValue) -> String { let otlp_value = OtlpAnyValue::from(body); otlp_value.to_string() diff --git a/src/servers/src/otlp/trace.rs b/src/servers/src/otlp/trace.rs index edcdb8fc0b0c..53fd1203f17f 100644 --- a/src/servers/src/otlp/trace.rs +++ b/src/servers/src/otlp/trace.rs @@ -43,7 +43,7 @@ pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans { for scope_spans in resource_spans.scope_spans { let scope = scope_spans.scope.unwrap_or_default(); for span in scope_spans.spans { - spans.push(parse_span(resource_attrs.clone(), scope.clone(), span)); + spans.push(parse_span(&resource_attrs, &scope, span)); } } } @@ -86,18 +86,18 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> { // fields let str_fields_iter = vec![ - ("resource_attributes", span.resource_attributes.to_string()), + //("resource_attributes", span.resource_attributes.to_string()), ("scope_name", span.scope_name), ("scope_version", span.scope_version), - ("scope_attributes", span.scope_attributes.to_string()), + //("scope_attributes", span.scope_attributes.to_string()), ("trace_state", span.trace_state), ("span_name", span.span_name), ("span_kind", span.span_kind), ("span_status_code", span.span_status_code), ("span_status_message", span.span_status_message), - ("span_attributes", span.span_attributes.to_string()), - ("span_events", span.span_events.to_string()), - ("span_links", span.span_links.to_string()), + //("span_attributes", span.span_attributes.to_string()), + // ("span_events", span.span_events.to_string()), + // ("span_links", span.span_links.to_string()), ] .into_iter() .map(|(col, val)| { diff --git a/src/servers/src/otlp/trace/attributes.rs b/src/servers/src/otlp/trace/attributes.rs index 030c47887f71..0b4ee716038d 100644 --- a/src/servers/src/otlp/trace/attributes.rs +++ b/src/servers/src/otlp/trace/attributes.rs @@ -21,6 +21,8 @@ use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue}; use serde::ser::{SerializeMap, SerializeSeq}; use serde::Serialize; +use crate::otlp::utils::key_value_to_jsonb; + #[derive(Clone, Debug)] pub struct OtlpAnyValue<'a>(&'a AnyValue); @@ -113,6 +115,18 @@ impl From> for Attributes { } } +impl From<&[KeyValue]> for Attributes { + fn from(attrs: &[KeyValue]) -> Self { + Self(attrs.to_vec()) + } +} + +impl Into> for Attributes { + fn into(self) -> jsonb::Value<'static> { + key_value_to_jsonb(self.0) + } +} + impl Attributes { pub fn get_ref(&self) -> &Vec { &self.0 diff --git a/src/servers/src/otlp/trace/span.rs b/src/servers/src/otlp/trace/span.rs index d3435bad3b4a..2eb83a33ca8d 100644 --- a/src/servers/src/otlp/trace/span.rs +++ b/src/servers/src/otlp/trace/span.rs @@ -24,6 +24,7 @@ use opentelemetry_proto::tonic::trace::v1::{Span, Status}; use serde::Serialize; use super::attributes::Attributes; +use crate::otlp::utils::bytes_to_hex_string; #[derive(Debug, Clone)] pub struct TraceSpan { @@ -142,8 +143,8 @@ impl SpanEvents { } pub fn parse_span( - resource_attrs: Vec, - scope: InstrumentationScope, + resource_attrs: &[KeyValue], + scope: &InstrumentationScope, span: Span, ) -> TraceSpan { let (span_status_code, span_status_message) = status_to_string(&span.status); @@ -156,9 +157,9 @@ pub fn parse_span( resource_attributes: Attributes::from(resource_attrs), trace_state: span.trace_state, - scope_name: scope.name, - scope_version: scope.version, - scope_attributes: Attributes::from(scope.attributes), + scope_name: scope.name.clone(), + scope_version: scope.version.clone(), + scope_attributes: Attributes::from(scope.attributes.clone()), span_name: span.name, span_kind, @@ -175,10 +176,6 @@ pub fn parse_span( } } -pub fn bytes_to_hex_string(bs: &[u8]) -> String { - bs.iter().map(|b| format!("{:02x}", b)).join("") -} - pub fn status_to_string(status: &Option) -> (String, String) { match status { Some(status) => (status.code().as_str_name().into(), status.message.clone()), From f876ef2f0cbc74852ad04a76ce311266d1f4e97c Mon Sep 17 00:00:00 2001 From: paomian Date: Thu, 31 Oct 2024 15:44:05 +0800 Subject: [PATCH 2/8] chore: replace otlp trace attr type from string to jsonb --- src/frontend/src/instance/otlp.rs | 10 +-- src/servers/src/error.rs | 18 +++++- src/servers/src/grpc/otlp.rs | 16 ++++- src/servers/src/http/header.rs | 1 + src/servers/src/http/otlp.rs | 66 ++++++++++++++------ src/servers/src/otlp.rs | 1 - src/servers/src/otlp/logs.rs | 2 + src/servers/src/otlp/plugin.rs | 28 --------- src/servers/src/otlp/trace.rs | 22 ++++++- src/servers/src/otlp/trace/attributes.rs | 6 +- src/servers/src/otlp/trace/span.rs | 50 +++++++++++++++ src/servers/src/query_handler.rs | 1 + src/servers/src/row_writer.rs | 79 ++++++++++++++++++++++-- tests-integration/tests/http.rs | 8 +-- 14 files changed, 234 insertions(+), 74 deletions(-) delete mode 100644 src/servers/src/otlp/plugin.rs diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs index 0c12658b37a5..f28179d40d59 100644 --- a/src/frontend/src/instance/otlp.rs +++ b/src/frontend/src/instance/otlp.rs @@ -24,7 +24,6 @@ use pipeline::PipelineWay; use servers::error::{self, AuthSnafu, Result as ServerResult}; use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef}; use servers::otlp; -use servers::otlp::plugin::TraceParserRef; use servers::query_handler::OpenTelemetryProtocolHandler; use session::context::QueryContextRef; use snafu::ResultExt; @@ -64,6 +63,7 @@ impl OpenTelemetryProtocolHandler for Instance { async fn traces( &self, request: ExportTraceServiceRequest, + table_name: String, ctx: QueryContextRef, ) -> ServerResult { self.plugins @@ -77,13 +77,7 @@ impl OpenTelemetryProtocolHandler for Instance { .get::>(); interceptor_ref.pre_execute(ctx.clone())?; - let (table_name, spans) = match self.plugins.get::() { - Some(parser) => (parser.table_name(), parser.parse(request)), - None => ( - otlp::trace::TRACE_TABLE_NAME.to_string(), - otlp::trace::parse(request), - ), - }; + let spans = otlp::trace::parse(request); let (requests, rows) = otlp::trace::to_grpc_insert_requests(table_name, spans)?; diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index e564c584be58..8ec879b9520b 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -527,6 +527,20 @@ pub enum Error { location: Location, }, + #[snafu(display("Missing table name"))] + MissingTableName { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Invalid table name"))] + InvalidTableName { + #[snafu(source)] + error: tonic::metadata::errors::ToStrError, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to initialize a watcher for file {}", path))] FileWatch { path: String, @@ -620,7 +634,9 @@ impl ErrorExt for Error { | UnsupportedContentType { .. } | TimestampOverflow { .. } | OpenTelemetryLog { .. } - | UnsupportedJsonDataTypeForTag { .. } => StatusCode::InvalidArguments, + | UnsupportedJsonDataTypeForTag { .. } + | MissingTableName { .. } + | InvalidTableName { .. } => StatusCode::InvalidArguments, Catalog { source, .. } => source.status_code(), RowWriter { source, .. } => source.status_code(), diff --git a/src/servers/src/grpc/otlp.rs b/src/servers/src/grpc/otlp.rs index 76992e703fef..f3f71900eb14 100644 --- a/src/servers/src/grpc/otlp.rs +++ b/src/servers/src/grpc/otlp.rs @@ -24,10 +24,12 @@ use opentelemetry_proto::tonic::collector::trace::v1::{ ExportTraceServiceRequest, ExportTraceServiceResponse, }; use session::context::{Channel, QueryContext}; -use snafu::OptionExt; +use snafu::{OptionExt, ResultExt}; use tonic::{Request, Response, Status}; use crate::error; +use crate::http::header::constants::GREPTIME_TRACE_TABLE_NAME_HEADER_NAME; +use crate::otlp::trace::TRACE_TABLE_NAME; use crate::query_handler::OpenTelemetryProtocolHandlerRef; pub struct OtlpService { @@ -46,7 +48,15 @@ impl TraceService for OtlpService { &self, request: Request, ) -> StdResult, Status> { - let (_headers, extensions, req) = request.into_parts(); + let (headers, extensions, req) = request.into_parts(); + + let table_name = match headers.get(GREPTIME_TRACE_TABLE_NAME_HEADER_NAME) { + Some(table_name) => table_name + .to_str() + .context(error::InvalidTableNameSnafu)? + .to_string(), + None => TRACE_TABLE_NAME.to_string(), + }; let mut ctx = extensions .get::() @@ -55,7 +65,7 @@ impl TraceService for OtlpService { ctx.set_channel(Channel::Otlp); let ctx = Arc::new(ctx); - let _ = self.handler.traces(req, ctx).await?; + let _ = self.handler.traces(req, table_name, ctx).await?; Ok(Response::new(ExportTraceServiceResponse { partial_success: None, diff --git a/src/servers/src/http/header.rs b/src/servers/src/http/header.rs index f5b833f6739f..16962a56395a 100644 --- a/src/servers/src/http/header.rs +++ b/src/servers/src/http/header.rs @@ -48,6 +48,7 @@ pub mod constants { pub const GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME: &str = "x-greptime-log-pipeline-version"; pub const GREPTIME_LOG_TABLE_NAME_HEADER_NAME: &str = "x-greptime-log-table-name"; pub const GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME: &str = "x-greptime-log-extract-keys"; + pub const GREPTIME_TRACE_TABLE_NAME_HEADER_NAME: &str = "x-greptime-trace-table-name"; } pub static GREPTIME_DB_HEADER_FORMAT: HeaderName = diff --git a/src/servers/src/http/otlp.rs b/src/servers/src/http/otlp.rs index 6e5a583c0d62..5f7779027805 100644 --- a/src/servers/src/http/otlp.rs +++ b/src/servers/src/http/otlp.rs @@ -24,6 +24,7 @@ use axum::response::IntoResponse; use axum::{async_trait, Extension}; use bytes::Bytes; use common_telemetry::tracing; +use http::HeaderMap; use opentelemetry_proto::tonic::collector::logs::v1::{ ExportLogsServiceRequest, ExportLogsServiceResponse, }; @@ -41,11 +42,13 @@ use snafu::prelude::*; use super::header::constants::GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME; use super::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF}; -use crate::error::{self, PipelineSnafu, Result}; +use crate::error::{self, InvalidUtf8ValueSnafu, PipelineSnafu, Result}; use crate::http::header::constants::{ GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, - GREPTIME_LOG_TABLE_NAME_HEADER_NAME, + GREPTIME_LOG_TABLE_NAME_HEADER_NAME, GREPTIME_TRACE_TABLE_NAME_HEADER_NAME, }; +use crate::otlp::logs::LOG_TABLE_NAME; +use crate::otlp::trace::TRACE_TABLE_NAME; use crate::query_handler::OpenTelemetryProtocolHandlerRef; #[axum_macros::debug_handler] @@ -80,10 +83,16 @@ pub async fn metrics( #[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "traces"))] pub async fn traces( State(handler): State, + header: HeaderMap, Extension(mut query_ctx): Extension, bytes: Bytes, ) -> Result> { let db = query_ctx.get_db_string(); + let table_name = extract_table_name_from_header( + &header, + GREPTIME_TRACE_TABLE_NAME_HEADER_NAME, + TRACE_TABLE_NAME, + )?; query_ctx.set_channel(Channel::Otlp); let query_ctx = Arc::new(query_ctx); let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED @@ -92,7 +101,7 @@ pub async fn traces( let request = ExportTraceServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?; handler - .traces(request, query_ctx) + .traces(request, table_name, query_ctx) .await .map(|o| OtlpResponse { resp_body: ExportTraceServiceResponse { @@ -107,17 +116,31 @@ pub struct PipelineInfo { pub pipeline_version: Option, } -fn pipeline_header_error( +fn parse_header_value_to_string(header: &HeaderValue) -> Result { + String::from_utf8(header.as_bytes().to_vec()).context(InvalidUtf8ValueSnafu) +} + +fn parse_pipeline_header_value_to_string( header: &HeaderValue, - key: &str, -) -> StdResult { - let header_utf8 = str::from_utf8(header.as_bytes()); - match header_utf8 { - Ok(s) => Ok(s.to_string()), - Err(_) => Err(( + header_name: &str, +) -> StdResult { + parse_header_value_to_string(header).map_err(|_| { + ( StatusCode::BAD_REQUEST, - format!("`{}` header is not valid UTF-8 string type.", key), - )), + format!("`{}` header is not valid UTF-8 string type.", header_name), + ) + }) +} + +fn extract_table_name_from_header( + headers: &HeaderMap, + header: &str, + default_table_name: &str, +) -> Result { + let table_name = headers.get(header); + match table_name { + Some(name) => parse_header_value_to_string(name), + None => Ok(default_table_name.to_string()), } } @@ -133,11 +156,11 @@ where let pipeline_version = parts.headers.get(GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME); match (pipeline_name, pipeline_version) { (Some(name), Some(version)) => Ok(PipelineInfo { - pipeline_name: Some(pipeline_header_error( + pipeline_name: Some(parse_pipeline_header_value_to_string( name, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, )?), - pipeline_version: Some(pipeline_header_error( + pipeline_version: Some(parse_pipeline_header_value_to_string( version, GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, )?), @@ -147,7 +170,7 @@ where pipeline_version: None, }), (Some(name), None) => Ok(PipelineInfo { - pipeline_name: Some(pipeline_header_error( + pipeline_name: Some(parse_pipeline_header_value_to_string( name, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, )?), @@ -173,10 +196,13 @@ where match table_name { Some(name) => Ok(TableInfo { - table_name: pipeline_header_error(name, GREPTIME_LOG_TABLE_NAME_HEADER_NAME)?, + table_name: parse_pipeline_header_value_to_string( + name, + GREPTIME_LOG_TABLE_NAME_HEADER_NAME, + )?, }), None => Ok(TableInfo { - table_name: "opentelemetry_logs".to_string(), + table_name: LOG_TABLE_NAME.to_string(), }), } } @@ -196,8 +222,10 @@ where match select { Some(name) => { - let select_header = - pipeline_header_error(name, GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME)?; + let select_header = parse_pipeline_header_value_to_string( + name, + GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME, + )?; if select_header.is_empty() { Ok(SelectInfoWrapper(Default::default())) } else { diff --git a/src/servers/src/otlp.rs b/src/servers/src/otlp.rs index 7810e0ebc169..c55e2337a26c 100644 --- a/src/servers/src/otlp.rs +++ b/src/servers/src/otlp.rs @@ -14,6 +14,5 @@ pub mod logs; pub mod metrics; -pub mod plugin; pub mod trace; mod utils; diff --git a/src/servers/src/otlp/logs.rs b/src/servers/src/otlp/logs.rs index 364292839803..f11cd4ff3c68 100644 --- a/src/servers/src/otlp/logs.rs +++ b/src/servers/src/otlp/logs.rs @@ -34,6 +34,8 @@ use crate::error::{ IncompatibleSchemaSnafu, OpenTelemetryLogSnafu, Result, UnsupportedJsonDataTypeForTagSnafu, }; +pub const LOG_TABLE_NAME: &str = "opentelemetry_logs"; + /// Convert OpenTelemetry metrics to GreptimeDB insert requests /// /// See diff --git a/src/servers/src/otlp/plugin.rs b/src/servers/src/otlp/plugin.rs deleted file mode 100644 index 1258fe167ea6..000000000000 --- a/src/servers/src/otlp/plugin.rs +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; - -use super::trace::span::TraceSpans; - -/// Transformer helps to transform ExportTraceServiceRequest based on logic, like: -/// - uplift some fields from Attributes (Map type) to column -pub trait TraceParser: Send + Sync { - fn parse(&self, request: ExportTraceServiceRequest) -> TraceSpans; - fn table_name(&self) -> String; -} - -pub type TraceParserRef = Arc; diff --git a/src/servers/src/otlp/trace.rs b/src/servers/src/otlp/trace.rs index 53fd1203f17f..8b527bc51056 100644 --- a/src/servers/src/otlp/trace.rs +++ b/src/servers/src/otlp/trace.rs @@ -23,7 +23,7 @@ use crate::error::Result; use crate::row_writer::{self, MultiTableData, TableData}; const APPROXIMATE_COLUMN_COUNT: usize = 24; -pub const TRACE_TABLE_NAME: &str = "traces_preview_v01"; +pub const TRACE_TABLE_NAME: &str = "opentelemetry_traces"; pub mod attributes; pub mod span; @@ -120,6 +120,26 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> ValueData::TimestampNanosecondValue(val as i64), ) }); + row_writer::write_json( + writer, + "resource_attributes", + span.resource_attributes.into(), + &mut row, + )?; + row_writer::write_json( + writer, + "scope_attributes", + span.scope_attributes.into(), + &mut row, + )?; + row_writer::write_json( + writer, + "span_attributes", + span.span_attributes.into(), + &mut row, + )?; + row_writer::write_json(writer, "span_events", span.span_events.into(), &mut row)?; + row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?; row_writer::write_fields(writer, str_fields_iter, &mut row)?; row_writer::write_fields(writer, time_fields_iter, &mut row)?; diff --git a/src/servers/src/otlp/trace/attributes.rs b/src/servers/src/otlp/trace/attributes.rs index 0b4ee716038d..2fce6225eddf 100644 --- a/src/servers/src/otlp/trace/attributes.rs +++ b/src/servers/src/otlp/trace/attributes.rs @@ -121,9 +121,9 @@ impl From<&[KeyValue]> for Attributes { } } -impl Into> for Attributes { - fn into(self) -> jsonb::Value<'static> { - key_value_to_jsonb(self.0) +impl From for jsonb::Value<'static> { + fn from(attrs: Attributes) -> jsonb::Value<'static> { + key_value_to_jsonb(attrs.0) } } diff --git a/src/servers/src/otlp/trace/span.rs b/src/servers/src/otlp/trace/span.rs index 2eb83a33ca8d..21c2085fc92d 100644 --- a/src/servers/src/otlp/trace/span.rs +++ b/src/servers/src/otlp/trace/span.rs @@ -73,6 +73,30 @@ impl From for SpanLink { } } +impl From for jsonb::Value<'static> { + fn from(value: SpanLink) -> jsonb::Value<'static> { + jsonb::Value::Object( + vec![ + ( + "trace_id".to_string(), + jsonb::Value::String(value.trace_id.into()), + ), + ( + "span_id".to_string(), + jsonb::Value::String(value.span_id.into()), + ), + ( + "trace_state".to_string(), + jsonb::Value::String(value.trace_state.into()), + ), + ("attributes".to_string(), value.attributes.into()), + ] + .into_iter() + .collect(), + ) + } +} + #[derive(Debug, Clone, Serialize)] pub struct SpanLinks(Vec); @@ -83,6 +107,12 @@ impl From> for SpanLinks { } } +impl From for jsonb::Value<'static> { + fn from(value: SpanLinks) -> jsonb::Value<'static> { + jsonb::Value::Array(value.0.into_iter().map(Into::into).collect()) + } +} + impl Display for SpanLinks { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", serde_json::to_string(self).unwrap_or_default()) @@ -116,6 +146,20 @@ impl From for SpanEvent { } } +impl From for jsonb::Value<'static> { + fn from(value: SpanEvent) -> jsonb::Value<'static> { + jsonb::Value::Object( + vec![ + ("name".to_string(), jsonb::Value::String(value.name.into())), + ("time".to_string(), jsonb::Value::String(value.time.into())), + ("attributes".to_string(), value.attributes.into()), + ] + .into_iter() + .collect(), + ) + } +} + #[derive(Debug, Clone, Serialize)] pub struct SpanEvents(Vec); @@ -126,6 +170,12 @@ impl From> for SpanEvents { } } +impl From for jsonb::Value<'static> { + fn from(value: SpanEvents) -> jsonb::Value<'static> { + jsonb::Value::Array(value.0.into_iter().map(Into::into).collect()) + } +} + impl Display for SpanEvents { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", serde_json::to_string(self).unwrap_or_default()) diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index 618f366eb040..cee866c61b93 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -119,6 +119,7 @@ pub trait OpenTelemetryProtocolHandler: LogHandler { async fn traces( &self, request: ExportTraceServiceRequest, + table_name: String, ctx: QueryContextRef, ) -> Result; diff --git a/src/servers/src/row_writer.rs b/src/servers/src/row_writer.rs index 542a2484b481..d13e5ec81566 100644 --- a/src/servers/src/row_writer.rs +++ b/src/servers/src/row_writer.rs @@ -14,10 +14,11 @@ use std::collections::HashMap; +use api::v1::column_data_type_extension::TypeExt; use api::v1::value::ValueData; use api::v1::{ - ColumnDataType, ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, SemanticType, - Value, + ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row, + RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value, }; use common_grpc::precision::Precision; use common_time::timestamp::TimeUnit; @@ -199,6 +200,68 @@ pub fn write_f64( ) } +fn build_json_column_schema(name: impl ToString) -> ColumnSchema { + ColumnSchema { + column_name: name.to_string(), + datatype: ColumnDataType::Binary as i32, + semantic_type: SemanticType::Field as i32, + datatype_extension: Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())), + }), + ..Default::default() + } +} + +pub fn write_json( + table_data: &mut TableData, + name: impl ToString, + value: jsonb::Value, + one_row: &mut Vec, +) -> Result<()> { + write_by_schema( + table_data, + std::iter::once(( + build_json_column_schema(name), + ValueData::BinaryValue(value.to_vec()), + )), + one_row, + ) +} + +fn write_by_schema( + table_data: &mut TableData, + kv_iter: impl Iterator, + one_row: &mut Vec, +) -> Result<()> { + let TableData { + schema, + column_indexes, + .. + } = table_data; + + for (column_schema, value) in kv_iter { + let index = column_indexes.get(&column_schema.column_name); + if let Some(index) = index { + check_schema_number( + column_schema.datatype, + column_schema.semantic_type, + &schema[*index], + )?; + one_row[*index].value_data = Some(value); + } else { + let index = schema.len(); + let key = column_schema.column_name.clone(); + schema.push(column_schema); + column_indexes.insert(key, index); + one_row.push(Value { + value_data: Some(value), + }); + } + } + + Ok(()) +} + fn write_by_semantic_type( table_data: &mut TableData, semantic_type: SemanticType, @@ -358,23 +421,27 @@ fn check_schema( semantic_type: SemanticType, schema: &ColumnSchema, ) -> Result<()> { + check_schema_number(datatype as i32, semantic_type as i32, schema) +} + +fn check_schema_number(datatype: i32, semantic_type: i32, schema: &ColumnSchema) -> Result<()> { ensure!( - schema.datatype == datatype as i32, + schema.datatype == datatype, IncompatibleSchemaSnafu { column_name: &schema.column_name, datatype: "datatype", expected: schema.datatype, - actual: datatype as i32, + actual: datatype, } ); ensure!( - schema.semantic_type == semantic_type as i32, + schema.semantic_type == semantic_type, IncompatibleSchemaSnafu { column_name: &schema.column_name, datatype: "semantic_type", expected: schema.semantic_type, - actual: semantic_type as i32, + actual: semantic_type, } ); diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index f5a1080d768a..6cf6965d3d76 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1594,18 +1594,18 @@ pub async fn test_otlp_traces(store_type: StorageType) { assert_eq!(StatusCode::OK, res.status()); // select traces data - let expected = r#"[["b5e5fb572cf0a3335dd194a14145fef5","3364d2da58c9fd2b","","{\"service.name\":\"telemetrygen\"}","telemetrygen","","{}","","lets-go","SPAN_KIND_CLIENT","STATUS_CODE_UNSET","","{\"net.peer.ip\":\"1.2.3.4\",\"peer.service\":\"telemetrygen-server\"}","[]","[]",1726631197820927000,1726631197821050000,0.123,1726631197820927000],["b5e5fb572cf0a3335dd194a14145fef5","74c82efa6f628e80","3364d2da58c9fd2b","{\"service.name\":\"telemetrygen\"}","telemetrygen","","{}","","okey-dokey-0","SPAN_KIND_SERVER","STATUS_CODE_UNSET","","{\"net.peer.ip\":\"1.2.3.4\",\"peer.service\":\"telemetrygen-client\"}","[]","[]",1726631197820927000,1726631197821050000,0.123,1726631197820927000]]"#; + let expected = r#"[["b5e5fb572cf0a3335dd194a14145fef5","3364d2da58c9fd2b","",{"service.name":"telemetrygen"},{},{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-server"},[],[],"telemetrygen","","","lets-go","SPAN_KIND_CLIENT","STATUS_CODE_UNSET","",1726631197820927000,1726631197821050000,0.123,1726631197820927000],["b5e5fb572cf0a3335dd194a14145fef5","74c82efa6f628e80","3364d2da58c9fd2b",{"service.name":"telemetrygen"},{},{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-client"},[],[],"telemetrygen","","","okey-dokey-0","SPAN_KIND_SERVER","STATUS_CODE_UNSET","",1726631197820927000,1726631197821050000,0.123,1726631197820927000]]"#; validate_data( "otlp_traces", &client, - "select * from traces_preview_v01;", + "select * from opentelemetry_traces;", expected, ) .await; // drop table let res = client - .get("/v1/sql?sql=drop table traces_preview_v01;") + .get("/v1/sql?sql=drop table opentelemetry_traces;") .send() .await; assert_eq!(res.status(), StatusCode::OK); @@ -1618,7 +1618,7 @@ pub async fn test_otlp_traces(store_type: StorageType) { validate_data( "otlp_traces_with_gzip", &client, - "select * from traces_preview_v01;", + "select * from opentelemetry_traces;", expected, ) .await; From 0becbe32b74b418e6d012d826eb9e5b3bc22fdd6 Mon Sep 17 00:00:00 2001 From: paomian Date: Thu, 31 Oct 2024 15:56:26 +0800 Subject: [PATCH 3/8] chore: add new util file and remove useless code --- src/servers/src/otlp/trace.rs | 5 ---- src/servers/src/otlp/utils.rs | 46 +++++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 5 deletions(-) create mode 100644 src/servers/src/otlp/utils.rs diff --git a/src/servers/src/otlp/trace.rs b/src/servers/src/otlp/trace.rs index 8b527bc51056..1fcf1565c6a1 100644 --- a/src/servers/src/otlp/trace.rs +++ b/src/servers/src/otlp/trace.rs @@ -86,18 +86,13 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> { // fields let str_fields_iter = vec![ - //("resource_attributes", span.resource_attributes.to_string()), ("scope_name", span.scope_name), ("scope_version", span.scope_version), - //("scope_attributes", span.scope_attributes.to_string()), ("trace_state", span.trace_state), ("span_name", span.span_name), ("span_kind", span.span_kind), ("span_status_code", span.span_status_code), ("span_status_message", span.span_status_message), - //("span_attributes", span.span_attributes.to_string()), - // ("span_events", span.span_events.to_string()), - // ("span_links", span.span_links.to_string()), ] .into_iter() .map(|(col, val)| { diff --git a/src/servers/src/otlp/utils.rs b/src/servers/src/otlp/utils.rs new file mode 100644 index 000000000000..c2fcefc539a3 --- /dev/null +++ b/src/servers/src/otlp/utils.rs @@ -0,0 +1,46 @@ +use std::collections::BTreeMap; + +use itertools::Itertools; +use jsonb::{Number as JsonbNumber, Value as JsonbValue}; +use opentelemetry_proto::tonic::common::v1::{any_value, KeyValue}; + +pub fn bytes_to_hex_string(bs: &[u8]) -> String { + bs.iter().map(|b| format!("{:02x}", b)).join("") +} + +pub fn any_value_to_jsonb(value: any_value::Value) -> JsonbValue<'static> { + match value { + any_value::Value::StringValue(s) => JsonbValue::String(s.into()), + any_value::Value::IntValue(i) => JsonbValue::Number(JsonbNumber::Int64(i)), + any_value::Value::DoubleValue(d) => JsonbValue::Number(JsonbNumber::Float64(d)), + any_value::Value::BoolValue(b) => JsonbValue::Bool(b), + any_value::Value::ArrayValue(a) => { + let values = a + .values + .into_iter() + .map(|v| match v.value { + Some(value) => any_value_to_jsonb(value), + None => JsonbValue::Null, + }) + .collect(); + JsonbValue::Array(values) + } + any_value::Value::KvlistValue(kv) => key_value_to_jsonb(kv.values), + any_value::Value::BytesValue(b) => JsonbValue::String(bytes_to_hex_string(&b).into()), + } +} + +pub fn key_value_to_jsonb(key_values: Vec) -> JsonbValue<'static> { + let mut map = BTreeMap::new(); + for kv in key_values { + let value = match kv.value { + Some(value) => match value.value { + Some(value) => any_value_to_jsonb(value), + None => JsonbValue::Null, + }, + None => JsonbValue::Null, + }; + map.insert(kv.key.clone(), value); + } + JsonbValue::Object(map) +} From c607d3b2c57a3c0bd927cc4c8d38e2f52ee51b49 Mon Sep 17 00:00:00 2001 From: paomian Date: Thu, 31 Oct 2024 16:27:07 +0800 Subject: [PATCH 4/8] chore: add license header --- src/servers/src/otlp/utils.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/servers/src/otlp/utils.rs b/src/servers/src/otlp/utils.rs index c2fcefc539a3..7139251a14f8 100644 --- a/src/servers/src/otlp/utils.rs +++ b/src/servers/src/otlp/utils.rs @@ -1,3 +1,17 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + use std::collections::BTreeMap; use itertools::Itertools; From a2489d5c11ecfed79d488e9f702cabacfdc772f9 Mon Sep 17 00:00:00 2001 From: paomian Date: Thu, 31 Oct 2024 16:29:09 +0800 Subject: [PATCH 5/8] chore: remove unused error --- src/servers/src/error.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 8ec879b9520b..1abfadeddbec 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -527,12 +527,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Missing table name"))] - MissingTableName { - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Invalid table name"))] InvalidTableName { #[snafu(source)] @@ -635,7 +629,6 @@ impl ErrorExt for Error { | TimestampOverflow { .. } | OpenTelemetryLog { .. } | UnsupportedJsonDataTypeForTag { .. } - | MissingTableName { .. } | InvalidTableName { .. } => StatusCode::InvalidArguments, Catalog { source, .. } => source.status_code(), From c9c5534214db07df12da98f33af578e47c4770e3 Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Wed, 6 Nov 2024 16:49:27 +0800 Subject: [PATCH 6/8] chore: adjust otlp traces column order --- src/servers/src/otlp/trace.rs | 143 ++++++++++++++--------------- src/servers/src/otlp/trace/span.rs | 6 -- src/servers/src/otlp/utils.rs | 19 ++++ 3 files changed, 86 insertions(+), 82 deletions(-) diff --git a/src/servers/src/otlp/trace.rs b/src/servers/src/otlp/trace.rs index 1fcf1565c6a1..9572ea4df142 100644 --- a/src/servers/src/otlp/trace.rs +++ b/src/servers/src/otlp/trace.rs @@ -15,11 +15,11 @@ use api::v1::value::ValueData; use api::v1::{ColumnDataType, RowInsertRequests}; use common_grpc::precision::Precision; -use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use self::span::{parse_span, TraceSpan, TraceSpans}; use crate::error::Result; +use crate::otlp::utils::{make_column_data, make_string_column_data}; use crate::row_writer::{self, MultiTableData, TableData}; const APPROXIMATE_COLUMN_COUNT: usize = 24; @@ -72,88 +72,79 @@ pub fn to_grpc_insert_requests( pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> { let mut row = writer.alloc_one_row(); - { - // tags - let iter = vec![ - ("trace_id", span.trace_id), - ("span_id", span.span_id), - ("parent_span_id", span.parent_span_id), - ] - .into_iter() - .map(|(col, val)| (col.to_string(), val)); - row_writer::write_tags(writer, iter, &mut row)?; - } - { - // fields - let str_fields_iter = vec![ - ("scope_name", span.scope_name), - ("scope_version", span.scope_version), - ("trace_state", span.trace_state), - ("span_name", span.span_name), - ("span_kind", span.span_kind), - ("span_status_code", span.span_status_code), - ("span_status_message", span.span_status_message), - ] - .into_iter() - .map(|(col, val)| { - ( - col.into(), - ColumnDataType::String, - ValueData::StringValue(val), - ) - }); - - let time_fields_iter = vec![ - ("start", span.start_in_nanosecond), - ("end", span.end_in_nanosecond), - ] - .into_iter() - .map(|(col, val)| { - ( - col.into(), - ColumnDataType::TimestampNanosecond, - ValueData::TimestampNanosecondValue(val as i64), - ) - }); - row_writer::write_json( - writer, - "resource_attributes", - span.resource_attributes.into(), - &mut row, - )?; - row_writer::write_json( - writer, - "scope_attributes", - span.scope_attributes.into(), - &mut row, - )?; - row_writer::write_json( - writer, - "span_attributes", - span.span_attributes.into(), - &mut row, - )?; - row_writer::write_json(writer, "span_events", span.span_events.into(), &mut row)?; - row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?; - - row_writer::write_fields(writer, str_fields_iter, &mut row)?; - row_writer::write_fields(writer, time_fields_iter, &mut row)?; - row_writer::write_fields(writer, span.uplifted_span_attributes.into_iter(), &mut row)?; - } - row_writer::write_f64( - writer, - GREPTIME_VALUE, - (span.end_in_nanosecond - span.start_in_nanosecond) as f64 / 1_000_000.0, // duration in millisecond - &mut row, - )?; + // write ts row_writer::write_ts_to_nanos( writer, - GREPTIME_TIMESTAMP, + "timestamp", Some(span.start_in_nanosecond as i64), Precision::Nanosecond, &mut row, )?; + // write ts fields + let fields = vec![ + make_column_data( + "timestamp_end", + ColumnDataType::TimestampNanosecond, + ValueData::TimestampNanosecondValue(span.end_in_nanosecond as i64), + ), + make_column_data( + "duration_nano", + ColumnDataType::Uint64, + ValueData::U64Value(span.end_in_nanosecond - span.start_in_nanosecond), + ), + ]; + row_writer::write_fields(writer, fields.into_iter(), &mut row)?; + + // tags + let iter = vec![ + ("trace_id", span.trace_id), + ("span_id", span.span_id), + ("parent_span_id", span.parent_span_id), + ] + .into_iter() + .map(|(col, val)| (col.to_string(), val)); + row_writer::write_tags(writer, iter, &mut row)?; + + // write fields + let fields = vec![ + make_string_column_data("span_kind", span.span_kind), + make_string_column_data("span_name", span.span_name), + make_string_column_data("span_status_code", span.span_status_code), + make_string_column_data("span_status_message", span.span_status_message), + make_string_column_data("trace_state", span.trace_state), + ]; + row_writer::write_fields(writer, fields.into_iter(), &mut row)?; + + row_writer::write_json( + writer, + "span_attributes", + span.span_attributes.into(), + &mut row, + )?; + row_writer::write_json(writer, "span_events", span.span_events.into(), &mut row)?; + row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?; + + // write fields + let fields = vec![ + make_string_column_data("scope_name", span.scope_name), + make_string_column_data("scope_version", span.scope_version), + ]; + row_writer::write_fields(writer, fields.into_iter(), &mut row)?; + + row_writer::write_json( + writer, + "scope_attributes", + span.scope_attributes.into(), + &mut row, + )?; + + row_writer::write_json( + writer, + "resource_attributes", + span.resource_attributes.into(), + &mut row, + )?; writer.add_row(row); diff --git a/src/servers/src/otlp/trace/span.rs b/src/servers/src/otlp/trace/span.rs index 21c2085fc92d..02fc523f66c7 100644 --- a/src/servers/src/otlp/trace/span.rs +++ b/src/servers/src/otlp/trace/span.rs @@ -14,8 +14,6 @@ use std::fmt::Display; -use api::v1::value::ValueData; -use api::v1::ColumnDataType; use common_time::timestamp::Timestamp; use itertools::Itertools; use opentelemetry_proto::tonic::common::v1::{InstrumentationScope, KeyValue}; @@ -48,8 +46,6 @@ pub struct TraceSpan { pub span_links: SpanLinks, // TODO(yuanbohan): List in the future pub start_in_nanosecond: u64, // this is also the Timestamp Index pub end_in_nanosecond: u64, - - pub uplifted_span_attributes: Vec<(String, ColumnDataType, ValueData)>, } pub type TraceSpans = Vec; @@ -221,8 +217,6 @@ pub fn parse_span( start_in_nanosecond: span.start_time_unix_nano, end_in_nanosecond: span.end_time_unix_nano, - - uplifted_span_attributes: vec![], } } diff --git a/src/servers/src/otlp/utils.rs b/src/servers/src/otlp/utils.rs index 7139251a14f8..be3741666df1 100644 --- a/src/servers/src/otlp/utils.rs +++ b/src/servers/src/otlp/utils.rs @@ -14,6 +14,8 @@ use std::collections::BTreeMap; +use api::v1::value::ValueData; +use api::v1::ColumnDataType; use itertools::Itertools; use jsonb::{Number as JsonbNumber, Value as JsonbValue}; use opentelemetry_proto::tonic::common::v1::{any_value, KeyValue}; @@ -58,3 +60,20 @@ pub fn key_value_to_jsonb(key_values: Vec) -> JsonbValue<'static> { } JsonbValue::Object(map) } + +#[inline] +pub(crate) fn make_string_column_data( + name: &str, + value: String, +) -> (String, ColumnDataType, ValueData) { + make_column_data(name, ColumnDataType::String, ValueData::StringValue(value)) +} + +#[inline] +pub(crate) fn make_column_data( + name: &str, + data_type: ColumnDataType, + value: ValueData, +) -> (String, ColumnDataType, ValueData) { + (name.to_string(), data_type, value) +} From 8c59e7c9605eae6fb3bf571bcb9b6d769fe6fc82 Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Wed, 6 Nov 2024 17:17:04 +0800 Subject: [PATCH 7/8] chore: update test --- tests-integration/tests/http.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 6cf6965d3d76..a0cc3b2f5d0c 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1594,7 +1594,7 @@ pub async fn test_otlp_traces(store_type: StorageType) { assert_eq!(StatusCode::OK, res.status()); // select traces data - let expected = r#"[["b5e5fb572cf0a3335dd194a14145fef5","3364d2da58c9fd2b","",{"service.name":"telemetrygen"},{},{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-server"},[],[],"telemetrygen","","","lets-go","SPAN_KIND_CLIENT","STATUS_CODE_UNSET","",1726631197820927000,1726631197821050000,0.123,1726631197820927000],["b5e5fb572cf0a3335dd194a14145fef5","74c82efa6f628e80","3364d2da58c9fd2b",{"service.name":"telemetrygen"},{},{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-client"},[],[],"telemetrygen","","","okey-dokey-0","SPAN_KIND_SERVER","STATUS_CODE_UNSET","",1726631197820927000,1726631197821050000,0.123,1726631197820927000]]"#; + let expected = r#"[[1726631197820927000,1726631197821050000,123000,"b5e5fb572cf0a3335dd194a14145fef5","3364d2da58c9fd2b","","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-server"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}],[1726631197820927000,1726631197821050000,123000,"b5e5fb572cf0a3335dd194a14145fef5","74c82efa6f628e80","3364d2da58c9fd2b","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-client"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}]]"#; validate_data( "otlp_traces", &client, From 6e9d75f840f2d6b34234498268d236b1d7b94316 Mon Sep 17 00:00:00 2001 From: paomian Date: Fri, 8 Nov 2024 12:41:52 +0800 Subject: [PATCH 8/8] chore: minor fix --- src/servers/src/http/otlp.rs | 105 +++++++++++++++++------------------ 1 file changed, 52 insertions(+), 53 deletions(-) diff --git a/src/servers/src/http/otlp.rs b/src/servers/src/http/otlp.rs index 5f7779027805..5059afd9722e 100644 --- a/src/servers/src/http/otlp.rs +++ b/src/servers/src/http/otlp.rs @@ -88,11 +88,13 @@ pub async fn traces( bytes: Bytes, ) -> Result> { let db = query_ctx.get_db_string(); - let table_name = extract_table_name_from_header( + let table_name = extract_string_value_from_header( &header, GREPTIME_TRACE_TABLE_NAME_HEADER_NAME, - TRACE_TABLE_NAME, - )?; + Some(TRACE_TABLE_NAME), + )? + // safety here, we provide default value for table_name + .unwrap(); query_ctx.set_channel(Channel::Otlp); let query_ctx = Arc::new(query_ctx); let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED @@ -120,27 +122,27 @@ fn parse_header_value_to_string(header: &HeaderValue) -> Result { String::from_utf8(header.as_bytes().to_vec()).context(InvalidUtf8ValueSnafu) } -fn parse_pipeline_header_value_to_string( - header: &HeaderValue, - header_name: &str, -) -> StdResult { - parse_header_value_to_string(header).map_err(|_| { - ( - StatusCode::BAD_REQUEST, - format!("`{}` header is not valid UTF-8 string type.", header_name), - ) - }) -} - -fn extract_table_name_from_header( +fn extract_string_value_from_header( headers: &HeaderMap, header: &str, - default_table_name: &str, -) -> Result { + default_table_name: Option<&str>, +) -> Result> { let table_name = headers.get(header); match table_name { - Some(name) => parse_header_value_to_string(name), - None => Ok(default_table_name.to_string()), + Some(name) => parse_header_value_to_string(name).map(Some), + None => match default_table_name { + Some(name) => Ok(Some(name.to_string())), + None => Ok(None), + }, + } +} + +fn utf8_error(header_name: &str) -> impl Fn(error::Error) -> (StatusCode, String) + use<'_> { + move |_| { + ( + StatusCode::BAD_REQUEST, + format!("`{}` header is not valid UTF-8 string type.", header_name), + ) } } @@ -152,28 +154,27 @@ where type Rejection = (StatusCode, String); async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult { - let pipeline_name = parts.headers.get(GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME); - let pipeline_version = parts.headers.get(GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME); + let headers = &parts.headers; + let pipeline_name = + extract_string_value_from_header(headers, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, None) + .map_err(utf8_error(GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME))?; + let pipeline_version = extract_string_value_from_header( + headers, + GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, + None, + ) + .map_err(utf8_error(GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME))?; match (pipeline_name, pipeline_version) { (Some(name), Some(version)) => Ok(PipelineInfo { - pipeline_name: Some(parse_pipeline_header_value_to_string( - name, - GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, - )?), - pipeline_version: Some(parse_pipeline_header_value_to_string( - version, - GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, - )?), + pipeline_name: Some(name), + pipeline_version: Some(version), }), (None, _) => Ok(PipelineInfo { pipeline_name: None, pipeline_version: None, }), (Some(name), None) => Ok(PipelineInfo { - pipeline_name: Some(parse_pipeline_header_value_to_string( - name, - GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, - )?), + pipeline_name: Some(name), pipeline_version: None, }), } @@ -192,19 +193,16 @@ where type Rejection = (StatusCode, String); async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult { - let table_name = parts.headers.get(GREPTIME_LOG_TABLE_NAME_HEADER_NAME); + let table_name = extract_string_value_from_header( + &parts.headers, + GREPTIME_LOG_TABLE_NAME_HEADER_NAME, + Some(LOG_TABLE_NAME), + ) + .map_err(utf8_error(GREPTIME_LOG_TABLE_NAME_HEADER_NAME))? + // safety here, we provide default value for table_name + .unwrap(); - match table_name { - Some(name) => Ok(TableInfo { - table_name: parse_pipeline_header_value_to_string( - name, - GREPTIME_LOG_TABLE_NAME_HEADER_NAME, - )?, - }), - None => Ok(TableInfo { - table_name: LOG_TABLE_NAME.to_string(), - }), - } + Ok(TableInfo { table_name }) } } @@ -218,18 +216,19 @@ where type Rejection = (StatusCode, String); async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult { - let select = parts.headers.get(GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME); + let select = extract_string_value_from_header( + &parts.headers, + GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME, + None, + ) + .map_err(utf8_error(GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME))?; match select { Some(name) => { - let select_header = parse_pipeline_header_value_to_string( - name, - GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME, - )?; - if select_header.is_empty() { + if name.is_empty() { Ok(SelectInfoWrapper(Default::default())) } else { - Ok(SelectInfoWrapper(SelectInfo::from(select_header))) + Ok(SelectInfoWrapper(SelectInfo::from(name))) } } None => Ok(SelectInfoWrapper(Default::default())),