diff --git a/.github/workflows/test.yaml b/.github/workflows/codecov.yaml similarity index 96% rename from .github/workflows/test.yaml rename to .github/workflows/codecov.yaml index 91b5a22..2a8a124 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/codecov.yaml @@ -1,4 +1,4 @@ -name: test +name: codecov on: pull_request: push: diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml new file mode 100644 index 0000000..a581102 --- /dev/null +++ b/.github/workflows/e2e.yml @@ -0,0 +1,25 @@ +name: agent-test-tool + +on: + pull_request: + push: + branches: + - master + tags: + - 'v*' + +jobs: + e2e-rust: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + with: + submodules: recursive + - name: Prepare service container + run: docker-compose -f docker-compose.e2e.yml up --build -d + - name: Run e2e + run: | + pip3 install --upgrade pip + pip3 install setuptools + pip3 install -r requirements.txt + python3 tests/e2e/run_e2e.py --expected_file=tests/e2e/data/expected_context.yaml --max_retry_times=3 --target_path=/ping diff --git a/Cargo.lock b/Cargo.lock index 5256ab4..5bfa1aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -612,6 +612,7 @@ dependencies = [ "prost", "prost-derive", "tokio", + "tokio-stream", "tonic", "tonic-build", "uuid", diff --git a/Cargo.toml b/Cargo.toml index 0840f12..db82830 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,9 @@ async-stream = "0.3.2" [build-dependencies] tonic-build = "0.5.2" +[dev-dependencies] +tokio-stream = {version = "0.1", features = ["net"]} + [[example]] name = "simple_trace_report" path = "examples/simple_trace_report.rs" diff --git a/docker-compose.e2e.yml b/docker-compose.e2e.yml index 3c28ead..875fbaf 100644 --- a/docker-compose.e2e.yml +++ b/docker-compose.e2e.yml @@ -1,30 +1,39 @@ -version: "3.7" +version: "3.9" services: collector: - build: - context: . - dockerfile: ./tests/e2e/docker/Dockerfile.tool + image: ghcr.io/apache/skywalking-agent-test-tool/mock-collector:5acb890f225ca37ee60675ce3e330545e23e3cbc ports: - - 19876:19876 - - 12800:12800 + - "19876:19876" + - "12800:12800" + healthcheck: + test: [ "CMD", "curl", "http://0.0.0.0:12800/healthCheck" ] + interval: 5s + timeout: 5s consumer: build: context: . dockerfile: ./tests/e2e/docker/Dockerfile - expose: + expose: - 8082 - command: cargo run -- --mode consumer + command: --mode consumer depends_on: - - collector + collector: + condition: service_healthy + healthcheck: + test: [ "CMD", "curl", "http://0.0.0.0:8082/healthCheck" ] + interval: 5s + timeout: 5s producer: build: context: . dockerfile: ./tests/e2e/docker/Dockerfile ports: - - 8081:8081 - command: cargo run -- --mode producer + - "8081:8081" + command: --mode producer depends_on: - - collector - - consumer \ No newline at end of file + collector: + condition: service_healthy + consumer: + condition: service_healthy diff --git a/examples/simple_trace_report.rs b/examples/simple_trace_report.rs index 6bcb39c..192b620 100644 --- a/examples/simple_trace_report.rs +++ b/examples/simple_trace_report.rs @@ -1,14 +1,17 @@ +use std::error::Error; + use skywalking_rust::context::trace_context::TracingContext; use skywalking_rust::reporter::grpc::Reporter; -use tokio; #[tokio::main] -async fn main() { - let tx = Reporter::start("http://0.0.0.0:11800").await; +async fn main() -> Result<(), Box> { + let reporter = Reporter::start("http://0.0.0.0:11800").await; let mut context = TracingContext::default("service", "instance"); { let span = context.create_entry_span("op1").unwrap(); context.finalize_span(span); } - let _ = tx.send(context).await; + reporter.sender().send(context).await?; + reporter.shutdown().await?; + Ok(()) } diff --git a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 0000000..98d86f6 --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,3 @@ +[toolchain] +channel = "1.57.0" +components = ["rustfmt", "clippy"] diff --git a/src/context/propagation/encoder.rs b/src/context/propagation/encoder.rs index 937e5f7..f928ece 100644 --- a/src/context/propagation/encoder.rs +++ b/src/context/propagation/encoder.rs @@ -25,7 +25,7 @@ pub fn encode_propagation(context: &TracingContext, endpoint: &str, address: &st res += "1-"; res += format!("{}-", encode(context.trace_id.to_string())).as_str(); res += format!("{}-", encode(context.trace_segment_id.to_string())).as_str(); - res += format!("{}-", context.next_span_id.to_string()).as_str(); + res += format!("{}-", context.next_span_id).as_str(); res += format!("{}-", encode(context.service.as_str())).as_str(); res += format!("{}-", encode(context.service_instance.as_str())).as_str(); res += format!("{}-", encode(endpoint)).as_str(); diff --git a/src/context/trace_context.rs b/src/context/trace_context.rs index cea1725..d341bf9 100644 --- a/src/context/trace_context.rs +++ b/src/context/trace_context.rs @@ -21,6 +21,7 @@ use crate::skywalking_proto::v3::{ KeyStringValuePair, Log, RefType, SegmentObject, SegmentReference, SpanLayer, SpanObject, SpanType, }; +use std::fmt::Formatter; use std::sync::Arc; use super::system_time::UnixTimeStampFetcher; @@ -61,6 +62,14 @@ pub struct Span { time_fetcher: Arc, } +impl std::fmt::Debug for Span { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Span") + .field("span_internal", &self.span_internal) + .finish() + } +} + static SKYWALKING_RUST_COMPONENT_ID: i32 = 11000; impl Span { @@ -148,6 +157,19 @@ pub struct TracingContext { segment_link: Option, } +impl std::fmt::Debug for TracingContext { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TracingContext") + .field("trace_id", &self.trace_id) + .field("trace_segment_id", &self.trace_segment_id) + .field("service", &self.service) + .field("service_instance", &self.service_instance) + .field("next_span_id", &self.next_span_id) + .field("spans", &self.spans) + .finish() + } +} + impl TracingContext { /// Generate a new trace context. Typically called when no context has /// been propagated and a new trace is to be started. diff --git a/src/reporter/grpc.rs b/src/reporter/grpc.rs index c2b7a76..4b0c690 100644 --- a/src/reporter/grpc.rs +++ b/src/reporter/grpc.rs @@ -17,6 +17,7 @@ use crate::context::trace_context::TracingContext; use crate::skywalking_proto::v3::trace_segment_report_service_client::TraceSegmentReportServiceClient; use crate::skywalking_proto::v3::SegmentObject; +use std::error::Error; use tokio::sync::mpsc; use tonic::transport::Channel; @@ -32,12 +33,13 @@ async fn flush(client: &mut ReporterClient, context: SegmentObject) -> Result<() } } -pub struct Reporter {} +pub struct Reporter { + tx: mpsc::Sender, + shutdown_tx: mpsc::Sender<()>, +} static CHANNEL_BUF_SIZE: usize = 1024; -pub type ContextReporter = mpsc::Sender; - impl Reporter { /// Open gRPC client stream to send collected trace context. /// This function generates a new async task which watch to arrive new trace context. @@ -51,23 +53,52 @@ impl Reporter { /// use skywalking_rust::reporter::grpc::Reporter; /// /// #[tokio::main] - /// async fn main (){ - /// let tx = Reporter::start("localhost:12800").await; + /// async fn main () -> Result<(), Box> { + /// let reporter = Reporter::start("localhost:12800").await; /// let mut context = TracingContext::default("service", "instance"); - /// tx.send(context).await; + /// reporter.sender().send(context).await?; + /// reporter.shutdown().await?; + /// Ok(()) /// } /// ``` - pub async fn start(address: &str) -> ContextReporter { + pub async fn start(address: impl Into) -> Self { let (tx, mut rx): (mpsc::Sender, mpsc::Receiver) = mpsc::channel(CHANNEL_BUF_SIZE); - let mut reporter = ReporterClient::connect(address.to_string()).await.unwrap(); + let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1); + + let mut reporter = ReporterClient::connect(address.into()).await.unwrap(); tokio::spawn(async move { + loop { + tokio::select! { + message = rx.recv() => { + if let Some(message) = message { + flush(&mut reporter, message.convert_segment_object()).await.unwrap(); + } else { + break; + } + }, + _ = shutdown_rx.recv() => { + break; + } + } + } + rx.close(); while let Some(message) = rx.recv().await { flush(&mut reporter, message.convert_segment_object()) .await .unwrap(); } }); - tx + Self { tx, shutdown_tx } + } + + pub async fn shutdown(self) -> Result<(), Box> { + self.shutdown_tx.send(()).await?; + self.shutdown_tx.closed().await; + Ok(()) + } + + pub fn sender(&self) -> mpsc::Sender { + self.tx.clone() } } diff --git a/tests/e2e/Cargo.lock b/tests/e2e/Cargo.lock index 197c8a4..baccba5 100644 --- a/tests/e2e/Cargo.lock +++ b/tests/e2e/Cargo.lock @@ -1,5 +1,7 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +version = 3 + [[package]] name = "ansi_term" version = "0.12.1" @@ -671,7 +673,7 @@ dependencies = [ [[package]] name = "skywalking_rust" -version = "0.0.1" +version = "0.1.0" dependencies = [ "async-stream", "base64", diff --git a/tests/e2e/Cargo.toml b/tests/e2e/Cargo.toml index 6a5cf5b..0e3c003 100644 --- a/tests/e2e/Cargo.toml +++ b/tests/e2e/Cargo.toml @@ -2,12 +2,10 @@ name = "e2e" version = "0.1.0" authors = ["Shikugawa "] -edition = "2018" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +edition = "2021" [dependencies] -skywalking_rust = { path = "../../" } +skywalking_rust = { path = "../.." } hyper = { version = "0.14", features = ["full"] } tokio = { version = "1", features = ["full"] } structopt = "0.3" diff --git a/tests/e2e/docker/Dockerfile b/tests/e2e/docker/Dockerfile index 2ec7258..ebce92c 100644 --- a/tests/e2e/docker/Dockerfile +++ b/tests/e2e/docker/Dockerfile @@ -1,5 +1,5 @@ -FROM rust:1.50.0 -RUN apt update && apt install -y protobuf-compiler -RUN rustup component add rustfmt -COPY . /tmp -WORKDIR tmp/tests/e2e +FROM rust:1.57 as build +WORKDIR /build +COPY . /build/ +RUN cd tests/e2e && cargo build --release --locked +ENTRYPOINT ["/build/tests/e2e/target/release/e2e"] diff --git a/tests/e2e/docker/Dockerfile.tool b/tests/e2e/docker/Dockerfile.tool deleted file mode 100644 index 74ef01c..0000000 --- a/tests/e2e/docker/Dockerfile.tool +++ /dev/null @@ -1,15 +0,0 @@ -FROM openjdk:8 -WORKDIR /tests -ARG COMMIT_HASH=8db606f3470cce75c1b013ae498ac93b862b75b7 -ADD https://github.com/apache/skywalking-agent-test-tool/archive/${COMMIT_HASH}.tar.gz . -RUN tar -xf ${COMMIT_HASH}.tar.gz --strip 1 -RUN rm ${COMMIT_HASH}.tar.gz -RUN ./mvnw -B -DskipTests package - -FROM openjdk:8 -EXPOSE 19876 12800 -WORKDIR /tests -COPY --from=0 /tests/dist/skywalking-mock-collector.tar.gz /tests -RUN tar -xf skywalking-mock-collector.tar.gz --strip 1 -RUN chmod +x bin/collector-startup.sh -ENTRYPOINT bin/collector-startup.sh diff --git a/tests/e2e/rust-toolchain.toml b/tests/e2e/rust-toolchain.toml new file mode 100644 index 0000000..98d86f6 --- /dev/null +++ b/tests/e2e/rust-toolchain.toml @@ -0,0 +1,3 @@ +[toolchain] +channel = "1.57.0" +components = ["rustfmt", "clippy"] diff --git a/tests/e2e/src/main.rs b/tests/e2e/src/main.rs index cc9f76c..6d30358 100644 --- a/tests/e2e/src/main.rs +++ b/tests/e2e/src/main.rs @@ -7,11 +7,13 @@ use skywalking_rust::context::propagation::encoder::encode_propagation; use skywalking_rust::context::trace_context::TracingContext; use skywalking_rust::reporter::grpc::Reporter; use std::convert::Infallible; +use std::error::Error; use std::net::SocketAddr; use structopt::StructOpt; use tokio::sync::mpsc; static NOT_FOUND_MSG: &str = "not found"; +static SUCCESS_MSG: &str = "Success"; async fn handle_ping( _req: Request, @@ -45,6 +47,10 @@ async fn producer_response( ) -> Result, Infallible> { match (_req.method(), _req.uri().path()) { (&Method::GET, "/ping") => handle_ping(_req, client, tx).await, + (&Method::GET, "/healthCheck") => Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(SUCCESS_MSG)) + .unwrap()), _ => Ok(Response::builder() .status(StatusCode::NOT_FOUND) .body(Body::from(NOT_FOUND_MSG)) @@ -66,7 +72,7 @@ async fn run_producer_service(host: [u8; 4], tx: mpsc::Sender) { }); let addr = SocketAddr::from((host, 8081)); let server = Server::bind(&addr).serve(make_svc); - + println!("starting producer on {:?}...", &addr); if let Err(e) = server.await { eprintln!("server error: {}", e); } @@ -95,6 +101,10 @@ async fn consumer_response( ) -> Result, Infallible> { match (_req.method(), _req.uri().path()) { (&Method::GET, "/pong") => handle_pong(_req, tx).await, + (&Method::GET, "/healthCheck") => Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(SUCCESS_MSG)) + .unwrap()), _ => Ok(Response::builder() .status(StatusCode::NOT_FOUND) .body(Body::from(NOT_FOUND_MSG)) @@ -110,6 +120,7 @@ async fn run_consumer_service(host: [u8; 4], tx: mpsc::Sender) { let addr = SocketAddr::from((host, 8082)); let server = Server::bind(&addr).serve(make_svc); + println!("starting consumer on {:?}...", &addr); if let Err(e) = server.await { eprintln!("server error: {}", e); } @@ -123,13 +134,17 @@ struct Opt { } #[tokio::main] -async fn main() { +async fn main() -> Result<(), Box> { let opt = Opt::from_args(); - let tx = Reporter::start("http://collector:19876").await; + let reporter = Reporter::start("http://collector:19876").await; + let tx = reporter.sender(); if opt.mode == "consumer" { run_consumer_service([0, 0, 0, 0], tx).await; } else if opt.mode == "producer" { run_producer_service([0, 0, 0, 0], tx).await; } + + reporter.shutdown().await?; + Ok(()) }