From 3e40187e15d6e2a7140bba64c0029e766ef4cda9 Mon Sep 17 00:00:00 2001 From: jmjoy Date: Mon, 8 Aug 2022 20:25:12 +0800 Subject: [PATCH 1/6] Move tracer reporting method to grpc reporter. --- README.md | 18 +- e2e/src/main.rs | 5 +- examples/simple_trace_report.rs | 18 +- src/common/mod.rs | 1 + src/{trace => common}/system_time.rs | 0 src/logging/logger.rs | 57 ++++- src/logging/record.rs | 151 +++++++++++++- src/reporter/grpc.rs | 302 +++++++++++++++++++++++++-- src/reporter/log.rs | 74 ------- src/reporter/mod.rs | 33 +-- src/reporter/once_cell.rs | 38 ++++ src/reporter/print.rs | 55 +++++ src/skywalking_proto/v3/mod.rs | 2 +- src/trace/mod.rs | 1 - src/trace/span.rs | 6 +- src/trace/trace_context.rs | 18 +- src/trace/tracer.rs | 239 +-------------------- tests/propagation.rs | 4 +- tests/trace_context.rs | 30 ++- 19 files changed, 662 insertions(+), 390 deletions(-) rename src/{trace => common}/system_time.rs (100%) delete mode 100644 src/reporter/log.rs create mode 100644 src/reporter/once_cell.rs create mode 100644 src/reporter/print.rs diff --git a/README.md b/README.md index 573d147..64d9c72 100644 --- a/README.md +++ b/README.md @@ -75,16 +75,18 @@ async fn handle_request(tracer: Tracer) { #[tokio::main] async fn main() -> Result<(), Box> { let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?; - let tracer = Tracer::new("service", "instance", reporter); + let handle = reporter + .reporting() + .await + .with_graceful_shutdown(async move { + signal::ctrl_c().await.expect("failed to listen for event"); + }) + .spawn(); - tokio::spawn(handle_request(tracer.clone())); + let tracer = Tracer::new("service", "instance", reporter); + handle_request(tracer).await; - // Start to report. - tracer - .reporting(async move { - let _ = signal::ctrl_c().await.unwrap(); - }) - .await?; + handle.await?; Ok(()) } diff --git a/e2e/src/main.rs b/e2e/src/main.rs index 2b1780c..b6e61c6 100644 --- a/e2e/src/main.rs +++ b/e2e/src/main.rs @@ -30,7 +30,7 @@ use skywalking::{ tracer::{self, Tracer}, }, }; -use std::{convert::Infallible, error::Error, future, net::SocketAddr}; +use std::{convert::Infallible, error::Error, net::SocketAddr}; use structopt::StructOpt; static NOT_FOUND_MSG: &str = "not found"; @@ -154,15 +154,14 @@ struct Opt { async fn main() -> Result<(), Box> { let opt = Opt::from_args(); let reporter = GrpcReporter::connect("http://collector:19876").await?; + let handle = reporter.reporting().await.spawn(); let handle = if opt.mode == "consumer" { tracer::set_global_tracer(Tracer::new("consumer", "node_0", reporter)); - let handle = tracer::reporting(future::pending()); run_consumer_service([0, 0, 0, 0]).await; handle } else if opt.mode == "producer" { tracer::set_global_tracer(Tracer::new("producer", "node_0", reporter)); - let handle = tracer::reporting(future::pending()); run_producer_service([0, 0, 0, 0]).await; handle } else { diff --git a/examples/simple_trace_report.rs b/examples/simple_trace_report.rs index af382f2..26f0400 100644 --- a/examples/simple_trace_report.rs +++ b/examples/simple_trace_report.rs @@ -49,16 +49,18 @@ async fn handle_request(tracer: Tracer) { #[tokio::main] async fn main() -> Result<(), Box> { let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?; - let tracer = Tracer::new("service", "instance", reporter); + let handle = reporter + .reporting() + .await + .with_graceful_shutdown(async move { + signal::ctrl_c().await.expect("failed to listen for event"); + }) + .spawn(); - tokio::spawn(handle_request(tracer.clone())); + let tracer = Tracer::new("service", "instance", reporter); + handle_request(tracer).await; - // Start to report. - tracer - .reporting(async move { - let _ = signal::ctrl_c().await.unwrap(); - }) - .await?; + handle.await?; Ok(()) } diff --git a/src/common/mod.rs b/src/common/mod.rs index 7bb6d17..7c59317 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -15,3 +15,4 @@ // pub mod random_generator; +pub(crate) mod system_time; diff --git a/src/trace/system_time.rs b/src/common/system_time.rs similarity index 100% rename from src/trace/system_time.rs rename to src/common/system_time.rs diff --git a/src/logging/logger.rs b/src/logging/logger.rs index 8cda4c1..e0cad06 100644 --- a/src/logging/logger.rs +++ b/src/logging/logger.rs @@ -14,4 +14,59 @@ // limitations under the License. // -pub struct Logger {} +use super::record::LogRecord; +use crate::reporter::{DynReport, Report}; +use std::sync::Arc; +use tokio::sync::OnceCell; + +static GLOBAL_LOGGER: OnceCell = OnceCell::const_new(); + +/// Set the global logger. +pub fn set_global_logger(tracer: Logger) { + if GLOBAL_LOGGER.set(tracer).is_err() { + panic!("global logger has setted") + } +} + +/// Get the global logger. +pub fn global_logger() -> &'static Logger { + GLOBAL_LOGGER.get().expect("global logger haven't setted") +} + +pub struct Inner { + service_name: String, + instance_name: String, + reporter: Box, +} + +#[derive(Clone)] +pub struct Logger { + inner: Arc, +} + +impl Logger { + /// New with service info and reporter. + pub fn new( + service_name: impl ToString, + instance_name: impl ToString, + reporter: impl Report + Send + Sync + 'static, + ) -> Self { + Self { + inner: Arc::new(Inner { + service_name: service_name.to_string(), + instance_name: instance_name.to_string(), + reporter: Box::new(reporter), + }), + } + } + + pub fn service_name(&self) -> &str { + &self.inner.service_name + } + + pub fn instance_name(&self) -> &str { + &self.inner.instance_name + } + + pub fn log(&self, record: LogRecord) {} +} diff --git a/src/logging/record.rs b/src/logging/record.rs index ecad7e6..248f568 100644 --- a/src/logging/record.rs +++ b/src/logging/record.rs @@ -14,4 +14,153 @@ // limitations under the License. // -pub struct LogRecord {} +use crate::{ + common::system_time::{fetch_time, TimePeriod}, + skywalking_proto::v3::{ + log_data_body::Content, JsonLog, KeyStringValuePair, LogData, LogDataBody, LogTags, + TextLog, TraceContext, YamlLog, + }, + trace::{span::Span, trace_context::TracingContext}, +}; +use std::{ + collections::HashMap, + time::{SystemTime, UNIX_EPOCH}, +}; + +use super::logger::Logger; + +pub enum RecordType { + Text, + Json, + Yaml, +} + +impl Default for RecordType { + fn default() -> Self { + Self::Text + } +} + +#[derive(Default)] +pub struct LogRecord { + time: Option, + is_ignore_time: bool, + endpoint: String, + tags: HashMap, + trace_id: Option, + trace_segment_id: Option, + span_id: Option, + record_type: RecordType, + content: String, +} + +impl LogRecord { + #[inline] + pub fn new() -> Self { + Default::default() + } + + pub fn custome_time(mut self, time: SystemTime) -> Self { + self.time = Some(time); + self + } + + pub fn ignore_time(mut self) -> Self { + self.is_ignore_time = true; + self + } + + pub fn endpoint(mut self, endpoint: impl ToString) -> Self { + self.endpoint = endpoint.to_string(); + self + } + + pub fn add_tag(mut self, key: impl ToString, value: impl ToString) -> Self { + self.tags.insert(key.to_string(), value.to_string()); + self + } + + pub fn add_tags(mut self, tags: I) -> Self + where + K: ToString, + V: ToString, + I: IntoIterator, + { + self.tags.extend( + tags.into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())), + ); + self + } + + pub fn with_tracing_context(mut self, tracing_context: &TracingContext) -> Self { + self.trace_id = Some(tracing_context.trace_id().to_owned()); + self.trace_segment_id = Some(tracing_context.trace_segment_id().to_owned()); + self + } + + pub fn with_span(mut self, span: &Span) -> Self { + self.span_id = Some(span.with_span_object(|span| span.span_id)); + self + } + + pub fn record_type(mut self, record_type: RecordType) -> Self { + self.record_type = record_type; + self + } + + pub fn content(mut self, content: impl ToString) -> Self { + self.content = content.to_string(); + self + } + + pub(crate) fn convert_to_log_data(self, logger: &Logger) -> LogData { + let timestamp = if self.is_ignore_time { + 0 + } else { + match self.time { + Some(time) => time + .duration_since(UNIX_EPOCH) + .map(|dur| dur.as_millis() as i64) + .unwrap_or_default(), + None => fetch_time(TimePeriod::Log), + } + }; + let trace_context = match (self.trace_id, self.trace_segment_id, self.span_id) { + (Some(trace_id), Some(trace_segment_id), Some(span_id)) => Some(TraceContext { + trace_id, + trace_segment_id, + span_id, + }), + _ => None, + }; + let tags = if self.tags.is_empty() { + None + } else { + let data = self + .tags + .into_iter() + .map(|(key, value)| KeyStringValuePair { key, value }) + .collect(); + Some(LogTags { data }) + }; + + LogData { + timestamp, + service: logger.service_name().to_owned(), + service_instance: logger.instance_name().to_owned(), + endpoint: self.endpoint, + body: Some(LogDataBody { + r#type: "".to_owned(), + content: match self.record_type { + RecordType::Text => Some(Content::Text(TextLog { text: self.content })), + RecordType::Json => Some(Content::Json(JsonLog { json: self.content })), + RecordType::Yaml => Some(Content::Yaml(YamlLog { yaml: self.content })), + }, + }), + trace_context, + tags, + layer: Default::default(), + } + } +} diff --git a/src/reporter/grpc.rs b/src/reporter/grpc.rs index fb04364..008059a 100644 --- a/src/reporter/grpc.rs +++ b/src/reporter/grpc.rs @@ -14,42 +14,310 @@ // limitations under the License. // -use super::Reporter; -use crate::skywalking_proto::v3::{ - trace_segment_report_service_client::TraceSegmentReportServiceClient, SegmentObject, +use crate::{ + reporter::{CollectItem, Report}, + skywalking_proto::v3::{ + log_report_service_client::LogReportServiceClient, + trace_segment_report_service_client::TraceSegmentReportServiceClient, LogData, + SegmentObject, + }, }; use futures_util::stream; -use std::{collections::LinkedList, error::Error}; +use std::{ + collections::LinkedList, + error::Error, + future::{pending, Future}, + mem::take, + pin::Pin, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + task::{Context, Poll}, +}; +use tokio::{ + select, + sync::{mpsc, Mutex}, + task::JoinHandle, + try_join, +}; use tonic::{ async_trait, transport::{self, Channel, Endpoint}, }; -type ReporterClient = TraceSegmentReportServiceClient; +pub trait CollectItemProduce: Send + Sync + 'static { + fn produce(&self, item: CollectItem) -> Result<(), Box>; +} -pub struct GrpcReporter { - client: ReporterClient, +impl CollectItemProduce for () { + fn produce(&self, _item: CollectItem) -> Result<(), Box> { + Ok(()) + } } -impl GrpcReporter { +impl CollectItemProduce for mpsc::UnboundedSender { + fn produce(&self, item: CollectItem) -> Result<(), Box> { + Ok(self.send(item)?) + } +} + +#[async_trait] +pub trait ColletcItemConsume: Send + Sync + 'static { + async fn consume(&mut self) -> Result, Box>; + + async fn try_consume(&mut self) -> Result, Box>; +} + +#[async_trait] +impl ColletcItemConsume for () { + async fn consume(&mut self) -> Result, Box> { + Ok(None) + } + + async fn try_consume(&mut self) -> Result, Box> { + Ok(None) + } +} + +#[async_trait] +impl ColletcItemConsume for mpsc::UnboundedReceiver { + async fn consume(&mut self) -> Result, Box> { + Ok(self.recv().await) + } + + async fn try_consume(&mut self) -> Result, Box> { + use mpsc::error::TryRecvError; + + match self.try_recv() { + Ok(item) => Ok(Some(item)), + Err(e) => match e { + TryRecvError::Empty => Ok(None), + TryRecvError::Disconnected => Err(Box::new(e)), + }, + } + } +} + +struct Inner { + trace_client: Mutex>, + log_client: Mutex>, + producer: P, + consumer: Mutex>, + is_reporting: AtomicBool, + is_closed: AtomicBool, +} + +#[derive(Clone)] +pub struct GrpcReporter { + inner: Arc>, +} + +impl GrpcReporter, mpsc::UnboundedReceiver> { pub fn new(channel: Channel) -> Self { - let client = ReporterClient::new(channel); - Self { client } + let (p, c) = mpsc::unbounded_channel(); + Self::new_with_pc(channel, p, c) } pub async fn connect( address: impl TryInto, ) -> crate::Result { - let client = ReporterClient::connect(address.try_into()?).await?; - Ok(Self { client }) + let endpoint = address.try_into()?; + let channel = endpoint.connect().await?; + Ok(Self::new(channel)) } } -#[async_trait] -impl Reporter for GrpcReporter { - async fn collect(&mut self, segments: LinkedList) -> Result<(), Box> { - let stream = stream::iter(segments); - self.client.collect(stream).await?; +impl GrpcReporter { + pub fn new_with_pc(channel: Channel, producer: P, consumer: C) -> Self { + Self { + inner: Arc::new(Inner { + trace_client: Mutex::new(TraceSegmentReportServiceClient::new(channel.clone())), + log_client: Mutex::new(LogReportServiceClient::new(channel)), + producer, + consumer: Mutex::new(Some(consumer)), + is_reporting: Default::default(), + is_closed: Default::default(), + }), + } + } + + /// Start to reporting, quit when shutdown_signal received. + /// + /// Accept a `shutdown_signal` argument as a graceful shutdown signal. + /// + /// # Panics + /// + /// Panic if call more than once. + pub async fn reporting(&self) -> Reporting { + if self.inner.is_reporting.swap(true, Ordering::Relaxed) { + panic!("reporting already called"); + } + + Reporting { + rb: ReporterAndBuffer { + reporter: GrpcReporter { + inner: Arc::clone(&self.inner), + }, + trace_buffer: Default::default(), + log_buffer: Default::default(), + status_handle: None, + }, + shutdown_signal: Box::pin(pending()), + consumer: self.inner.consumer.lock().await.take().unwrap(), + } + } +} + +impl Report for GrpcReporter { + fn report(&self, item: CollectItem) { + if !self.inner.is_closed.load(Ordering::Relaxed) { + let _ = self.inner.producer.produce(item); + } + } +} + +struct ReporterAndBuffer { + reporter: GrpcReporter, + trace_buffer: LinkedList, + log_buffer: LinkedList, + status_handle: Option>, +} + +impl ReporterAndBuffer { + async fn report(&mut self, item: CollectItem) { + // TODO Implement batch collect in future. + match item { + CollectItem::Trace(item) => { + self.trace_buffer.push_back(item); + } + CollectItem::Log(item) => { + self.log_buffer.push_back(item); + } + } + + if !self.trace_buffer.is_empty() { + let buffer = take(&mut self.trace_buffer); + if let Err(e) = self + .reporter + .inner + .trace_client + .lock() + .await + .collect(stream::iter(buffer)) + .await + { + if let Some(status_handle) = &self.status_handle { + status_handle(e); + } + } + } + if !self.log_buffer.is_empty() { + let buffer = take(&mut self.log_buffer); + if let Err(e) = self + .reporter + .inner + .log_client + .lock() + .await + .collect(stream::iter(buffer)) + .await + { + if let Some(status_handle) = &self.status_handle { + status_handle(e); + } + } + } + } +} + +/// Created by [Tracer::reporting]. +pub struct Reporting { + rb: ReporterAndBuffer, + consumer: C, + shutdown_signal: Pin + Send + Sync + 'static>>, +} + +impl Reporting { + pub fn with_graceful_shutdown( + mut self, + shutdown_signal: impl Future + Send + Sync + 'static, + ) -> Self { + self.shutdown_signal = Box::pin(shutdown_signal); + self + } + + pub fn with_staus_handle(mut self, handle: impl Fn(tonic::Status) + Send + 'static) -> Self { + self.rb.status_handle = Some(Box::new(handle)); + self + } + + pub fn spawn(self) -> ReportingJoinHandle { + ReportingJoinHandle { + handle: tokio::spawn(self.start()), + } + } + + async fn start(self) -> crate::Result<()> { + let (shutdown_tx, mut shutdown_rx) = mpsc::unbounded_channel(); + let Reporting { + mut rb, + mut consumer, + shutdown_signal, + } = self; + + let work_fut = async move { + loop { + select! { + item = consumer.consume() => { + match item { + Ok(Some(item)) => { + rb.report(item).await; + } + Ok(None) => break, + Err(err) => Err(crate::Error::Other(err))?, + } + } + _ = shutdown_rx.recv() => break, + } + } + + rb.reporter.inner.is_closed.store(true, Ordering::Relaxed); + + // Flush. + loop { + match consumer.try_consume().await { + Ok(Some(item)) => { + rb.report(item).await; + } + Ok(None) => break, + Err(err) => return Err(err.into()), + } + } + + Ok::<_, crate::Error>(()) + }; + + let shutdown_fut = async move { + shutdown_signal.await; + let _ = shutdown_tx.send(()); + Ok(()) + }; + + try_join!(work_fut, shutdown_fut)?; + Ok(()) } } + +pub struct ReportingJoinHandle { + handle: JoinHandle>, +} + +impl Future for ReportingJoinHandle { + type Output = crate::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.handle).poll(cx).map(|r| r?) + } +} diff --git a/src/reporter/log.rs b/src/reporter/log.rs deleted file mode 100644 index 2e0a98a..0000000 --- a/src/reporter/log.rs +++ /dev/null @@ -1,74 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one or more -// contributor license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright ownership. -// The ASF licenses this file to You under the Apache License, Version 2.0 -// (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -use super::Reporter; -use crate::skywalking_proto::v3::SegmentObject; -use std::{collections::LinkedList, error::Error}; -use tonic::async_trait; - -enum Used { - Println, - Tracing, -} - -pub struct LogReporter { - tip: String, - used: Used, -} - -impl LogReporter { - #[inline] - pub fn new() -> Self { - Default::default() - } - - pub fn tip(mut self, tip: String) -> Self { - self.tip = tip; - self - } - - pub fn use_tracing(mut self) -> Self { - self.used = Used::Tracing; - self - } - - pub fn use_println(mut self) -> Self { - self.used = Used::Println; - self - } -} - -impl Default for LogReporter { - fn default() -> Self { - Self { - tip: "collect".to_string(), - used: Used::Println, - } - } -} - -#[async_trait] -impl Reporter for LogReporter { - async fn collect(&mut self, segments: LinkedList) -> Result<(), Box> { - for segment in segments { - match self.used { - Used::Println => println!("{} segment={:?}", self.tip, segment), - Used::Tracing => tracing::info!(?segment, "{}", self.tip), - } - } - Ok(()) - } -} diff --git a/src/reporter/mod.rs b/src/reporter/mod.rs index bb52a16..653b16a 100644 --- a/src/reporter/mod.rs +++ b/src/reporter/mod.rs @@ -15,25 +15,26 @@ // pub mod grpc; -pub mod log; +pub mod once_cell; +pub mod print; -use crate::skywalking_proto::v3::SegmentObject; -use std::{collections::LinkedList, error::Error, result::Result}; -use tonic::async_trait; +use crate::skywalking_proto::v3::{LogData, SegmentObject}; -pub(crate) type DynReporter = dyn Reporter + Send + Sync + 'static; +#[derive(Debug)] +#[non_exhaustive] +pub enum CollectItem { + Trace(SegmentObject), + Log(LogData), +} + +pub(crate) type DynReport = dyn Report + Send + Sync + 'static; -#[async_trait] -pub trait Reporter { - async fn collect(&mut self, segments: LinkedList) -> Result<(), Box>; +/// Report provide non-blocking report method for trace, log and metric object. +pub trait Report { + fn report(&self, item: CollectItem); } -#[async_trait] -impl Reporter for () { - async fn collect( - &mut self, - _segments: LinkedList, - ) -> Result<(), Box> { - Ok(()) - } +/// Noop reporter. +impl Report for () { + fn report(&self, _item: CollectItem) {} } diff --git a/src/reporter/once_cell.rs b/src/reporter/once_cell.rs new file mode 100644 index 0000000..acd8d73 --- /dev/null +++ b/src/reporter/once_cell.rs @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +use crate::reporter::{CollectItem, Report}; +use tokio::sync::OnceCell; + +#[derive(Clone, Default)] +pub struct OnceCellReporter { + report: OnceCell, +} + +impl OnceCellReporter { + pub fn set(&self, report: R) -> Option<()> { + self.report.set(report).ok() + } +} + +impl Report for OnceCellReporter { + fn report(&self, item: CollectItem) { + self.report + .get() + .expect("Report hasn't setted") + .report(item) + } +} diff --git a/src/reporter/print.rs b/src/reporter/print.rs new file mode 100644 index 0000000..6b355e5 --- /dev/null +++ b/src/reporter/print.rs @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +use crate::reporter::{CollectItem, Report}; + +#[derive(Default, Clone)] +pub struct PrintReporter { + use_stderr: bool, +} + +impl PrintReporter { + #[inline] + pub fn new() -> Self { + Default::default() + } + + pub fn use_stderr(mut self, use_stderr: bool) -> Self { + self.use_stderr = use_stderr; + self + } +} + +impl Report for PrintReporter { + fn report(&self, items: CollectItem) { + match items { + CollectItem::Trace(segment) => { + if self.use_stderr { + eprintln!("trace segment={:?}", segment); + } else { + println!("trace segment={:?}", segment); + } + } + CollectItem::Log(data) => { + if self.use_stderr { + eprintln!("log data={:?}", data); + } else { + println!("log data={:?}", data); + } + } + } + } +} diff --git a/src/skywalking_proto/v3/mod.rs b/src/skywalking_proto/v3/mod.rs index 7a2a268..8623eaa 100644 --- a/src/skywalking_proto/v3/mod.rs +++ b/src/skywalking_proto/v3/mod.rs @@ -13,7 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. // -use crate::trace::system_time::{fetch_time, TimePeriod}; +use crate::common::system_time::{fetch_time, TimePeriod}; tonic::include_proto!("skywalking.v3"); diff --git a/src/trace/mod.rs b/src/trace/mod.rs index 28d7ace..c194875 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -16,6 +16,5 @@ pub mod propagation; pub mod span; -pub(crate) mod system_time; pub mod trace_context; pub mod tracer; diff --git a/src/trace/span.rs b/src/trace/span.rs index c9c6294..ce49c6b 100644 --- a/src/trace/span.rs +++ b/src/trace/span.rs @@ -14,13 +14,11 @@ // limitations under the License. // -use super::{ - system_time::{fetch_time, TimePeriod}, - trace_context::SpanStack, -}; use crate::{ + common::system_time::{fetch_time, TimePeriod}, error::LOCK_MSG, skywalking_proto::v3::{SpanLayer, SpanObject, SpanType}, + trace::trace_context::SpanStack, }; use std::{ fmt::Formatter, diff --git a/src/trace/trace_context.rs b/src/trace/trace_context.rs index 6f23adb..3710199 100644 --- a/src/trace/trace_context.rs +++ b/src/trace/trace_context.rs @@ -14,18 +14,20 @@ // limitations under the License. // -use super::{ - span::Span, - system_time::{fetch_time, TimePeriod}, - tracer::{Tracer, WeakTracer}, -}; use crate::{ - common::random_generator::RandomGenerator, + common::{ + random_generator::RandomGenerator, + system_time::{fetch_time, TimePeriod}, + }, error::LOCK_MSG, skywalking_proto::v3::{ RefType, SegmentObject, SegmentReference, SpanLayer, SpanObject, SpanType, }, - trace::propagation::context::PropagationContext, + trace::{ + propagation::context::PropagationContext, + span::Span, + tracer::{Tracer, WeakTracer}, + }, }; use std::{ fmt::Formatter, @@ -332,7 +334,7 @@ impl TracingContext { /// This conversion should be done before sending segments into OAP. /// /// Notice: The spans will taked, so this method shouldn't be called twice. - pub(crate) fn convert_segment_object(&mut self) -> SegmentObject { + pub(crate) fn convert_to_segment_object(&mut self) -> SegmentObject { let trace_id = self.trace_id().to_owned(); let trace_segment_id = self.trace_segment_id().to_owned(); let service = self.service().to_owned(); diff --git a/src/trace/tracer.rs b/src/trace/tracer.rs index 9ea9696..963d270 100644 --- a/src/trace/tracer.rs +++ b/src/trace/tracer.rs @@ -15,29 +15,11 @@ // use crate::{ - reporter::{DynReporter, Reporter}, - skywalking_proto::v3::SegmentObject, + reporter::{CollectItem, DynReport, Report}, trace::trace_context::TracingContext, }; -use std::{ - collections::LinkedList, - error::Error, - future::Future, - pin::Pin, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, Weak, - }, - task::{Context, Poll}, -}; -use tokio::{ - sync::{ - mpsc::{self}, - Mutex, OnceCell, - }, - task::JoinHandle, -}; -use tonic::async_trait; +use std::sync::{Arc, Weak}; +use tokio::sync::OnceCell; static GLOBAL_TRACER: OnceCell = OnceCell::const_new(); @@ -58,74 +40,10 @@ pub fn create_trace_context() -> TracingContext { global_tracer().create_trace_context() } -/// Start to reporting by global tracer, quit when shutdown_signal received. -/// -/// Accept a `shutdown_signal` argument as a graceful shutdown signal. -pub fn reporting(shutdown_signal: impl Future + Send + Sync + 'static) -> Reporting { - global_tracer().reporting(shutdown_signal) -} - -pub trait SegmentSender: Send + Sync + 'static { - fn send(&self, segment: SegmentObject) -> Result<(), Box>; -} - -impl SegmentSender for () { - fn send(&self, _segment: SegmentObject) -> Result<(), Box> { - Ok(()) - } -} - -impl SegmentSender for mpsc::UnboundedSender { - fn send(&self, segment: SegmentObject) -> Result<(), Box> { - Ok(self.send(segment)?) - } -} - -#[async_trait] -pub trait SegmentReceiver: Send + Sync + 'static { - async fn recv(&self) -> Result, Box>; - - async fn try_recv(&self) -> Result, Box>; -} - -#[async_trait] -impl SegmentReceiver for () { - async fn recv(&self) -> Result, Box> { - Ok(None) - } - - async fn try_recv(&self) -> Result, Box> { - Ok(None) - } -} - -#[async_trait] -impl SegmentReceiver for Mutex> { - async fn recv(&self) -> Result, Box> { - Ok(self.lock().await.recv().await) - } - - async fn try_recv(&self) -> Result, Box> { - use mpsc::error::TryRecvError; - - match self.lock().await.try_recv() { - Ok(segment) => Ok(Some(segment)), - Err(e) => match e { - TryRecvError::Empty => Ok(None), - TryRecvError::Disconnected => Err(Box::new(e)), - }, - } - } -} - struct Inner { service_name: String, instance_name: String, - segment_sender: Box, - segment_receiver: Box, - reporter: Mutex>, - is_reporting: AtomicBool, - is_closed: AtomicBool, + reporter: Box, } /// Skywalking tracer. @@ -139,33 +57,13 @@ impl Tracer { pub fn new( service_name: impl ToString, instance_name: impl ToString, - reporter: impl Reporter + Send + Sync + 'static, - ) -> Self { - let (segment_sender, segment_receiver) = mpsc::unbounded_channel(); - Self::new_with_channel( - service_name, - instance_name, - reporter, - (segment_sender, Mutex::new(segment_receiver)), - ) - } - - /// New with service info, reporter, and custom channel. - pub fn new_with_channel( - service_name: impl ToString, - instance_name: impl ToString, - reporter: impl Reporter + Send + Sync + 'static, - channel: (impl SegmentSender, impl SegmentReceiver), + reporter: impl Report + Send + Sync + 'static, ) -> Self { Self { inner: Arc::new(Inner { service_name: service_name.to_string(), instance_name: instance_name.to_string(), - segment_sender: Box::new(channel.0), - segment_receiver: Box::new(channel.1), - reporter: Mutex::new(Box::new(reporter)), - is_reporting: Default::default(), - is_closed: Default::default(), + reporter: Box::new(reporter), }), } } @@ -178,15 +76,6 @@ impl Tracer { &self.inner.instance_name } - /// Set the reporter, only valid if [`Tracer::reporting`] not started. - pub fn set_reporter(&self, reporter: impl Reporter + Send + Sync + 'static) { - if !self.inner.is_reporting.load(Ordering::Relaxed) { - if let Ok(mut lock) = self.inner.reporter.try_lock() { - *lock = Box::new(reporter); - } - } - } - /// Create trace conetxt. pub fn create_trace_context(&self) -> TracingContext { TracingContext::new( @@ -198,98 +87,10 @@ impl Tracer { /// Finalize the trace context. pub(crate) fn finalize_context(&self, context: &mut TracingContext) { - if self.inner.is_closed.load(Ordering::Relaxed) { - tracing::warn!("tracer closed"); - return; - } - - let segment_object = context.convert_segment_object(); - if let Err(err) = self.inner.segment_sender.send(segment_object) { - tracing::error!(?err, "send segment object failed"); - } - } - - /// Start to reporting, quit when shutdown_signal received. - /// - /// Accept a `shutdown_signal` argument as a graceful shutdown signal. - /// - /// # Panics - /// - /// Panic if call more than once. - pub fn reporting( - &self, - shutdown_signal: impl Future + Send + Sync + 'static, - ) -> Reporting { - if self.inner.is_reporting.swap(true, Ordering::Relaxed) { - panic!("reporting already called"); - } - - Reporting { - handle: tokio::spawn(self.clone().do_reporting(shutdown_signal)), - } - } - - async fn do_reporting( - self, - shutdown_signal: impl Future + Send + Sync + 'static, - ) -> crate::Result<()> { - let (shutdown_tx, mut shutdown_rx) = mpsc::unbounded_channel(); - - let handle = tokio::spawn(async move { - loop { - tokio::select! { - segment = self.inner.segment_receiver.recv() => { - match segment { - Ok(Some(segment)) => { - // TODO Implement batch collect in future. - let mut segments = LinkedList::new(); - segments.push_back(segment); - Self::report_segment_object(&self.inner.reporter, segments).await; - } - Ok(None) => break, - Err(err) => return Err(err.into()), - } - } - _ = shutdown_rx.recv() => break, - } - } - - self.inner.is_closed.store(true, Ordering::Relaxed); - - // Flush. - let mut segments = LinkedList::new(); - loop { - match self.inner.segment_receiver.try_recv().await { - Ok(Some(segment)) => { - segments.push_back(segment); - } - Ok(None) => break, - Err(err) => return Err(err.into()), - } - } - Self::report_segment_object(&self.inner.reporter, segments).await; - - Ok::<_, crate::Error>(()) - }); - - shutdown_signal.await; - - if shutdown_tx.send(()).is_err() { - tracing::error!("shutdown signal send failed"); - } - - handle.await??; - - Ok(()) - } - - async fn report_segment_object( - reporter: &Mutex>, - segments: LinkedList, - ) { - if let Err(err) = reporter.lock().await.collect(segments).await { - tracing::error!(?err, "collect failed"); - } + let segment_object = context.convert_to_segment_object(); + self.inner + .reporter + .report(CollectItem::Trace(segment_object)); } fn downgrade(&self) -> WeakTracer { @@ -310,31 +111,11 @@ impl WeakTracer { } } -/// Created by [Tracer::reporting]. -pub struct Reporting { - handle: JoinHandle>, -} - -impl Future for Reporting { - type Output = crate::Result<()>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Pin::new(&mut self.handle).poll(cx).map(|r| r?) - } -} - #[cfg(test)] mod tests { use super::*; - use std::future; trait AssertSend: Send {} impl AssertSend for Tracer {} - - #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - async fn custom_channel() { - let tracer = Tracer::new_with_channel("service_name", "instance_name", (), ((), ())); - tracer.reporting(future::ready(())).await.unwrap(); - } } diff --git a/tests/propagation.rs b/tests/propagation.rs index 1b95812..3353e95 100644 --- a/tests/propagation.rs +++ b/tests/propagation.rs @@ -16,7 +16,7 @@ #![allow(unused_imports)] use skywalking::{ - reporter::log::LogReporter, + reporter::print::PrintReporter, trace::{ propagation::{ context::PropagationContext, decoder::decode_propagation, encoder::encode_propagation, @@ -68,7 +68,7 @@ fn invalid_sample() { #[test] fn basic_encode() { - let tracer = Tracer::new("mesh", "instance", LogReporter::new()); + let tracer = Tracer::new("mesh", "instance", PrintReporter::new()); let tc = tracer.create_trace_context(); let res = encode_propagation(&tc, "/api/v1/health", "example.com:8080"); let res2 = decode_propagation(&res).unwrap(); diff --git a/tests/trace_context.rs b/tests/trace_context.rs index 7ca80d2..efae1e8 100644 --- a/tests/trace_context.rs +++ b/tests/trace_context.rs @@ -16,7 +16,7 @@ use prost::Message; use skywalking::{ - reporter::{log::LogReporter, Reporter}, + reporter::{print::PrintReporter, CollectItem, Report}, skywalking_proto::v3::{ KeyStringValuePair, Log, RefType, SegmentObject, SegmentReference, SpanLayer, SpanObject, SpanType, @@ -28,8 +28,6 @@ use skywalking::{ }; use std::{ collections::LinkedList, - error::Error, - future, sync::{Arc, Mutex}, thread, }; @@ -182,7 +180,7 @@ async fn create_span() { #[test] #[should_panic] fn create_local_span_failed() { - let tracer = Tracer::new("service", "instance", LogReporter::new()); + let tracer = Tracer::new("service", "instance", PrintReporter::new()); let mut context = tracer.create_trace_context(); let _span1 = context.create_local_span("op1"); } @@ -190,7 +188,7 @@ fn create_local_span_failed() { #[test] #[should_panic] fn create_exit_span_failed() { - let tracer = Tracer::new("service", "instance", LogReporter::new()); + let tracer = Tracer::new("service", "instance", PrintReporter::new()); let mut context = tracer.create_trace_context(); let _span1 = context.create_exit_span("op1", "example.com/test"); } @@ -220,7 +218,7 @@ async fn create_span_from_context() { #[test] fn crossprocess_test() { - let tracer = Tracer::new("service", "instance", LogReporter::new()); + let tracer = Tracer::new("service", "instance", PrintReporter::new()); let mut context1 = tracer.create_trace_context(); assert_eq!(context1.service(), "service"); assert_eq!(context1.service_instance(), "instance"); @@ -233,7 +231,7 @@ fn crossprocess_test() { let enc_prop = encode_propagation(&context1, "endpoint", "address"); let dec_prop = decode_propagation(&enc_prop).unwrap(); - let tracer = Tracer::new("service2", "instance2", LogReporter::new()); + let tracer = Tracer::new("service2", "instance2", PrintReporter::new()); let mut context2 = tracer.create_trace_context(); let span3 = context2.create_entry_span_with_propagation("op2", &dec_prop); @@ -323,20 +321,18 @@ impl MockReporter { f2: impl FnOnce(&LinkedList), ) { let reporter = MockReporter::default(); - - let tracer = f1(reporter.clone()); - - tracer.reporting(future::ready(())).await.unwrap(); - + f1(reporter.clone()); let segments = reporter.segments.try_lock().unwrap(); f2(&*segments); } } -#[tonic::async_trait] -impl Reporter for MockReporter { - async fn collect(&mut self, segments: LinkedList) -> Result<(), Box> { - self.segments.try_lock().unwrap().extend(segments); - Ok(()) +impl Report for MockReporter { + fn report(&self, item: CollectItem) { + let segment = match item { + CollectItem::Trace(segment) => segment, + _ => unreachable!(), + }; + self.segments.try_lock().unwrap().push_back(segment); } } From c622644b3e38a9e67ab208e10d2475e511ff341d Mon Sep 17 00:00:00 2001 From: jmjoy Date: Tue, 9 Aug 2022 18:05:40 +0800 Subject: [PATCH 2/6] Finish logging api. --- Cargo.toml | 4 + e2e/rust-toolchain.toml | 20 ---- examples/simple_log_report.rs | 45 +++++++++ examples/simple_trace_report.rs | 6 ++ src/logging/logger.rs | 10 +- src/logging/record.rs | 12 ++- src/reporter/grpc.rs | 36 ++++--- src/reporter/mod.rs | 13 +++ tests/logging.rs | 169 ++++++++++++++++++++++++++++++++ tests/propagation.rs | 1 - 10 files changed, 277 insertions(+), 39 deletions(-) delete mode 100644 e2e/rust-toolchain.toml create mode 100644 examples/simple_log_report.rs create mode 100644 tests/logging.rs diff --git a/Cargo.toml b/Cargo.toml index f46438e..498478e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,6 +62,10 @@ tokio-stream = { version = "0.1.8", features = ["net"] } name = "trace_context" required-features = ["mock"] +[[test]] +name = "logging" +required-features = ["mock"] + [[example]] name = "simple_trace_report" path = "examples/simple_trace_report.rs" diff --git a/e2e/rust-toolchain.toml b/e2e/rust-toolchain.toml deleted file mode 100644 index 09cf6d7..0000000 --- a/e2e/rust-toolchain.toml +++ /dev/null @@ -1,20 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -[toolchain] -channel = "1.57.0" -components = ["rustfmt", "clippy"] diff --git a/examples/simple_log_report.rs b/examples/simple_log_report.rs new file mode 100644 index 0000000..476e70f --- /dev/null +++ b/examples/simple_log_report.rs @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +use skywalking::{ + logging::{logger::Logger, record::LogRecord}, + reporter::grpc::GrpcReporter, +}; +use std::{error::Error, future, sync::Arc}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Connect to skywalking oap server. + let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?; + let reporter = Arc::new(reporter); + + // Do logging. + let logger = Logger::new("service", "instance", reporter.clone()); + logger.log(LogRecord::new().content("something to log")); + + // Start reporting and quit immediately when have completed the existing + // collection. + reporter + .reporting() + .await + .with_graceful_shutdown(future::ready(())) + .start() + .await?; + + Ok(()) +} diff --git a/examples/simple_trace_report.rs b/examples/simple_trace_report.rs index 26f0400..68f6e7d 100644 --- a/examples/simple_trace_report.rs +++ b/examples/simple_trace_report.rs @@ -48,7 +48,11 @@ async fn handle_request(tracer: Tracer) { #[tokio::main] async fn main() -> Result<(), Box> { + // Connect to skywalking oap server. let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?; + + // Spawn the reporting in background, with listening the graceful shutdown + // signal. let handle = reporter .reporting() .await @@ -57,9 +61,11 @@ async fn main() -> Result<(), Box> { }) .spawn(); + // Do tracing. let tracer = Tracer::new("service", "instance", reporter); handle_request(tracer).await; + // Wait the reporting to quit. handle.await?; Ok(()) diff --git a/src/logging/logger.rs b/src/logging/logger.rs index e0cad06..cb6029b 100644 --- a/src/logging/logger.rs +++ b/src/logging/logger.rs @@ -15,7 +15,7 @@ // use super::record::LogRecord; -use crate::reporter::{DynReport, Report}; +use crate::reporter::{CollectItem, DynReport, Report}; use std::sync::Arc; use tokio::sync::OnceCell; @@ -68,5 +68,11 @@ impl Logger { &self.inner.instance_name } - pub fn log(&self, record: LogRecord) {} + pub fn log(&self, record: LogRecord) { + let data = record.convert_to_log_data( + self.service_name().to_owned(), + self.instance_name().to_owned(), + ); + self.inner.reporter.report(CollectItem::Log(data)); + } } diff --git a/src/logging/record.rs b/src/logging/record.rs index 248f568..7c3cf6d 100644 --- a/src/logging/record.rs +++ b/src/logging/record.rs @@ -27,8 +27,6 @@ use std::{ time::{SystemTime, UNIX_EPOCH}, }; -use super::logger::Logger; - pub enum RecordType { Text, Json, @@ -114,7 +112,11 @@ impl LogRecord { self } - pub(crate) fn convert_to_log_data(self, logger: &Logger) -> LogData { + pub(crate) fn convert_to_log_data( + self, + service_name: String, + instance_name: String, + ) -> LogData { let timestamp = if self.is_ignore_time { 0 } else { @@ -147,8 +149,8 @@ impl LogRecord { LogData { timestamp, - service: logger.service_name().to_owned(), - service_instance: logger.instance_name().to_owned(), + service: service_name, + service_instance: instance_name, endpoint: self.endpoint, body: Some(LogDataBody { r#type: "".to_owned(), diff --git a/src/reporter/grpc.rs b/src/reporter/grpc.rs index 008059a..c219277 100644 --- a/src/reporter/grpc.rs +++ b/src/reporter/grpc.rs @@ -108,9 +108,12 @@ struct Inner { is_closed: AtomicBool, } +pub type DynErrHandle = dyn Fn(Box) + Send + Sync + 'static; + #[derive(Clone)] pub struct GrpcReporter { inner: Arc>, + err_handle: Arc>>, } impl GrpcReporter, mpsc::UnboundedReceiver> { @@ -139,9 +142,18 @@ impl GrpcReporter { is_reporting: Default::default(), is_closed: Default::default(), }), + err_handle: Default::default(), } } + pub fn with_err_handle( + mut self, + handle: impl Fn(Box) + Send + Sync + 'static, + ) -> Self { + self.err_handle = Arc::new(Some(Box::new(handle))); + self + } + /// Start to reporting, quit when shutdown_signal received. /// /// Accept a `shutdown_signal` argument as a graceful shutdown signal. @@ -156,9 +168,7 @@ impl GrpcReporter { Reporting { rb: ReporterAndBuffer { - reporter: GrpcReporter { - inner: Arc::clone(&self.inner), - }, + inner: Arc::clone(&self.inner), trace_buffer: Default::default(), log_buffer: Default::default(), status_handle: None, @@ -172,13 +182,17 @@ impl GrpcReporter { impl Report for GrpcReporter { fn report(&self, item: CollectItem) { if !self.inner.is_closed.load(Ordering::Relaxed) { - let _ = self.inner.producer.produce(item); + if let Err(e) = self.inner.producer.produce(item) { + if let Some(handle) = self.err_handle.as_deref() { + handle(e); + } + } } } } struct ReporterAndBuffer { - reporter: GrpcReporter, + inner: Arc>, trace_buffer: LinkedList, log_buffer: LinkedList, status_handle: Option>, @@ -199,7 +213,6 @@ impl ReporterAndBuffer { if !self.trace_buffer.is_empty() { let buffer = take(&mut self.trace_buffer); if let Err(e) = self - .reporter .inner .trace_client .lock() @@ -215,7 +228,6 @@ impl ReporterAndBuffer { if !self.log_buffer.is_empty() { let buffer = take(&mut self.log_buffer); if let Err(e) = self - .reporter .inner .log_client .lock() @@ -231,7 +243,7 @@ impl ReporterAndBuffer { } } -/// Created by [Tracer::reporting]. +/// Created by [GrpcReporter::reporting]. pub struct Reporting { rb: ReporterAndBuffer, consumer: C, @@ -258,7 +270,7 @@ impl Reporting { } } - async fn start(self) -> crate::Result<()> { + pub async fn start(self) -> crate::Result<()> { let (shutdown_tx, mut shutdown_rx) = mpsc::unbounded_channel(); let Reporting { mut rb, @@ -282,7 +294,7 @@ impl Reporting { } } - rb.reporter.inner.is_closed.store(true, Ordering::Relaxed); + rb.inner.is_closed.store(true, Ordering::Relaxed); // Flush. loop { @@ -300,7 +312,9 @@ impl Reporting { let shutdown_fut = async move { shutdown_signal.await; - let _ = shutdown_tx.send(()); + shutdown_tx + .send(()) + .map_err(|e| crate::Error::Other(Box::new(e)))?; Ok(()) }; diff --git a/src/reporter/mod.rs b/src/reporter/mod.rs index 653b16a..534647d 100644 --- a/src/reporter/mod.rs +++ b/src/reporter/mod.rs @@ -19,6 +19,7 @@ pub mod once_cell; pub mod print; use crate::skywalking_proto::v3::{LogData, SegmentObject}; +use std::{ops::Deref, sync::Arc}; #[derive(Debug)] #[non_exhaustive] @@ -38,3 +39,15 @@ pub trait Report { impl Report for () { fn report(&self, _item: CollectItem) {} } + +impl Report for Box { + fn report(&self, item: CollectItem) { + Report::report(self.deref(), item) + } +} + +impl Report for Arc { + fn report(&self, item: CollectItem) { + Report::report(self.deref(), item) + } +} diff --git a/tests/logging.rs b/tests/logging.rs new file mode 100644 index 0000000..e909bf6 --- /dev/null +++ b/tests/logging.rs @@ -0,0 +1,169 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +use skywalking::{ + logging::{ + logger::Logger, + record::{LogRecord, RecordType}, + }, + reporter::{CollectItem, Report}, + skywalking_proto::v3::{ + log_data_body::Content, JsonLog, KeyStringValuePair, LogData, LogDataBody, LogTags, + TextLog, TraceContext, + }, + trace::tracer::Tracer, +}; +use std::{ + collections::LinkedList, + sync::{Arc, Mutex}, +}; + +#[test] +fn log() { + let reporter = Arc::new(MockReporter::default()); + let logger = Logger::new("service_name", "instance_name", reporter.clone()); + + { + logger.log(LogRecord::new()); + assert_eq!( + reporter.pop(), + LogData { + timestamp: 10, + service: "service_name".to_owned(), + service_instance: "instance_name".to_owned(), + body: Some(LogDataBody { + r#type: "".to_owned(), + content: Some(Content::Text(TextLog { + text: "".to_owned() + })) + }), + ..Default::default() + } + ); + } + + { + logger.log(LogRecord::new().ignore_time()); + assert_eq!( + reporter.pop(), + LogData { + timestamp: 0, + service: "service_name".to_owned(), + service_instance: "instance_name".to_owned(), + body: Some(LogDataBody { + r#type: "".to_owned(), + content: Some(Content::Text(TextLog { + text: "".to_owned() + })) + }), + ..Default::default() + } + ); + } + + { + logger.log( + LogRecord::new() + .endpoint("endpoint") + .add_tag("foo", "foo") + .add_tags([("bar", "bar")]) + .record_type(RecordType::Json) + .content(r#"{"content": "something to log"}"#), + ); + assert_eq!( + reporter.pop(), + LogData { + timestamp: 10, + service: "service_name".to_owned(), + service_instance: "instance_name".to_owned(), + endpoint: "endpoint".to_owned(), + tags: Some(LogTags { + data: vec![ + KeyStringValuePair { + key: "foo".to_owned(), + value: "foo".to_owned() + }, + KeyStringValuePair { + key: "bar".to_owned(), + value: "bar".to_owned() + }, + ], + }), + body: Some(LogDataBody { + r#type: "".to_owned(), + content: Some(Content::Json(JsonLog { + json: r#"{"content": "something to log"}"#.to_owned() + })) + }), + ..Default::default() + } + ); + } +} + +#[test] +fn integrate_trace() { + let reporter = Arc::new(MockReporter::default()); + let tracer = Tracer::new("service_name", "instance_name", reporter.clone()); + let logger = Logger::new("service_name", "instance_name", reporter.clone()); + + let mut ctx = tracer.create_trace_context(); + let span = ctx.create_entry_span("operation_name"); + + logger.log(LogRecord::new().with_tracing_context(&ctx).with_span(&span)); + assert_eq!( + reporter.pop(), + LogData { + timestamp: 10, + service: "service_name".to_owned(), + service_instance: "instance_name".to_owned(), + trace_context: Some(TraceContext { + trace_id: ctx.trace_id().to_owned(), + trace_segment_id: ctx.trace_segment_id().to_owned(), + span_id: span.span_id() + }), + body: Some(LogDataBody { + r#type: "".to_owned(), + content: Some(Content::Text(TextLog { + text: "".to_owned() + })) + }), + ..Default::default() + } + ); +} + +#[derive(Default, Clone)] +struct MockReporter { + items: Arc>>, +} + +impl MockReporter { + fn pop(&self) -> LogData { + self.items.try_lock().unwrap().pop_back().unwrap() + } +} + +impl Report for MockReporter { + fn report(&self, item: CollectItem) { + match item { + CollectItem::Log(data) => { + self.items.try_lock().unwrap().push_back(data); + } + _ => {} + } + } +} diff --git a/tests/propagation.rs b/tests/propagation.rs index 3353e95..99f6e20 100644 --- a/tests/propagation.rs +++ b/tests/propagation.rs @@ -14,7 +14,6 @@ // limitations under the License. // -#![allow(unused_imports)] use skywalking::{ reporter::print::PrintReporter, trace::{ From fe82a5fcb1b04ed0dccfdd91993567aec01c62e8 Mon Sep 17 00:00:00 2001 From: jmjoy Date: Thu, 11 Aug 2022 19:06:44 +0800 Subject: [PATCH 3/6] Add e2e for logging. --- docker-compose.e2e.yml | 2 +- e2e/data/expected_context.yaml | 49 ++++++++++++++++++++++++++++++++-- e2e/src/main.rs | 43 ++++++++++++++++++++++++----- src/logging/logger.rs | 5 ++++ src/reporter/grpc.rs | 10 ++++++- tests/propagation.rs | 6 +---- 6 files changed, 99 insertions(+), 16 deletions(-) diff --git a/docker-compose.e2e.yml b/docker-compose.e2e.yml index a1f15fb..3462189 100644 --- a/docker-compose.e2e.yml +++ b/docker-compose.e2e.yml @@ -18,7 +18,7 @@ version: "3.9" services: collector: - image: ghcr.io/apache/skywalking-agent-test-tool/mock-collector:5acb890f225ca37ee60675ce3e330545e23e3cbc + image: ghcr.io/apache/skywalking-agent-test-tool/mock-collector:f4f5ef22b1df623464772816bb6b42ba611444ff ports: - "19876:19876" - "12800:12800" diff --git a/e2e/data/expected_context.yaml b/e2e/data/expected_context.yaml index f4ff6e1..9778a2a 100644 --- a/e2e/data/expected_context.yaml +++ b/e2e/data/expected_context.yaml @@ -16,7 +16,7 @@ # under the License. # segmentItems: -- segmentSize: gt 1 +- segmentSize: ge 2 segments: - segmentId: not null spans: @@ -76,7 +76,7 @@ segmentItems: spanType: Entry startTime: gt 0 serviceName: producer -- segmentSize: 1 +- segmentSize: ge 1 segments: - segmentId: not null spans: @@ -101,3 +101,48 @@ segmentItems: spanType: Entry startTime: gt 0 serviceName: consumer + +logItems: +- logSize: ge 2 + logs: + - body: + content: + json: '{"message": "handle ping"}' + type: '' + endpoint: /ping + layer: '' + tags: + data: + - key: level + value: DEBUG + timestamp: gt 0 + - body: + content: + text: do http request + type: '' + endpoint: /ping + layer: '' + tags: + data: + - key: level + value: INFO + timestamp: gt 0 + traceContext: + spanId: 1 + traceId: not null + traceSegmentId: not null + serviceName: producer +- logSize: ge 1 + logs: + - body: + content: + json: '{"message": "handle pong"}' + type: '' + endpoint: /pong + layer: '' + tags: + data: + - key: level + value: DEBUG + timestamp: gt 0 + serviceName: consumer diff --git a/e2e/src/main.rs b/e2e/src/main.rs index b6e61c6..f3ffd57 100644 --- a/e2e/src/main.rs +++ b/e2e/src/main.rs @@ -21,6 +21,10 @@ use hyper::{ Body, Client, Method, Request, Response, Server, StatusCode, }; use skywalking::{ + logging::{ + logger::{self, Logger}, + record::{LogRecord, RecordType}, + }, reporter::grpc::GrpcReporter, trace::{ propagation::{ @@ -40,10 +44,18 @@ async fn handle_ping( _req: Request, client: Client, ) -> Result, Infallible> { + logger::log( + LogRecord::new() + .add_tag("level", "DEBUG") + .endpoint("/ping") + .record_type(RecordType::Json) + .content(r#"{"message": "handle ping"}"#), + ); + let mut context = tracer::create_trace_context(); let _span = context.create_entry_span("/ping"); { - let _span2 = context.create_exit_span("/pong", "consumer:8082"); + let span2 = context.create_exit_span("/pong", "consumer:8082"); let header = encode_propagation(&context, "/pong", "consumer:8082"); let req = Request::builder() .method(Method::GET) @@ -52,6 +64,15 @@ async fn handle_ping( .body(Body::from("")) .unwrap(); + logger::log( + LogRecord::new() + .add_tag("level", "INFO") + .endpoint("/ping") + .with_tracing_context(&context) + .with_span(&span2) + .record_type(RecordType::Text) + .content("do http request"), + ); client.request(req).await.unwrap(); } { @@ -106,6 +127,14 @@ async fn run_producer_service(host: [u8; 4]) { } async fn handle_pong(_req: Request) -> Result, Infallible> { + logger::log( + LogRecord::new() + .add_tag("level", "DEBUG") + .endpoint("/pong") + .record_type(RecordType::Json) + .content(r#"{"message": "handle pong"}"#), + ); + let ctx = decode_propagation( _req.headers()[SKYWALKING_HTTP_CONTEXT_HEADER_KEY] .to_str() @@ -156,17 +185,17 @@ async fn main() -> Result<(), Box> { let reporter = GrpcReporter::connect("http://collector:19876").await?; let handle = reporter.reporting().await.spawn(); - let handle = if opt.mode == "consumer" { - tracer::set_global_tracer(Tracer::new("consumer", "node_0", reporter)); + if opt.mode == "consumer" { + tracer::set_global_tracer(Tracer::new("consumer", "node_0", reporter.clone())); + logger::set_global_logger(Logger::new("consumer", "node_0", reporter)); run_consumer_service([0, 0, 0, 0]).await; - handle } else if opt.mode == "producer" { - tracer::set_global_tracer(Tracer::new("producer", "node_0", reporter)); + tracer::set_global_tracer(Tracer::new("producer", "node_0", reporter.clone())); + logger::set_global_logger(Logger::new("producer", "node_0", reporter)); run_producer_service([0, 0, 0, 0]).await; - handle } else { unreachable!() - }; + } handle.await?; diff --git a/src/logging/logger.rs b/src/logging/logger.rs index cb6029b..2571ea7 100644 --- a/src/logging/logger.rs +++ b/src/logging/logger.rs @@ -33,6 +33,11 @@ pub fn global_logger() -> &'static Logger { GLOBAL_LOGGER.get().expect("global logger haven't setted") } +/// Log by global logger. +pub fn log(record: LogRecord) { + global_logger().log(record); +} + pub struct Inner { service_name: String, instance_name: String, diff --git a/src/reporter/grpc.rs b/src/reporter/grpc.rs index c219277..eec3f73 100644 --- a/src/reporter/grpc.rs +++ b/src/reporter/grpc.rs @@ -110,7 +110,6 @@ struct Inner { pub type DynErrHandle = dyn Fn(Box) + Send + Sync + 'static; -#[derive(Clone)] pub struct GrpcReporter { inner: Arc>, err_handle: Arc>>, @@ -179,6 +178,15 @@ impl GrpcReporter { } } +impl Clone for GrpcReporter { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + err_handle: self.err_handle.clone(), + } + } +} + impl Report for GrpcReporter { fn report(&self, item: CollectItem) { if !self.inner.is_closed.load(Ordering::Relaxed) { diff --git a/tests/propagation.rs b/tests/propagation.rs index 99f6e20..684b60e 100644 --- a/tests/propagation.rs +++ b/tests/propagation.rs @@ -17,14 +17,10 @@ use skywalking::{ reporter::print::PrintReporter, trace::{ - propagation::{ - context::PropagationContext, decoder::decode_propagation, encoder::encode_propagation, - }, - trace_context::TracingContext, + propagation::{decoder::decode_propagation, encoder::encode_propagation}, tracer::Tracer, }, }; -use std::sync::Arc; #[test] fn basic() { From bec9b665ef6fca9e0f6ec4648b6f68ff7fb5a518 Mon Sep 17 00:00:00 2001 From: jmjoy Date: Thu, 11 Aug 2022 20:07:54 +0800 Subject: [PATCH 4/6] Fix clippy. --- src/reporter/grpc.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/reporter/grpc.rs b/src/reporter/grpc.rs index eec3f73..eb92517 100644 --- a/src/reporter/grpc.rs +++ b/src/reporter/grpc.rs @@ -295,7 +295,7 @@ impl Reporting { rb.report(item).await; } Ok(None) => break, - Err(err) => Err(crate::Error::Other(err))?, + Err(err) => return Err(crate::Error::Other(err)), } } _ = shutdown_rx.recv() => break, From 8eb4a881f7424242dcd5abe19b4d6055e435add6 Mon Sep 17 00:00:00 2001 From: jmjoy Date: Thu, 11 Aug 2022 20:24:11 +0800 Subject: [PATCH 5/6] Fix tests. --- src/logging/record.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/logging/record.rs b/src/logging/record.rs index 7c3cf6d..87f22cf 100644 --- a/src/logging/record.rs +++ b/src/logging/record.rs @@ -22,10 +22,7 @@ use crate::{ }, trace::{span::Span, trace_context::TracingContext}, }; -use std::{ - collections::HashMap, - time::{SystemTime, UNIX_EPOCH}, -}; +use std::time::{SystemTime, UNIX_EPOCH}; pub enum RecordType { Text, @@ -44,7 +41,7 @@ pub struct LogRecord { time: Option, is_ignore_time: bool, endpoint: String, - tags: HashMap, + tags: Vec<(String, String)>, trace_id: Option, trace_segment_id: Option, span_id: Option, @@ -74,7 +71,7 @@ impl LogRecord { } pub fn add_tag(mut self, key: impl ToString, value: impl ToString) -> Self { - self.tags.insert(key.to_string(), value.to_string()); + self.tags.push((key.to_string(), value.to_string())); self } From 7edfd1ff7667ba761223549e73bc73234b8049cb Mon Sep 17 00:00:00 2001 From: jmjoy Date: Fri, 12 Aug 2022 09:45:45 +0800 Subject: [PATCH 6/6] Modify readme. --- README.md | 41 +++++++++++++++++++++++++++++------ examples/simple_log_report.rs | 3 +-- 2 files changed, 35 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 64d9c72..169b391 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,9 @@ and core concepts to keep best compatibility and performance. All concepts are from the official SkyWalking definitions. -## Span +## Tracing + +### Span Span is an important and common concept in distributed tracing system. Learn Span from Google Dapper Paper. For better performance, we extend the span into 3 kinds. @@ -34,19 +36,29 @@ Tag and Log are similar attributes of the span. - Log is heavier than tag, with one timestamp and multiple key:value pairs. Log represents an event, typically an error happens. -## TracingContext +### TracingContext TracingContext is the context of the tracing process. Span should only be created through context, and be archived into the context after the span finished. +## Logging + +### LogRecord + +LogRecord is the simple builder for the LogData, which is the Log format of Skywalking. + # Example ```rust, no_run -use skywalking::{reporter::grpc::GrpcReporter, trace::tracer::Tracer}; +use skywalking::{ + logging::{logger::Logger, record::{LogRecord, RecordType}}, + reporter::grpc::GrpcReporter, + trace::tracer::Tracer, +}; use std::error::Error; use tokio::signal; -async fn handle_request(tracer: Tracer) { +async fn handle_request(tracer: Tracer, logger: Logger) { let mut ctx = tracer.create_trace_context(); { @@ -59,10 +71,20 @@ async fn handle_request(tracer: Tracer) { { // Generates an Exit Span when executing an RPC. - let _span2 = ctx.create_exit_span("op2", "remote_peer"); + let span2 = ctx.create_exit_span("op2", "remote_peer"); // Something... + // Do logging. + logger.log( + LogRecord::new() + .add_tag("level", "INFO") + .with_tracing_context(&ctx) + .with_span(&span2) + .record_type(RecordType::Text) + .content("Something...") + ); + // Auto close span2 when dropped. } @@ -74,7 +96,10 @@ async fn handle_request(tracer: Tracer) { #[tokio::main] async fn main() -> Result<(), Box> { + // Connect to skywalking oap server. let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?; + + // Spawn the reporting in background, with listening the graceful shutdown signal. let handle = reporter .reporting() .await @@ -83,8 +108,10 @@ async fn main() -> Result<(), Box> { }) .spawn(); - let tracer = Tracer::new("service", "instance", reporter); - handle_request(tracer).await; + let tracer = Tracer::new("service", "instance", reporter.clone()); + let logger = Logger::new("service", "instance", reporter); + + handle_request(tracer, logger).await; handle.await?; diff --git a/examples/simple_log_report.rs b/examples/simple_log_report.rs index 476e70f..0466e9a 100644 --- a/examples/simple_log_report.rs +++ b/examples/simple_log_report.rs @@ -20,13 +20,12 @@ use skywalking::{ logging::{logger::Logger, record::LogRecord}, reporter::grpc::GrpcReporter, }; -use std::{error::Error, future, sync::Arc}; +use std::{error::Error, future}; #[tokio::main] async fn main() -> Result<(), Box> { // Connect to skywalking oap server. let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?; - let reporter = Arc::new(reporter); // Do logging. let logger = Logger::new("service", "instance", reporter.clone());