Skip to content

Commit

Permalink
gracefully shutdown reporter & add agent-test-tool based e2e case (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
tisonkun authored Jan 14, 2022
1 parent 1465d9f commit f4743a9
Show file tree
Hide file tree
Showing 16 changed files with 156 additions and 56 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: test
name: codecov
on:
pull_request:
push:
Expand Down
25 changes: 25 additions & 0 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
name: agent-test-tool

on:
pull_request:
push:
branches:
- master
tags:
- 'v*'

jobs:
e2e-rust:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
with:
submodules: recursive
- name: Prepare service container
run: docker-compose -f docker-compose.e2e.yml up --build -d
- name: Run e2e
run: |
pip3 install --upgrade pip
pip3 install setuptools
pip3 install -r requirements.txt
python3 tests/e2e/run_e2e.py --expected_file=tests/e2e/data/expected_context.yaml --max_retry_times=3 --target_path=/ping
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ async-stream = "0.3.2"
[build-dependencies]
tonic-build = "0.5.2"

[dev-dependencies]
tokio-stream = {version = "0.1", features = ["net"]}

[[example]]
name = "simple_trace_report"
path = "examples/simple_trace_report.rs"
35 changes: 22 additions & 13 deletions docker-compose.e2e.yml
Original file line number Diff line number Diff line change
@@ -1,30 +1,39 @@
version: "3.7"
version: "3.9"
services:
collector:
build:
context: .
dockerfile: ./tests/e2e/docker/Dockerfile.tool
image: ghcr.io/apache/skywalking-agent-test-tool/mock-collector:5acb890f225ca37ee60675ce3e330545e23e3cbc
ports:
- 19876:19876
- 12800:12800
- "19876:19876"
- "12800:12800"
healthcheck:
test: [ "CMD", "curl", "http://0.0.0.0:12800/healthCheck" ]
interval: 5s
timeout: 5s

consumer:
build:
context: .
dockerfile: ./tests/e2e/docker/Dockerfile
expose:
expose:
- 8082
command: cargo run -- --mode consumer
command: --mode consumer
depends_on:
- collector
collector:
condition: service_healthy
healthcheck:
test: [ "CMD", "curl", "http://0.0.0.0:8082/healthCheck" ]
interval: 5s
timeout: 5s

producer:
build:
context: .
dockerfile: ./tests/e2e/docker/Dockerfile
ports:
- 8081:8081
command: cargo run -- --mode producer
- "8081:8081"
command: --mode producer
depends_on:
- collector
- consumer
collector:
condition: service_healthy
consumer:
condition: service_healthy
11 changes: 7 additions & 4 deletions examples/simple_trace_report.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use std::error::Error;

use skywalking_rust::context::trace_context::TracingContext;
use skywalking_rust::reporter::grpc::Reporter;
use tokio;

#[tokio::main]
async fn main() {
let tx = Reporter::start("http://0.0.0.0:11800").await;
async fn main() -> Result<(), Box<dyn Error>> {
let reporter = Reporter::start("http://0.0.0.0:11800").await;
let mut context = TracingContext::default("service", "instance");
{
let span = context.create_entry_span("op1").unwrap();
context.finalize_span(span);
}
let _ = tx.send(context).await;
reporter.sender().send(context).await?;
reporter.shutdown().await?;
Ok(())
}
3 changes: 3 additions & 0 deletions rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[toolchain]
channel = "1.57.0"
components = ["rustfmt", "clippy"]
2 changes: 1 addition & 1 deletion src/context/propagation/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub fn encode_propagation(context: &TracingContext, endpoint: &str, address: &st
res += "1-";
res += format!("{}-", encode(context.trace_id.to_string())).as_str();
res += format!("{}-", encode(context.trace_segment_id.to_string())).as_str();
res += format!("{}-", context.next_span_id.to_string()).as_str();
res += format!("{}-", context.next_span_id).as_str();
res += format!("{}-", encode(context.service.as_str())).as_str();
res += format!("{}-", encode(context.service_instance.as_str())).as_str();
res += format!("{}-", encode(endpoint)).as_str();
Expand Down
22 changes: 22 additions & 0 deletions src/context/trace_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::skywalking_proto::v3::{
KeyStringValuePair, Log, RefType, SegmentObject, SegmentReference, SpanLayer, SpanObject,
SpanType,
};
use std::fmt::Formatter;
use std::sync::Arc;

use super::system_time::UnixTimeStampFetcher;
Expand Down Expand Up @@ -61,6 +62,14 @@ pub struct Span {
time_fetcher: Arc<dyn TimeFetcher + Sync + Send>,
}

impl std::fmt::Debug for Span {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Span")
.field("span_internal", &self.span_internal)
.finish()
}
}

static SKYWALKING_RUST_COMPONENT_ID: i32 = 11000;

impl Span {
Expand Down Expand Up @@ -148,6 +157,19 @@ pub struct TracingContext {
segment_link: Option<PropagationContext>,
}

impl std::fmt::Debug for TracingContext {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TracingContext")
.field("trace_id", &self.trace_id)
.field("trace_segment_id", &self.trace_segment_id)
.field("service", &self.service)
.field("service_instance", &self.service_instance)
.field("next_span_id", &self.next_span_id)
.field("spans", &self.spans)
.finish()
}
}

impl TracingContext {
/// Generate a new trace context. Typically called when no context has
/// been propagated and a new trace is to be started.
Expand Down
49 changes: 40 additions & 9 deletions src/reporter/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use crate::context::trace_context::TracingContext;
use crate::skywalking_proto::v3::trace_segment_report_service_client::TraceSegmentReportServiceClient;
use crate::skywalking_proto::v3::SegmentObject;
use std::error::Error;
use tokio::sync::mpsc;
use tonic::transport::Channel;

Expand All @@ -32,12 +33,13 @@ async fn flush(client: &mut ReporterClient, context: SegmentObject) -> Result<()
}
}

pub struct Reporter {}
pub struct Reporter {
tx: mpsc::Sender<TracingContext>,
shutdown_tx: mpsc::Sender<()>,
}

static CHANNEL_BUF_SIZE: usize = 1024;

pub type ContextReporter = mpsc::Sender<TracingContext>;

impl Reporter {
/// Open gRPC client stream to send collected trace context.
/// This function generates a new async task which watch to arrive new trace context.
Expand All @@ -51,23 +53,52 @@ impl Reporter {
/// use skywalking_rust::reporter::grpc::Reporter;
///
/// #[tokio::main]
/// async fn main (){
/// let tx = Reporter::start("localhost:12800").await;
/// async fn main () -> Result<(), Box<dyn Error>> {
/// let reporter = Reporter::start("localhost:12800").await;
/// let mut context = TracingContext::default("service", "instance");
/// tx.send(context).await;
/// reporter.sender().send(context).await?;
/// reporter.shutdown().await?;
/// Ok(())
/// }
/// ```
pub async fn start(address: &str) -> ContextReporter {
pub async fn start(address: impl Into<String>) -> Self {
let (tx, mut rx): (mpsc::Sender<TracingContext>, mpsc::Receiver<TracingContext>) =
mpsc::channel(CHANNEL_BUF_SIZE);
let mut reporter = ReporterClient::connect(address.to_string()).await.unwrap();
let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);

let mut reporter = ReporterClient::connect(address.into()).await.unwrap();
tokio::spawn(async move {
loop {
tokio::select! {
message = rx.recv() => {
if let Some(message) = message {
flush(&mut reporter, message.convert_segment_object()).await.unwrap();
} else {
break;
}
},
_ = shutdown_rx.recv() => {
break;
}
}
}
rx.close();
while let Some(message) = rx.recv().await {
flush(&mut reporter, message.convert_segment_object())
.await
.unwrap();
}
});
tx
Self { tx, shutdown_tx }
}

pub async fn shutdown(self) -> Result<(), Box<dyn Error>> {
self.shutdown_tx.send(()).await?;
self.shutdown_tx.closed().await;
Ok(())
}

pub fn sender(&self) -> mpsc::Sender<TracingContext> {
self.tx.clone()
}
}
4 changes: 3 additions & 1 deletion tests/e2e/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions tests/e2e/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@
name = "e2e"
version = "0.1.0"
authors = ["Shikugawa <Shikugawa@gmail.com>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
edition = "2021"

[dependencies]
skywalking_rust = { path = "../../" }
skywalking_rust = { path = "../.." }
hyper = { version = "0.14", features = ["full"] }
tokio = { version = "1", features = ["full"] }
structopt = "0.3"
10 changes: 5 additions & 5 deletions tests/e2e/docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
FROM rust:1.50.0
RUN apt update && apt install -y protobuf-compiler
RUN rustup component add rustfmt
COPY . /tmp
WORKDIR tmp/tests/e2e
FROM rust:1.57 as build
WORKDIR /build
COPY . /build/
RUN cd tests/e2e && cargo build --release --locked
ENTRYPOINT ["/build/tests/e2e/target/release/e2e"]
15 changes: 0 additions & 15 deletions tests/e2e/docker/Dockerfile.tool

This file was deleted.

3 changes: 3 additions & 0 deletions tests/e2e/rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[toolchain]
channel = "1.57.0"
components = ["rustfmt", "clippy"]
Loading

0 comments on commit f4743a9

Please sign in to comment.