Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add functionality to the Opentelemetry write interface to extract fields from attr to top-level data. #4859

Merged
merged 10 commits into from
Oct 24, 2024
1 change: 0 additions & 1 deletion src/frontend/src/instance/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ impl OpenTelemetryProtocolHandler for Instance {
.get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_execute(ctx.clone())?;
let (requests, rows) = otlp::logs::to_grpc_insert_requests(request, pipeline, table_name)?;

self.handle_log_inserts(requests, ctx)
.await
.inspect(|_| OTLP_LOGS_ROWS.inc_by(rows as u64))
Expand Down
27 changes: 26 additions & 1 deletion src/pipeline/src/etl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,33 @@ pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str
.context(IntermediateKeyIndexSnafu { kind, key })
}

/// SelectInfo is used to store the selected keys from OpenTelemetry record attrs
#[derive(Default)]
pub struct SelectInfo {
pub keys: Vec<String>,
}

/// Try to convert a string to SelectInfo
/// The string should be a comma-separated list of keys
/// example: "key1,key2,key3"
/// The keys will be sorted and deduplicated
impl From<String> for SelectInfo {
fn from(value: String) -> Self {
let mut keys: Vec<String> = value.split(',').map(|s| s.to_string()).sorted().collect();
keys.dedup();

SelectInfo { keys }
}
}

impl SelectInfo {
pub fn is_empty(&self) -> bool {
self.keys.is_empty()
}
}

pub enum PipelineWay {
Identity,
OtlpLog(Box<SelectInfo>),
Custom(std::sync::Arc<Pipeline<crate::GreptimeTransformer>>),
}

Expand Down
6 changes: 3 additions & 3 deletions src/pipeline/src/etl/transform/transformer/greptime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,11 @@ impl Transformer for GreptimeTransformer {
/// As you traverse the user input JSON, this will change.
/// It will record a superset of all user input schemas.
#[derive(Debug, Default)]
struct SchemaInfo {
pub struct SchemaInfo {
/// schema info
schema: Vec<ColumnSchema>,
pub schema: Vec<ColumnSchema>,
/// index of the column name
index: HashMap<String, usize>,
pub index: HashMap<String, usize>,
}

fn resolve_schema(
Expand Down
3 changes: 2 additions & 1 deletion src/pipeline/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ mod metrics;

pub use etl::error::Result;
pub use etl::processor::Processor;
pub use etl::transform::transformer::greptime::SchemaInfo;
pub use etl::transform::transformer::identity_pipeline;
pub use etl::transform::{GreptimeTransformer, Transformer};
pub use etl::value::{Array, Map, Value};
pub use etl::{error as etl_error, parse, Content, Pipeline, PipelineWay};
pub use etl::{error as etl_error, parse, Content, Pipeline, PipelineWay, SelectInfo};
pub use manager::{
error, pipeline_operator, table, util, PipelineInfo, PipelineRef, PipelineTableRef,
PipelineVersion,
Expand Down
10 changes: 9 additions & 1 deletion src/servers/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unsupported json data type for tag: {} {}", key, ty))]
UnsupportedJsonDataTypeForTag {
key: String,
ty: String,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -603,7 +610,8 @@ impl ErrorExt for Error {
| ParseJson { .. }
| UnsupportedContentType { .. }
| TimestampOverflow { .. }
| OpenTelemetryLog { .. } => StatusCode::InvalidArguments,
| OpenTelemetryLog { .. }
| UnsupportedJsonDataTypeForTag { .. } => StatusCode::InvalidArguments,

Catalog { source, .. } => source.status_code(),
RowWriter { source, .. } => source.status_code(),
Expand Down
1 change: 1 addition & 0 deletions src/servers/src/http/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub mod constants {
pub const GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME: &str = "x-greptime-log-pipeline-name";
pub const GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME: &str = "x-greptime-log-pipeline-version";
pub const GREPTIME_LOG_TABLE_NAME_HEADER_NAME: &str = "x-greptime-log-table-name";
pub const GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME: &str = "x-greptime-log-extract-keys";
}

pub static GREPTIME_DB_HEADER_FORMAT: HeaderName =
Expand Down
35 changes: 32 additions & 3 deletions src/servers/src/http/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ use opentelemetry_proto::tonic::collector::trace::v1::{
ExportTraceServiceRequest, ExportTraceServiceResponse,
};
use pipeline::util::to_pipeline_version;
use pipeline::PipelineWay;
use pipeline::{PipelineWay, SelectInfo};
use prost::Message;
use session::context::{Channel, QueryContext};
use snafu::prelude::*;

use super::header::constants::GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME;
use super::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF};
use crate::error::{self, Result};
use crate::http::header::constants::{
Expand Down Expand Up @@ -181,13 +182,41 @@ where
}
}

pub struct SelectInfoWrapper(SelectInfo);

#[async_trait]
impl<S> FromRequestParts<S> for SelectInfoWrapper
where
S: Send + Sync,
{
type Rejection = (StatusCode, String);

async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
let select = parts.headers.get(GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME);

match select {
Some(name) => {
let select_header =
pipeline_header_error(name, GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME)?;
if select_header.is_empty() {
Ok(SelectInfoWrapper(Default::default()))
} else {
Ok(SelectInfoWrapper(SelectInfo::from(select_header)))
}
}
None => Ok(SelectInfoWrapper(Default::default())),
}
}
}

#[axum_macros::debug_handler]
#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "traces"))]
#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "logs"))]
pub async fn logs(
State(handler): State<OpenTelemetryProtocolHandlerRef>,
Extension(mut query_ctx): Extension<QueryContext>,
pipeline_info: PipelineInfo,
table_info: TableInfo,
SelectInfoWrapper(select_info): SelectInfoWrapper,
bytes: Bytes,
) -> Result<OtlpResponse<ExportLogsServiceResponse>> {
let db = query_ctx.get_db_string();
Expand Down Expand Up @@ -218,7 +247,7 @@ pub async fn logs(
};
pipeline_way = PipelineWay::Custom(pipeline);
} else {
pipeline_way = PipelineWay::Identity;
pipeline_way = PipelineWay::OtlpLog(Box::new(select_info));
}

handler
Expand Down
Loading