From 21b9f2afa3364c656c2b813b7c5e8ace25aa7d6e Mon Sep 17 00:00:00 2001 From: jmjoy Date: Fri, 30 Dec 2022 17:53:06 +0800 Subject: [PATCH 1/2] Add authentication and custom intercept support. --- src/reporter/grpc.rs | 124 +++++++++++++++++++++------------ src/skywalking_proto/v3/mod.rs | 1 + 2 files changed, 79 insertions(+), 46 deletions(-) diff --git a/src/reporter/grpc.rs b/src/reporter/grpc.rs index 2ac1f00..5ea63c8 100644 --- a/src/reporter/grpc.rs +++ b/src/reporter/grpc.rs @@ -48,7 +48,10 @@ use tokio::{ }; use tonic::{ async_trait, + metadata::{Ascii, MetadataValue}, + service::{interceptor::InterceptedService, Interceptor}, transport::{self, Channel, Endpoint}, + Request, Status, }; /// Special purpose, used for user-defined production operations. Generally, it @@ -111,13 +114,29 @@ impl CollectItemConsume for mpsc::UnboundedReceiver { } } +#[derive(Default, Clone)] +struct CustomInterceptor { + authentication: Option>, + custom_intercept: Option) -> Result, Status> + Send + Sync>>, +} + +impl Interceptor for CustomInterceptor { + fn call(&mut self, mut request: tonic::Request<()>) -> Result, Status> { + if let Some(authentication) = &self.authentication { + if let Ok(authentication) = authentication.parse::>() { + request + .metadata_mut() + .insert("authentication", authentication); + } + } + if let Some(custom_intercept) = &self.custom_intercept { + request = custom_intercept(request)?; + } + Ok(request) + } +} + struct Inner { - trace_client: Mutex>, - log_client: Mutex>, - meter_client: Mutex>, - #[cfg(feature = "management")] - #[cfg_attr(docsrs, doc(cfg(feature = "management")))] - management_client: Mutex>, producer: P, consumer: Mutex>, is_reporting: AtomicBool, @@ -131,6 +150,8 @@ pub type DynErrHandle = dyn Fn(Box) + Send + Sync + 'static; pub struct GrpcReporter { inner: Arc>, err_handle: Arc>>, + channel: Channel, + interceptor: CustomInterceptor, } impl GrpcReporter, mpsc::UnboundedReceiver> { @@ -156,17 +177,14 @@ impl GrpcReporter { pub fn new_with_pc(channel: Channel, producer: P, consumer: C) -> Self { Self { inner: Arc::new(Inner { - trace_client: Mutex::new(TraceSegmentReportServiceClient::new(channel.clone())), - log_client: Mutex::new(LogReportServiceClient::new(channel.clone())), - #[cfg(feature = "management")] - management_client: Mutex::new(ManagementServiceClient::new(channel.clone())), - meter_client: Mutex::new(MeterReportServiceClient::new(channel)), producer, consumer: Mutex::new(Some(consumer)), is_reporting: Default::default(), is_closed: Default::default(), }), err_handle: Default::default(), + channel, + interceptor: Default::default(), } } @@ -179,6 +197,22 @@ impl GrpcReporter { self } + /// Set the authentication header value. By default, the authentication is + /// not set. + pub fn with_authentication(mut self, authentication: impl Into) -> Self { + self.interceptor.authentication = Some(Arc::new(authentication.into())); + self + } + + /// Set the custom intercept. By default, the custom intercept is not set. + pub fn with_custom_intercept( + mut self, + custom_intercept: impl Fn(Request<()>) -> Result, Status> + Send + Sync + 'static, + ) -> Self { + self.interceptor.custom_intercept = Some(Arc::new(custom_intercept)); + self + } + /// Start to reporting. /// /// # Panics @@ -193,9 +227,28 @@ impl GrpcReporter { rb: ReporterAndBuffer { inner: Arc::clone(&self.inner), status_handle: None, + trace_buffer: Default::default(), log_buffer: Default::default(), meter_buffer: Default::default(), + + trace_client: TraceSegmentReportServiceClient::with_interceptor( + self.channel.clone(), + self.interceptor.clone(), + ), + log_client: LogReportServiceClient::with_interceptor( + self.channel.clone(), + self.interceptor.clone(), + ), + meter_client: MeterReportServiceClient::with_interceptor( + self.channel.clone(), + self.interceptor.clone(), + ), + #[cfg(feature = "management")] + management_client: ManagementServiceClient::with_interceptor( + self.channel.clone(), + self.interceptor.clone(), + ), }, shutdown_signal: Box::pin(pending()), consumer: self.inner.consumer.lock().await.take().unwrap(), @@ -208,6 +261,8 @@ impl Clone for GrpcReporter { Self { inner: self.inner.clone(), err_handle: self.err_handle.clone(), + channel: self.channel.clone(), + interceptor: self.interceptor.clone(), } } } @@ -227,9 +282,17 @@ impl Report for GrpcReporter struct ReporterAndBuffer { inner: Arc>, status_handle: Option>, + trace_buffer: LinkedList, log_buffer: LinkedList, meter_buffer: LinkedList, + + trace_client: TraceSegmentReportServiceClient>, + log_client: LogReportServiceClient>, + meter_client: MeterReportServiceClient>, + #[cfg(feature = "management")] + #[cfg_attr(docsrs, doc(cfg(feature = "management")))] + management_client: ManagementServiceClient>, } impl ReporterAndBuffer { @@ -248,10 +311,7 @@ impl ReporterAndBuffer { #[cfg(feature = "management")] CollectItem::Instance(item) => { if let Err(e) = self - .inner .management_client - .lock() - .await .report_instance_properties(*item) .await { @@ -262,14 +322,7 @@ impl ReporterAndBuffer { } #[cfg(feature = "management")] CollectItem::Ping(item) => { - if let Err(e) = self - .inner - .management_client - .lock() - .await - .keep_alive(*item) - .await - { + if let Err(e) = self.management_client.keep_alive(*item).await { if let Some(status_handle) = &self.status_handle { status_handle(e); } @@ -279,14 +332,7 @@ impl ReporterAndBuffer { if !self.trace_buffer.is_empty() { let buffer = take(&mut self.trace_buffer); - if let Err(e) = self - .inner - .trace_client - .lock() - .await - .collect(stream::iter(buffer)) - .await - { + if let Err(e) = self.trace_client.collect(stream::iter(buffer)).await { if let Some(status_handle) = &self.status_handle { status_handle(e); } @@ -294,14 +340,7 @@ impl ReporterAndBuffer { } if !self.log_buffer.is_empty() { let buffer = take(&mut self.log_buffer); - if let Err(e) = self - .inner - .log_client - .lock() - .await - .collect(stream::iter(buffer)) - .await - { + if let Err(e) = self.log_client.collect(stream::iter(buffer)).await { if let Some(status_handle) = &self.status_handle { status_handle(e); } @@ -310,14 +349,7 @@ impl ReporterAndBuffer { if !self.meter_buffer.is_empty() { let buffer = take(&mut self.meter_buffer); - if let Err(e) = self - .inner - .meter_client - .lock() - .await - .collect(stream::iter(buffer)) - .await - { + if let Err(e) = self.meter_client.collect(stream::iter(buffer)).await { if let Some(status_handle) = &self.status_handle { status_handle(e); } diff --git a/src/skywalking_proto/v3/mod.rs b/src/skywalking_proto/v3/mod.rs index 32d517d..0086949 100644 --- a/src/skywalking_proto/v3/mod.rs +++ b/src/skywalking_proto/v3/mod.rs @@ -17,6 +17,7 @@ //! Generated code of `skywalking.v3`, by `tonic`. #![allow(missing_docs)] +#![allow(rustdoc::invalid_html_tags)] use crate::common::system_time::{fetch_time, TimePeriod}; From b0b66023cc7aac9b2c354e0b5a4cea5024514abf Mon Sep 17 00:00:00 2001 From: jmjoy Date: Tue, 3 Jan 2023 11:29:39 +0800 Subject: [PATCH 2/2] Update documnet and fix clippy. --- README.md | 2 ++ src/reporter/grpc.rs | 4 +++- src/reporter/print.rs | 1 + src/trace/trace_context.rs | 8 ++++---- 4 files changed, 10 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index af00725..3ebf868 100644 --- a/README.md +++ b/README.md @@ -144,6 +144,8 @@ async fn handle_metric(mut metricer: Metricer) { async fn main() -> Result<(), Box> { // Connect to skywalking oap server. let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?; + // Optional authentication, based on backend setting. + let reporter = reporter.with_authentication(""); // Spawn the reporting in background, with listening the graceful shutdown signal. let handle = reporter diff --git a/src/reporter/grpc.rs b/src/reporter/grpc.rs index 5ea63c8..68307ee 100644 --- a/src/reporter/grpc.rs +++ b/src/reporter/grpc.rs @@ -114,10 +114,12 @@ impl CollectItemConsume for mpsc::UnboundedReceiver { } } +type DynInterceptHandler = dyn Fn(Request<()>) -> Result, Status> + Send + Sync; + #[derive(Default, Clone)] struct CustomInterceptor { authentication: Option>, - custom_intercept: Option) -> Result, Status> + Send + Sync>>, + custom_intercept: Option>, } impl Interceptor for CustomInterceptor { diff --git a/src/reporter/print.rs b/src/reporter/print.rs index 5b3fde0..006327e 100644 --- a/src/reporter/print.rs +++ b/src/reporter/print.rs @@ -40,6 +40,7 @@ impl PrintReporter { } impl Report for PrintReporter { + #[allow(clippy::print_stdout)] fn report(&self, items: CollectItem) { match items { CollectItem::Trace(data) => { diff --git a/src/trace/trace_context.rs b/src/trace/trace_context.rs index b5161f4..e9612c2 100644 --- a/src/trace/trace_context.rs +++ b/src/trace/trace_context.rs @@ -52,15 +52,15 @@ impl SpanStack { } pub(crate) fn with_finalized_mut(&self, f: impl FnOnce(&mut Vec) -> T) -> T { - f(&mut *self.finalized.try_write().expect(LOCK_MSG)) + f(&mut self.finalized.try_write().expect(LOCK_MSG)) } pub(crate) fn with_active(&self, f: impl FnOnce(&Vec) -> T) -> T { - f(&*self.active.try_read().expect(LOCK_MSG)) + f(&self.active.try_read().expect(LOCK_MSG)) } pub(crate) fn with_active_mut(&self, f: impl FnOnce(&mut Vec) -> T) -> T { - f(&mut *self.active.try_write().expect(LOCK_MSG)) + f(&mut self.active.try_write().expect(LOCK_MSG)) } fn pop_active(&self, index: usize) -> Option { @@ -184,7 +184,7 @@ impl TracingContext { } fn with_spans_mut(&mut self, f: impl FnOnce(&mut Vec) -> T) -> T { - f(&mut *self.span_stack.finalized.try_write().expect(LOCK_MSG)) + f(&mut self.span_stack.finalized.try_write().expect(LOCK_MSG)) } pub(crate) fn with_active_span_stack(&self, f: impl FnOnce(&Vec) -> T) -> T {