diff --git a/Cargo.toml b/Cargo.toml index 28a0dea98d..4ae28e5821 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,23 +23,25 @@ criterion = "0.5" futures-core = "0.3" futures-executor = "0.3" futures-util = { version = "0.3", default-features = false } -hyper = { version = "0.14", default-features = false } -http = { version = "0.2", default-features = false } +http = { version = "1.1", default-features = false, features = ["std"] } +http-body-util = "0.1" +hyper = { version = "1.3", default-features = false } +hyper-util = "0.1" log = "0.4.21" once_cell = "1.13" ordered-float = "4.0" pin-project-lite = "0.2" -prost = "0.12" -prost-build = "0.12" -prost-types = "0.12" +prost = "0.13" +prost-build = "0.13" +prost-types = "0.13" rand = { version = "0.8", default-features = false } -reqwest = { version = "0.11", default-features = false } +reqwest = { version = "0.12", default-features = false } serde = { version = "1.0", default-features = false } serde_json = "1.0" temp-env = "0.3.6" thiserror = { version = "1", default-features = false } -tonic = { version = "0.11", default-features = false } -tonic-build = "0.11" +tonic = { version = "0.12", default-features = false } +tonic-build = "0.12" tokio = { version = "1", default-features = false } tokio-stream = "0.1.1" tracing = { version = "0.1", default-features = false } diff --git a/examples/tracing-http-propagator/Cargo.toml b/examples/tracing-http-propagator/Cargo.toml index 0c019e14e9..7d13e666ed 100644 --- a/examples/tracing-http-propagator/Cargo.toml +++ b/examples/tracing-http-propagator/Cargo.toml @@ -16,7 +16,9 @@ path = "src/client.rs" doc = false [dependencies] +http-body-util = { workspace = true } hyper = { workspace = true, features = ["full"] } +hyper-util = { workspace = true, features = ["full"] } tokio = { workspace = true, features = ["full"] } opentelemetry = { path = "../../opentelemetry" } opentelemetry_sdk = { path = "../../opentelemetry-sdk" } diff --git a/examples/tracing-http-propagator/src/client.rs b/examples/tracing-http-propagator/src/client.rs index 35e2530b4a..e0936fd46b 100644 --- a/examples/tracing-http-propagator/src/client.rs +++ b/examples/tracing-http-propagator/src/client.rs @@ -1,10 +1,11 @@ -use hyper::{body::Body, Client}; +use http_body_util::Full; +use hyper_util::{client::legacy::Client, rt::TokioExecutor}; use opentelemetry::{ global, trace::{SpanKind, TraceContextExt, Tracer}, Context, KeyValue, }; -use opentelemetry_http::HeaderInjector; +use opentelemetry_http::{Bytes, HeaderInjector}; use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::TracerProvider}; use opentelemetry_stdout::SpanExporter; @@ -24,7 +25,7 @@ async fn send_request( body_content: &str, span_name: &str, ) -> std::result::Result<(), Box> { - let client = Client::new(); + let client = Client::builder(TokioExecutor::new()).build_http(); let tracer = global::tracer("example/client"); let span = tracer .span_builder(String::from(span_name)) @@ -37,7 +38,7 @@ async fn send_request( propagator.inject_context(&cx, &mut HeaderInjector(req.headers_mut().unwrap())) }); let res = client - .request(req.body(Body::from(String::from(body_content)))?) + .request(req.body(Full::new(Bytes::from(body_content.to_string())))?) .await?; cx.span().add_event( diff --git a/examples/tracing-http-propagator/src/server.rs b/examples/tracing-http-propagator/src/server.rs index 1ad924c766..bbfa5556cb 100644 --- a/examples/tracing-http-propagator/src/server.rs +++ b/examples/tracing-http-propagator/src/server.rs @@ -1,50 +1,64 @@ -use hyper::{ - service::{make_service_fn, service_fn}, - Body, Request, Response, Server, StatusCode, -}; +use http_body_util::{combinators::BoxBody, BodyExt, Full}; +use hyper::{body::Incoming, service::service_fn, Request, Response, StatusCode}; +use hyper_util::rt::{TokioExecutor, TokioIo}; use opentelemetry::{ global, trace::{FutureExt, Span, SpanKind, TraceContextExt, Tracer}, Context, KeyValue, }; -use opentelemetry_http::HeaderExtractor; +use opentelemetry_http::{Bytes, HeaderExtractor}; use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::TracerProvider}; use opentelemetry_semantic_conventions::trace; use opentelemetry_stdout::SpanExporter; use std::{convert::Infallible, net::SocketAddr}; +use tokio::net::TcpListener; // Utility function to extract the context from the incoming request headers -fn extract_context_from_request(req: &Request) -> Context { +fn extract_context_from_request(req: &Request) -> Context { global::get_text_map_propagator(|propagator| { propagator.extract(&HeaderExtractor(req.headers())) }) } // Separate async function for the handle endpoint -async fn handle_health_check(_req: Request) -> Result, Infallible> { +async fn handle_health_check( + _req: Request, +) -> Result>, Infallible> { let tracer = global::tracer("example/server"); let mut span = tracer .span_builder("health_check") .with_kind(SpanKind::Internal) .start(&tracer); span.add_event("Health check accessed", vec![]); - let res = Response::new(Body::from("Server is up and running!")); + + let res = Response::new( + Full::new(Bytes::from_static(b"Server is up and running!")) + .map_err(|err| match err {}) + .boxed(), + ); + Ok(res) } // Separate async function for the echo endpoint -async fn handle_echo(req: Request) -> Result, Infallible> { +async fn handle_echo( + req: Request, +) -> Result>, Infallible> { let tracer = global::tracer("example/server"); let mut span = tracer .span_builder("echo") .with_kind(SpanKind::Internal) .start(&tracer); span.add_event("Echoing back the request", vec![]); - let res = Response::new(req.into_body()); + + let res = Response::new(req.into_body().boxed()); + Ok(res) } -async fn router(req: Request) -> Result, Infallible> { +async fn router( + req: Request, +) -> Result>, Infallible> { // Extract the context from the incoming request headers let parent_cx = extract_context_from_request(&req); let response = { @@ -64,12 +78,13 @@ async fn router(req: Request) -> Result, Infallible> { _ => { cx.span() .set_attribute(KeyValue::new(trace::HTTP_RESPONSE_STATUS_CODE, 404)); - let mut not_found = Response::default(); + let mut not_found = Response::new(BoxBody::default()); *not_found.status_mut() = StatusCode::NOT_FOUND; Ok(not_found) } } }; + response } @@ -87,15 +102,18 @@ fn init_tracer() { #[tokio::main] async fn main() { + use hyper_util::server::conn::auto::Builder; + init_tracer(); let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); + let listener = TcpListener::bind(addr).await.unwrap(); - let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(router)) }); - - let server = Server::bind(&addr).serve(make_svc); - - println!("Listening on {addr}"); - if let Err(e) = server.await { - eprintln!("server error: {e}"); + while let Ok((stream, _addr)) = listener.accept().await { + if let Err(err) = Builder::new(TokioExecutor::new()) + .serve_connection(TokioIo::new(stream), service_fn(router)) + .await + { + eprintln!("{err}"); + } } } diff --git a/opentelemetry-appender-tracing/Cargo.toml b/opentelemetry-appender-tracing/Cargo.toml index d104014662..0e6f0e249d 100644 --- a/opentelemetry-appender-tracing/Cargo.toml +++ b/opentelemetry-appender-tracing/Cargo.toml @@ -12,7 +12,6 @@ rust-version = "1.65" [dependencies] log = { workspace = true, optional = true } -once_cell = { workspace = true } opentelemetry = { version = "0.23", path = "../opentelemetry", features = ["logs"] } tracing = { workspace = true, features = ["std"]} tracing-core = { workspace = true } diff --git a/opentelemetry-http/CHANGELOG.md b/opentelemetry-http/CHANGELOG.md index 51880f3c31..3603af5a5b 100644 --- a/opentelemetry-http/CHANGELOG.md +++ b/opentelemetry-http/CHANGELOG.md @@ -4,6 +4,7 @@ - **Breaking** Correct the misspelling of "webkpi" to "webpki" in features [#1842](https://github.com/open-telemetry/opentelemetry-rust/pull/1842) - **Breaking** Remove support for the `isahc` HTTP client [#1924](https://github.com/open-telemetry/opentelemetry-rust/pull/1924) +- Update to `http` v1 [#1674](https://github.com/open-telemetry/opentelemetry-rust/pull/1674) ## v0.12.0 diff --git a/opentelemetry-http/Cargo.toml b/opentelemetry-http/Cargo.toml index f7472054df..3e16fb2d16 100644 --- a/opentelemetry-http/Cargo.toml +++ b/opentelemetry-http/Cargo.toml @@ -10,6 +10,7 @@ edition = "2021" rust-version = "1.65" [features] +hyper = ["dep:http-body-util", "dep:hyper", "dep:hyper-util", "dep:tokio"] reqwest-rustls = ["reqwest", "reqwest/rustls-tls-native-roots"] reqwest-rustls-webpki-roots = ["reqwest", "reqwest/rustls-tls-webpki-roots"] @@ -17,7 +18,9 @@ reqwest-rustls-webpki-roots = ["reqwest", "reqwest/rustls-tls-webpki-roots"] async-trait = { workspace = true } bytes = { workspace = true } http = { workspace = true } -hyper = { workspace = true, features = ["http2", "client", "tcp"], optional = true } +http-body-util = { workspace = true, optional = true } +hyper = { workspace = true, optional = true } +hyper-util = { workspace = true, features = ["client-legacy", "http2"], optional = true } opentelemetry = { version = "0.23", path = "../opentelemetry", features = ["trace"] } reqwest = { workspace = true, features = ["blocking"], optional = true } tokio = { workspace = true, features = ["time"], optional = true } diff --git a/opentelemetry-http/src/lib.rs b/opentelemetry-http/src/lib.rs index b921a41d9c..f3e5e4f70a 100644 --- a/opentelemetry-http/src/lib.rs +++ b/opentelemetry-http/src/lib.rs @@ -105,21 +105,24 @@ pub mod hyper { use super::{async_trait, Bytes, HttpClient, HttpError, Request, Response}; use http::HeaderValue; - use hyper::client::connect::Connect; - use hyper::Client; + use http_body_util::{BodyExt, Full}; + use hyper::body::{Body as HttpBody, Frame}; + use hyper_util::client::legacy::{connect::Connect, Client}; use std::fmt::Debug; + use std::pin::Pin; + use std::task::{self, Poll}; use std::time::Duration; use tokio::time; #[derive(Debug, Clone)] pub struct HyperClient { - inner: Client, + inner: Client, timeout: Duration, authorization: Option, } impl HyperClient { - pub fn new_with_timeout(inner: Client, timeout: Duration) -> Self { + pub fn new_with_timeout(inner: Client, timeout: Duration) -> Self { Self { inner, timeout, @@ -128,7 +131,7 @@ pub mod hyper { } pub fn new_with_timeout_and_authorization_header( - inner: Client, + inner: Client, timeout: Duration, authorization: HeaderValue, ) -> Self { @@ -147,7 +150,7 @@ pub mod hyper { { async fn send(&self, request: Request>) -> Result, HttpError> { let (parts, body) = request.into_parts(); - let mut request = Request::from_parts(parts, body.into()); + let mut request = Request::from_parts(parts, Body(Full::from(body))); if let Some(ref authorization) = self.authorization { request .headers_mut() @@ -155,14 +158,41 @@ pub mod hyper { } let mut response = time::timeout(self.timeout, self.inner.request(request)).await??; let headers = std::mem::take(response.headers_mut()); + let mut http_response = Response::builder() .status(response.status()) - .body(hyper::body::to_bytes(response.into_body()).await?)?; + .body(response.into_body().collect().await?.to_bytes())?; *http_response.headers_mut() = headers; Ok(http_response.error_for_status()?) } } + + pub struct Body(Full); + + impl HttpBody for Body { + type Data = Bytes; + type Error = Box; + + #[inline] + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> Poll, Self::Error>>> { + let inner_body = unsafe { self.map_unchecked_mut(|b| &mut b.0) }; + inner_body.poll_frame(cx).map_err(Into::into) + } + + #[inline] + fn is_end_stream(&self) -> bool { + self.0.is_end_stream() + } + + #[inline] + fn size_hint(&self) -> hyper::body::SizeHint { + self.0.size_hint() + } + } } /// Methods to make working with responses from the [`HttpClient`] trait easier. diff --git a/opentelemetry-otlp/CHANGELOG.md b/opentelemetry-otlp/CHANGELOG.md index 5295ae9ee0..f901e1314b 100644 --- a/opentelemetry-otlp/CHANGELOG.md +++ b/opentelemetry-otlp/CHANGELOG.md @@ -20,7 +20,7 @@ now use `.with_resource(RESOURCE::default())` to configure Resource when using previous release. - **Breaking** [1869](https://github.com/open-telemetry/opentelemetry-rust/pull/1869) The OTLP logs exporter now overrides the [InstrumentationScope::name](https://github.com/open-telemetry/opentelemetry-proto/blob/b3060d2104df364136d75a35779e6bd48bac449a/opentelemetry/proto/common/v1/common.proto#L73) field with the `target` from `LogRecord`, if target is populated. - Groups batch of `LogRecord` and `Span` by their resource and instrumentation scope before exporting, for better efficiency [#1873](https://github.com/open-telemetry/opentelemetry-rust/pull/1873). - +- **Breaking** Update to `http` v1 and `tonic` v0.12 [#1674](https://github.com/open-telemetry/opentelemetry-rust/pull/1674) ## v0.16.0 diff --git a/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml b/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml index 149a734304..ccbe22e960 100644 --- a/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml +++ b/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml @@ -8,7 +8,7 @@ publish = false [features] default = ["reqwest"] reqwest = ["opentelemetry-otlp/reqwest-client"] -hyper = ["dep:async-trait", "dep:http", "dep:hyper", "dep:opentelemetry-http", "dep:bytes"] +hyper = ["dep:async-trait", "dep:http", "dep:http-body-util", "dep:hyper", "dep:hyper-util", "dep:opentelemetry-http", "dep:bytes"] [dependencies] @@ -23,7 +23,9 @@ opentelemetry-semantic-conventions = { path = "../../../opentelemetry-semantic-c async-trait = { workspace = true, optional = true } bytes = { workspace = true, optional = true } http = { workspace = true, optional = true } +http-body-util = { workspace = true, optional = true } hyper = { workspace = true, features = ["client"], optional = true } +hyper-util = { workspace = true, features = ["client-legacy"], optional = true } tokio = { workspace = true, features = ["full"] } tracing = { workspace = true, features = ["std"]} tracing-core = { workspace = true } diff --git a/opentelemetry-otlp/examples/basic-otlp-http/src/hyper.rs b/opentelemetry-otlp/examples/basic-otlp-http/src/hyper.rs index ff6e84a05d..80a28ae62d 100644 --- a/opentelemetry-otlp/examples/basic-otlp-http/src/hyper.rs +++ b/opentelemetry-otlp/examples/basic-otlp-http/src/hyper.rs @@ -1,20 +1,24 @@ use async_trait::async_trait; use bytes::Bytes; use http::{Request, Response}; -use hyper::{ - client::{connect::Connect, HttpConnector}, - Body, Client, +use http_body_util::{BodyExt, Full}; +use hyper_util::{ + client::legacy::{ + connect::{Connect, HttpConnector}, + Client, + }, + rt::TokioExecutor, }; use opentelemetry_http::{HttpClient, HttpError, ResponseExt}; pub struct HyperClient { - inner: hyper::Client, + inner: hyper_util::client::legacy::Client>, } impl Default for HyperClient { fn default() -> Self { Self { - inner: Client::new(), + inner: Client::builder(TokioExecutor::new()).build_http(), } } } @@ -30,7 +34,7 @@ impl std::fmt::Debug for HyperClient { #[async_trait] impl HttpClient for HyperClient { async fn send(&self, request: Request>) -> Result, HttpError> { - let request = request.map(Body::from); + let request = request.map(|body| Full::new(Bytes::from(body))); let (parts, body) = self .inner @@ -38,7 +42,7 @@ impl HttpClient for HyperClient { .await? .error_for_status()? .into_parts(); - let body = hyper::body::to_bytes(body).await?; + let body = body.collect().await?.to_bytes(); Ok(Response::from_parts(parts, body)) } diff --git a/opentelemetry-otlp/tests/integration_test/Cargo.toml b/opentelemetry-otlp/tests/integration_test/Cargo.toml index 9566576d9b..d7ef8de3d3 100644 --- a/opentelemetry-otlp/tests/integration_test/Cargo.toml +++ b/opentelemetry-otlp/tests/integration_test/Cargo.toml @@ -6,7 +6,6 @@ publish = false [dependencies] -once_cell = { workspace = true } opentelemetry = { path = "../../../opentelemetry", features = ["metrics", "logs"] } opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", "logs", "testing"] } opentelemetry-proto = { path = "../../../opentelemetry-proto", features = ["gen-tonic-messages", "trace", "logs", "with-serde"] } diff --git a/opentelemetry-prometheus/Cargo.toml b/opentelemetry-prometheus/Cargo.toml index 7aa488cfd6..97d9d7abf2 100644 --- a/opentelemetry-prometheus/Cargo.toml +++ b/opentelemetry-prometheus/Cargo.toml @@ -28,7 +28,9 @@ protobuf = "2.14" [dev-dependencies] opentelemetry-semantic-conventions = { version = "0.15" } +http-body-util = { workspace = true } hyper = { workspace = true, features = ["full"] } +hyper-util = { workspace = true, features = ["full"] } tokio = { workspace = true, features = ["full"] } [features] diff --git a/opentelemetry-prometheus/examples/hyper.rs b/opentelemetry-prometheus/examples/hyper.rs index 943ba617b6..10d1402e98 100644 --- a/opentelemetry-prometheus/examples/hyper.rs +++ b/opentelemetry-prometheus/examples/hyper.rs @@ -1,8 +1,11 @@ +use http_body_util::Full; use hyper::{ + body::{Bytes, Incoming}, header::CONTENT_TYPE, - service::{make_service_fn, service_fn}, - Body, Method, Request, Response, Server, + service::service_fn, + Method, Request, Response, }; +use hyper_util::rt::{TokioExecutor, TokioIo}; use once_cell::sync::Lazy; use opentelemetry::{ metrics::{Counter, Histogram, MeterProvider as _, Unit}, @@ -10,16 +13,17 @@ use opentelemetry::{ }; use opentelemetry_sdk::metrics::SdkMeterProvider; use prometheus::{Encoder, Registry, TextEncoder}; -use std::convert::Infallible; +use std::net::SocketAddr; use std::sync::Arc; use std::time::SystemTime; +use tokio::net::TcpListener; static HANDLER_ALL: Lazy<[KeyValue; 1]> = Lazy::new(|| [KeyValue::new("handler", "all")]); async fn serve_req( - req: Request, + req: Request, state: Arc, -) -> Result, hyper::Error> { +) -> Result>, hyper::Error> { println!("Receiving request at path {}", req.uri()); let request_start = SystemTime::now(); @@ -38,16 +42,16 @@ async fn serve_req( Response::builder() .status(200) .header(CONTENT_TYPE, encoder.format_type()) - .body(Body::from(buffer)) + .body(Full::new(Bytes::from(buffer))) .unwrap() } (&Method::GET, "/") => Response::builder() .status(200) - .body(Body::from("Hello World")) + .body(Full::new("Hello World".into())) .unwrap(), _ => Response::builder() .status(404) - .body(Body::from("Missing Page")) + .body(Full::new("Missing Page".into())) .unwrap(), }; @@ -67,6 +71,8 @@ struct AppState { #[tokio::main] pub async fn main() -> Result<(), Box> { + use hyper_util::server::conn::auto::Builder; + let registry = Registry::new(); let exporter = opentelemetry_prometheus::exporter() .with_registry(registry.clone()) @@ -92,23 +98,22 @@ pub async fn main() -> Result<(), Box> { .init(), }); - // For every connection, we must make a `Service` to handle all - // incoming HTTP requests on said connection. - let make_svc = make_service_fn(move |_conn| { - let state = state.clone(); - // This is the `Service` that will handle the connection. - // `service_fn` is a helper to convert a function that - // returns a Response into a `Service`. - async move { Ok::<_, Infallible>(service_fn(move |req| serve_req(req, state.clone()))) } - }); - - let addr = ([127, 0, 0, 1], 3000).into(); - - let server = Server::bind(&addr).serve(make_svc); + let addr: SocketAddr = ([127, 0, 0, 1], 3000).into(); + let listener = TcpListener::bind(addr).await.unwrap(); println!("Listening on http://{addr}"); - server.await?; + while let Ok((stream, _addr)) = listener.accept().await { + if let Err(err) = Builder::new(TokioExecutor::new()) + .serve_connection( + TokioIo::new(stream), + service_fn(|req| serve_req(req, state.clone())), + ) + .await + { + eprintln!("{err}"); + } + } Ok(()) } diff --git a/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.collector.logs.v1.rs b/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.collector.logs.v1.rs index 5e808db812..a8f5443978 100644 --- a/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.collector.logs.v1.rs +++ b/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.collector.logs.v1.rs @@ -207,19 +207,17 @@ pub mod logs_service_server { /// case logs are sent/received to/from multiple Applications). #[derive(Debug)] pub struct LogsServiceServer { - inner: _Inner, + inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - struct _Inner(Arc); impl LogsServiceServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), @@ -282,7 +280,6 @@ pub mod logs_service_server { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); match req.uri().path() { "/opentelemetry.proto.collector.logs.v1.LogsService/Export" => { #[allow(non_camel_case_types)] @@ -313,7 +310,6 @@ pub mod logs_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = ExportSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -335,8 +331,11 @@ pub mod logs_service_server { Ok( http::Response::builder() .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") + .header("grpc-status", tonic::Code::Unimplemented as i32) + .header( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ) .body(empty_body()) .unwrap(), ) @@ -357,16 +356,6 @@ pub mod logs_service_server { } } } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } impl tonic::server::NamedService for LogsServiceServer { const NAME: &'static str = "opentelemetry.proto.collector.logs.v1.LogsService"; } diff --git a/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.collector.metrics.v1.rs b/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.collector.metrics.v1.rs index bc0f036420..052cc810f7 100644 --- a/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.collector.metrics.v1.rs +++ b/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.collector.metrics.v1.rs @@ -207,19 +207,17 @@ pub mod metrics_service_server { /// central collector. #[derive(Debug)] pub struct MetricsServiceServer { - inner: _Inner, + inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - struct _Inner(Arc); impl MetricsServiceServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), @@ -282,7 +280,6 @@ pub mod metrics_service_server { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); match req.uri().path() { "/opentelemetry.proto.collector.metrics.v1.MetricsService/Export" => { #[allow(non_camel_case_types)] @@ -313,7 +310,6 @@ pub mod metrics_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = ExportSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -335,8 +331,11 @@ pub mod metrics_service_server { Ok( http::Response::builder() .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") + .header("grpc-status", tonic::Code::Unimplemented as i32) + .header( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ) .body(empty_body()) .unwrap(), ) @@ -357,16 +356,6 @@ pub mod metrics_service_server { } } } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } impl tonic::server::NamedService for MetricsServiceServer { const NAME: &'static str = "opentelemetry.proto.collector.metrics.v1.MetricsService"; } diff --git a/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.collector.trace.v1.rs b/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.collector.trace.v1.rs index fae4d4dce6..55d0361be0 100644 --- a/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.collector.trace.v1.rs +++ b/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.collector.trace.v1.rs @@ -207,19 +207,17 @@ pub mod trace_service_server { /// case spans are sent/received to/from multiple Applications). #[derive(Debug)] pub struct TraceServiceServer { - inner: _Inner, + inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - struct _Inner(Arc); impl TraceServiceServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), @@ -282,7 +280,6 @@ pub mod trace_service_server { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); match req.uri().path() { "/opentelemetry.proto.collector.trace.v1.TraceService/Export" => { #[allow(non_camel_case_types)] @@ -313,7 +310,6 @@ pub mod trace_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = ExportSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -335,8 +331,11 @@ pub mod trace_service_server { Ok( http::Response::builder() .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") + .header("grpc-status", tonic::Code::Unimplemented as i32) + .header( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ) .body(empty_body()) .unwrap(), ) @@ -357,16 +356,6 @@ pub mod trace_service_server { } } } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } impl tonic::server::NamedService for TraceServiceServer { const NAME: &'static str = "opentelemetry.proto.collector.trace.v1.TraceService"; } diff --git a/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.metrics.v1.rs b/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.metrics.v1.rs index 462305014f..1a69fcc5c5 100644 --- a/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.metrics.v1.rs +++ b/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.metrics.v1.rs @@ -335,7 +335,7 @@ pub mod number_data_point { #[cfg_attr(feature = "with-serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "with-serde", serde(rename_all = "camelCase"))] #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Oneof)] + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] pub enum Value { #[prost(double, tag = "4")] AsDouble(f64), @@ -624,7 +624,7 @@ pub mod summary_data_point { #[cfg_attr(feature = "with-serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "with-serde", serde(rename_all = "camelCase"))] #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Message)] + #[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct ValueAtQuantile { /// The quantile of a distribution. Must be in the interval /// \[0.0, 1.0\]. @@ -685,7 +685,7 @@ pub mod exemplar { #[cfg_attr(feature = "with-serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "with-serde", serde(rename_all = "camelCase"))] #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Oneof)] + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] pub enum Value { #[prost(double, tag = "3")] AsDouble(f64), diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 88e303a9ac..16e5ece5df 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -74,6 +74,7 @@ LogData { } ``` The `LogRecord::target` field contains the actual target/component emitting the logs, while the `Instrumentation::name` contains the name of the OpenTelemetry appender. +- **Breaking** [#1674](https://github.com/open-telemetry/opentelemetry-rust/pull/1674) Update to `http` v1 types (via `opentelemetry-http` update) ## v0.23.0 diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 9914c7b408..faadf473b3 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -312,7 +312,7 @@ mod tests { use opentelemetry::{Key, KeyValue, Value}; use std::fmt::{Debug, Formatter}; use std::sync::atomic::AtomicU64; - use std::sync::{Arc, Mutex}; + use std::sync::Mutex; use std::thread; struct ShutdownTestLogProcessor { diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index 81df8d32f7..034053bdbc 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -291,7 +291,7 @@ impl PeriodicReaderWorker { true } - async fn run(mut self, mut messages: impl Unpin + FusedStream) { + async fn run(mut self, mut messages: impl FusedStream + Unpin) { while let Some(message) = messages.next().await { if !self.process_message(message).await { break; diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 214c0e5768..b99dc45311 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -422,7 +422,7 @@ impl BatchSpanProcessorInternal { }) } - async fn run(mut self, mut messages: impl Unpin + FusedStream) { + async fn run(mut self, mut messages: impl FusedStream + Unpin) { loop { select! { // FuturesUnordered implements Fuse intelligently such that it diff --git a/opentelemetry-zipkin/CHANGELOG.md b/opentelemetry-zipkin/CHANGELOG.md index 68ef3842c5..69d0018785 100644 --- a/opentelemetry-zipkin/CHANGELOG.md +++ b/opentelemetry-zipkin/CHANGELOG.md @@ -2,6 +2,10 @@ ## vNext +### Changed + +- Update `opentelemetry-http` (and with that to `http` v1 types) [#1674](https://github.com/open-telemetry/opentelemetry-rust/pull/1674) + ## v0.21.0 ### Changed diff --git a/opentelemetry-zipkin/Cargo.toml b/opentelemetry-zipkin/Cargo.toml index b163db8088..768a165716 100644 --- a/opentelemetry-zipkin/Cargo.toml +++ b/opentelemetry-zipkin/Cargo.toml @@ -36,13 +36,14 @@ serde_json = { workspace = true } serde = { workspace = true, features = ["derive"] } typed-builder = "0.18" http = { workspace = true } -reqwest = { workspace = true, optional = true} +reqwest = { workspace = true, optional = true } thiserror = { workspace = true } futures-core = { workspace = true } [dev-dependencies] bytes = { workspace = true } futures-util = { workspace = true, features = ["io"] } -hyper = { workspace = true } +http-body-util = { workspace = true } +hyper-util = { workspace = true, features = ["client-legacy", "http1", "tokio"] } opentelemetry_sdk = { default-features = false, features = ["trace", "testing"], path = "../opentelemetry-sdk" } temp-env = { workspace = true } diff --git a/opentelemetry-zipkin/src/lib.rs b/opentelemetry-zipkin/src/lib.rs index d884ab740a..0e8db47dd3 100644 --- a/opentelemetry-zipkin/src/lib.rs +++ b/opentelemetry-zipkin/src/lib.rs @@ -95,28 +95,33 @@ //! use http::{Request, Response}; //! use std::convert::TryInto as _; //! use std::error::Error; -//! use hyper::{client::HttpConnector, Body}; +//! use http_body_util::{BodyExt, Full}; +//! use hyper_util::{ +//! client::legacy::{Client, connect::HttpConnector}, +//! rt::tokio::TokioExecutor, +//! }; //! //! // `reqwest` is supported through a feature, if you prefer an //! // alternate http client you can add support by implementing `HttpClient` as //! // shown here. //! #[derive(Debug)] -//! struct HyperClient(hyper::Client); +//! struct HyperClient(Client>); //! //! #[async_trait] //! impl HttpClient for HyperClient { //! async fn send(&self, req: Request>) -> Result, HttpError> { //! let resp = self //! .0 -//! .request(req.map(|v| Body::from(v))) +//! .request(req.map(|v| Full::new(Bytes::from(v)))) //! .await?; //! //! let response = Response::builder() //! .status(resp.status()) //! .body({ -//! hyper::body::to_bytes(resp.into_body()) +//! resp.collect() //! .await //! .expect("cannot decode response") +//! .to_bytes() //! }) //! .expect("cannot build response"); //! @@ -127,7 +132,12 @@ //! fn main() -> Result<(), Box> { //! global::set_text_map_propagator(opentelemetry_zipkin::Propagator::new()); //! let tracer = opentelemetry_zipkin::new_pipeline() -//! .with_http_client(HyperClient(hyper::Client::new())) +//! .with_http_client( +//! HyperClient( +//! Client::builder(TokioExecutor::new()) +//! .build_http() +//! ) +//! ) //! .with_service_name("my_app") //! .with_service_address("127.0.0.1:8080".parse()?) //! .with_collector_endpoint("http://localhost:9411/api/v2/spans") diff --git a/stress/Cargo.toml b/stress/Cargo.toml index 43aa8adee7..d7e2c7a1c4 100644 --- a/stress/Cargo.toml +++ b/stress/Cargo.toml @@ -43,7 +43,6 @@ opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["metrics", "log opentelemetry-appender-tracing = { path = "../opentelemetry-appender-tracing"} rand = { version = "0.8.4", features = ["small_rng"] } tracing = { workspace = true, features = ["std"]} -tracing-core = { workspace = true } tracing-subscriber = { workspace = true, features = ["registry", "std"] } num-format = "0.4.4" sysinfo = { version = "0.30.12", optional = true }