Skip to content

Commit

Permalink
Add authentication and custom intercept support. (#50)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmjoy authored Jan 3, 2023
1 parent 4c61a5b commit cbf1079
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 50 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ async fn handle_metric(mut metricer: Metricer) {
async fn main() -> Result<(), Box<dyn Error>> {
// Connect to skywalking oap server.
let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?;
// Optional authentication, based on backend setting.
let reporter = reporter.with_authentication("<TOKEN>");
// Spawn the reporting in background, with listening the graceful shutdown signal.
let handle = reporter
Expand Down
126 changes: 80 additions & 46 deletions src/reporter/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ use tokio::{
};
use tonic::{
async_trait,
metadata::{Ascii, MetadataValue},
service::{interceptor::InterceptedService, Interceptor},
transport::{self, Channel, Endpoint},
Request, Status,
};

/// Special purpose, used for user-defined production operations. Generally, it
Expand Down Expand Up @@ -111,13 +114,31 @@ impl CollectItemConsume for mpsc::UnboundedReceiver<CollectItem> {
}
}

type DynInterceptHandler = dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync;

#[derive(Default, Clone)]
struct CustomInterceptor {
authentication: Option<Arc<String>>,
custom_intercept: Option<Arc<DynInterceptHandler>>,
}

impl Interceptor for CustomInterceptor {
fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> {
if let Some(authentication) = &self.authentication {
if let Ok(authentication) = authentication.parse::<MetadataValue<Ascii>>() {
request
.metadata_mut()
.insert("authentication", authentication);
}
}
if let Some(custom_intercept) = &self.custom_intercept {
request = custom_intercept(request)?;
}
Ok(request)
}
}

struct Inner<P, C> {
trace_client: Mutex<TraceSegmentReportServiceClient<Channel>>,
log_client: Mutex<LogReportServiceClient<Channel>>,
meter_client: Mutex<MeterReportServiceClient<Channel>>,
#[cfg(feature = "management")]
#[cfg_attr(docsrs, doc(cfg(feature = "management")))]
management_client: Mutex<ManagementServiceClient<Channel>>,
producer: P,
consumer: Mutex<Option<C>>,
is_reporting: AtomicBool,
Expand All @@ -131,6 +152,8 @@ pub type DynErrHandle = dyn Fn(Box<dyn Error>) + Send + Sync + 'static;
pub struct GrpcReporter<P, C> {
inner: Arc<Inner<P, C>>,
err_handle: Arc<Option<Box<DynErrHandle>>>,
channel: Channel,
interceptor: CustomInterceptor,
}

impl GrpcReporter<mpsc::UnboundedSender<CollectItem>, mpsc::UnboundedReceiver<CollectItem>> {
Expand All @@ -156,17 +179,14 @@ impl<P: CollectItemProduce, C: CollectItemConsume> GrpcReporter<P, C> {
pub fn new_with_pc(channel: Channel, producer: P, consumer: C) -> Self {
Self {
inner: Arc::new(Inner {
trace_client: Mutex::new(TraceSegmentReportServiceClient::new(channel.clone())),
log_client: Mutex::new(LogReportServiceClient::new(channel.clone())),
#[cfg(feature = "management")]
management_client: Mutex::new(ManagementServiceClient::new(channel.clone())),
meter_client: Mutex::new(MeterReportServiceClient::new(channel)),
producer,
consumer: Mutex::new(Some(consumer)),
is_reporting: Default::default(),
is_closed: Default::default(),
}),
err_handle: Default::default(),
channel,
interceptor: Default::default(),
}
}

Expand All @@ -179,6 +199,22 @@ impl<P: CollectItemProduce, C: CollectItemConsume> GrpcReporter<P, C> {
self
}

/// Set the authentication header value. By default, the authentication is
/// not set.
pub fn with_authentication(mut self, authentication: impl Into<String>) -> Self {
self.interceptor.authentication = Some(Arc::new(authentication.into()));
self
}

/// Set the custom intercept. By default, the custom intercept is not set.
pub fn with_custom_intercept(
mut self,
custom_intercept: impl Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static,
) -> Self {
self.interceptor.custom_intercept = Some(Arc::new(custom_intercept));
self
}

/// Start to reporting.
///
/// # Panics
Expand All @@ -193,9 +229,28 @@ impl<P: CollectItemProduce, C: CollectItemConsume> GrpcReporter<P, C> {
rb: ReporterAndBuffer {
inner: Arc::clone(&self.inner),
status_handle: None,

trace_buffer: Default::default(),
log_buffer: Default::default(),
meter_buffer: Default::default(),

trace_client: TraceSegmentReportServiceClient::with_interceptor(
self.channel.clone(),
self.interceptor.clone(),
),
log_client: LogReportServiceClient::with_interceptor(
self.channel.clone(),
self.interceptor.clone(),
),
meter_client: MeterReportServiceClient::with_interceptor(
self.channel.clone(),
self.interceptor.clone(),
),
#[cfg(feature = "management")]
management_client: ManagementServiceClient::with_interceptor(
self.channel.clone(),
self.interceptor.clone(),
),
},
shutdown_signal: Box::pin(pending()),
consumer: self.inner.consumer.lock().await.take().unwrap(),
Expand All @@ -208,6 +263,8 @@ impl<P, C> Clone for GrpcReporter<P, C> {
Self {
inner: self.inner.clone(),
err_handle: self.err_handle.clone(),
channel: self.channel.clone(),
interceptor: self.interceptor.clone(),
}
}
}
Expand All @@ -227,9 +284,17 @@ impl<P: CollectItemProduce, C: CollectItemConsume> Report for GrpcReporter<P, C>
struct ReporterAndBuffer<P, C> {
inner: Arc<Inner<P, C>>,
status_handle: Option<Box<dyn Fn(tonic::Status) + Send + 'static>>,

trace_buffer: LinkedList<SegmentObject>,
log_buffer: LinkedList<LogData>,
meter_buffer: LinkedList<MeterData>,

trace_client: TraceSegmentReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
log_client: LogReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
meter_client: MeterReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
#[cfg(feature = "management")]
#[cfg_attr(docsrs, doc(cfg(feature = "management")))]
management_client: ManagementServiceClient<InterceptedService<Channel, CustomInterceptor>>,
}

impl<P: CollectItemProduce, C: CollectItemConsume> ReporterAndBuffer<P, C> {
Expand All @@ -248,10 +313,7 @@ impl<P: CollectItemProduce, C: CollectItemConsume> ReporterAndBuffer<P, C> {
#[cfg(feature = "management")]
CollectItem::Instance(item) => {
if let Err(e) = self
.inner
.management_client
.lock()
.await
.report_instance_properties(*item)
.await
{
Expand All @@ -262,14 +324,7 @@ impl<P: CollectItemProduce, C: CollectItemConsume> ReporterAndBuffer<P, C> {
}
#[cfg(feature = "management")]
CollectItem::Ping(item) => {
if let Err(e) = self
.inner
.management_client
.lock()
.await
.keep_alive(*item)
.await
{
if let Err(e) = self.management_client.keep_alive(*item).await {
if let Some(status_handle) = &self.status_handle {
status_handle(e);
}
Expand All @@ -279,29 +334,15 @@ impl<P: CollectItemProduce, C: CollectItemConsume> ReporterAndBuffer<P, C> {

if !self.trace_buffer.is_empty() {
let buffer = take(&mut self.trace_buffer);
if let Err(e) = self
.inner
.trace_client
.lock()
.await
.collect(stream::iter(buffer))
.await
{
if let Err(e) = self.trace_client.collect(stream::iter(buffer)).await {
if let Some(status_handle) = &self.status_handle {
status_handle(e);
}
}
}
if !self.log_buffer.is_empty() {
let buffer = take(&mut self.log_buffer);
if let Err(e) = self
.inner
.log_client
.lock()
.await
.collect(stream::iter(buffer))
.await
{
if let Err(e) = self.log_client.collect(stream::iter(buffer)).await {
if let Some(status_handle) = &self.status_handle {
status_handle(e);
}
Expand All @@ -310,14 +351,7 @@ impl<P: CollectItemProduce, C: CollectItemConsume> ReporterAndBuffer<P, C> {

if !self.meter_buffer.is_empty() {
let buffer = take(&mut self.meter_buffer);
if let Err(e) = self
.inner
.meter_client
.lock()
.await
.collect(stream::iter(buffer))
.await
{
if let Err(e) = self.meter_client.collect(stream::iter(buffer)).await {
if let Some(status_handle) = &self.status_handle {
status_handle(e);
}
Expand Down
1 change: 1 addition & 0 deletions src/reporter/print.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ impl PrintReporter {
}

impl Report for PrintReporter {
#[allow(clippy::print_stdout)]
fn report(&self, items: CollectItem) {
match items {
CollectItem::Trace(data) => {
Expand Down
1 change: 1 addition & 0 deletions src/skywalking_proto/v3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
//! Generated code of `skywalking.v3`, by `tonic`.
#![allow(missing_docs)]
#![allow(rustdoc::invalid_html_tags)]

use crate::common::system_time::{fetch_time, TimePeriod};

Expand Down
8 changes: 4 additions & 4 deletions src/trace/trace_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ impl SpanStack {
}

pub(crate) fn with_finalized_mut<T>(&self, f: impl FnOnce(&mut Vec<SpanObject>) -> T) -> T {
f(&mut *self.finalized.try_write().expect(LOCK_MSG))
f(&mut self.finalized.try_write().expect(LOCK_MSG))
}

pub(crate) fn with_active<T>(&self, f: impl FnOnce(&Vec<SpanObject>) -> T) -> T {
f(&*self.active.try_read().expect(LOCK_MSG))
f(&self.active.try_read().expect(LOCK_MSG))
}

pub(crate) fn with_active_mut<T>(&self, f: impl FnOnce(&mut Vec<SpanObject>) -> T) -> T {
f(&mut *self.active.try_write().expect(LOCK_MSG))
f(&mut self.active.try_write().expect(LOCK_MSG))
}

fn pop_active(&self, index: usize) -> Option<SpanObject> {
Expand Down Expand Up @@ -184,7 +184,7 @@ impl TracingContext {
}

fn with_spans_mut<T>(&mut self, f: impl FnOnce(&mut Vec<SpanObject>) -> T) -> T {
f(&mut *self.span_stack.finalized.try_write().expect(LOCK_MSG))
f(&mut self.span_stack.finalized.try_write().expect(LOCK_MSG))
}

pub(crate) fn with_active_span_stack<T>(&self, f: impl FnOnce(&Vec<SpanObject>) -> T) -> T {
Expand Down

0 comments on commit cbf1079

Please sign in to comment.