diff --git a/opentelemetry-sdk/src/runtime.rs b/opentelemetry-sdk/src/runtime.rs index 7705c10e91..28889bb5bd 100644 --- a/opentelemetry-sdk/src/runtime.rs +++ b/opentelemetry-sdk/src/runtime.rs @@ -7,8 +7,21 @@ //! [async-std]: https://crates.io/crates/async-std use futures_util::{future::BoxFuture, stream::Stream}; -use std::{fmt::Debug, future::Future, time::Duration}; +use std::{fmt::Debug, future::Future}; use thiserror::Error; +use std::{ + sync::{ Arc, Mutex}, + thread, + time::Duration, +}; +use std::pin::Pin; + +use futures_util::task::{Context, Poll}; +//use std::{future::Future, time::Duration}; +use futures_executor; +use crossbeam_channel::{self, Sender as CrossbeamSender, Receiver as CrossbeamReceiver}; +//use std::task::{Context, Poll}; + /// A runtime is an abstraction of an async runtime like [Tokio] or [async-std]. It allows /// OpenTelemetry to work with any current and hopefully future runtime implementation. @@ -43,6 +56,7 @@ pub trait Runtime: Clone + Send + Sync + 'static { fn delay(&self, duration: Duration) -> Self::Delay; } + /// Runtime implementation, which works with Tokio's multi thread runtime. #[cfg(feature = "rt-tokio")] #[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))] @@ -140,12 +154,12 @@ impl Runtime for AsyncStd { /// [span]: crate::trace::BatchSpanProcessor pub trait RuntimeChannel: Runtime { /// A future stream to receive batch messages from channels. - type Receiver: Stream + Send; + type Receiver: Stream + Send; /// A batch messages sender that can be sent across threads safely. - type Sender: TrySend + Debug; + type Sender: TrySend + Debug; /// Return the sender and receiver used to send batch messages. - fn batch_message_channel( + fn batch_message_channel( &self, capacity: usize, ) -> (Self::Sender, Self::Receiver); @@ -249,3 +263,146 @@ impl RuntimeChannel for AsyncStd { async_std::channel::bounded(capacity) } } + +/// stdthreadtuntime +#[derive(Debug, Clone)] +pub struct StdThreadRuntime { + shutdown_signal: Arc>, +} + +impl StdThreadRuntime { + /// new + pub fn new() -> Self { + StdThreadRuntime { + shutdown_signal: Arc::new(Mutex::new(false)), + } + } +} + +impl Runtime for StdThreadRuntime { + type Interval = StdInterval; + type Delay = StdDelay; + + fn interval(&self, duration: Duration) -> Self::Interval { + StdInterval::new(duration, self.shutdown_signal.clone()) + } + + fn spawn(&self, future: BoxFuture<'static, ()>) { + thread::spawn(move || { + futures_executor::block_on(future); + }); + } + + fn delay(&self, duration: Duration) -> Self::Delay { + StdDelay::new(duration) + } +} + +/// stdinterval +#[derive(Debug)] +pub struct StdInterval { + duration: Duration, + shutdown_signal: Arc>, +} + +impl StdInterval { + /// new + pub fn new(duration: Duration, shutdown_signal: Arc>) -> Self { + StdInterval { duration, shutdown_signal } + } +} + +impl Stream for StdInterval { + type Item = (); + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if *self.shutdown_signal.lock().unwrap() { + Poll::Ready(None) + } else { + thread::sleep(self.duration); + cx.waker().wake_by_ref(); + Poll::Ready(Some(())) + } + } +} + +/// stddelay +#[derive(Debug)] +pub struct StdDelay { + duration: Duration, + elapsed: bool, +} + +impl StdDelay { + /// new + pub fn new(duration: Duration) -> Self { + StdDelay { duration, elapsed: false } + } +} + +impl Future for StdDelay { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if self.elapsed { + Poll::Ready(()) + } else { + self.elapsed = true; + cx.waker().wake_by_ref(); + thread::sleep(self.duration); + Poll::Pending + } + } +} + + +impl RuntimeChannel for StdThreadRuntime { + type Receiver = CrossbeamReceiverStream; + type Sender = Arc>; + + fn batch_message_channel( + &self, + capacity: usize, + ) -> (Self::Sender, Self::Receiver) { + let (sender, receiver) = crossbeam_channel::bounded(capacity); + (Arc::new(sender), CrossbeamReceiverStream::new(receiver)) + } +} + +/// crossbeamreceiverstream +#[derive(Debug)] +pub struct CrossbeamReceiverStream { + receiver: CrossbeamReceiver, +} + +impl CrossbeamReceiverStream { + /// new + pub fn new(receiver: CrossbeamReceiver) -> Self { + CrossbeamReceiverStream { receiver } + } +} + +impl Stream for CrossbeamReceiverStream { + type Item = T; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.receiver.try_recv() { + Ok(item) => Poll::Ready(Some(item)), + Err(crossbeam_channel::TryRecvError::Empty) => { + cx.waker().wake_by_ref(); + Poll::Pending + } + Err(crossbeam_channel::TryRecvError::Disconnected) => Poll::Ready(None), + } + } +} + +impl TrySend for Arc> { + type Message = T; + + fn try_send(&self, item: Self::Message) -> Result<(), TrySendError> { + self.send(item).map_err(|_| TrySendError::ChannelClosed) + } +} + +