Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gracefully shutdown reporter & add agent-test-tool based e2e case #9

Merged
merged 10 commits into from
Jan 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: test
name: codecov
on:
pull_request:
push:
Expand Down
25 changes: 25 additions & 0 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
name: agent-test-tool

on:
pull_request:
push:
branches:
- master
tags:
- 'v*'

jobs:
e2e-rust:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
with:
submodules: recursive
- name: Prepare service container
run: docker-compose -f docker-compose.e2e.yml up --build -d
- name: Run e2e
run: |
pip3 install --upgrade pip
pip3 install setuptools
pip3 install -r requirements.txt
python3 tests/e2e/run_e2e.py --expected_file=tests/e2e/data/expected_context.yaml --max_retry_times=3 --target_path=/ping
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't find the tests/e2e/data/expected_context.yaml. How does this test pass? Could you check the logs?
Is this a run_e2e.py bug or something?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's already in the codebase here: https://github.com/apache/skywalking-rust/blob/1465d9fc6811dcb009e75f5c1dd2d5b1ef7a52a9/tests/e2e/data/expected_context.yaml

I don't add a new test case but according to your suggestion #9 (comment), enable the existing but not in regression e2e test - it's a plugin test based on agent-test-tool.

1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ async-stream = "0.3.2"
[build-dependencies]
tonic-build = "0.5.2"

[dev-dependencies]
tokio-stream = {version = "0.1", features = ["net"]}

[[example]]
name = "simple_trace_report"
path = "examples/simple_trace_report.rs"
35 changes: 22 additions & 13 deletions docker-compose.e2e.yml
Original file line number Diff line number Diff line change
@@ -1,30 +1,39 @@
version: "3.7"
version: "3.9"
services:
collector:
build:
context: .
dockerfile: ./tests/e2e/docker/Dockerfile.tool
image: ghcr.io/apache/skywalking-agent-test-tool/mock-collector:5acb890f225ca37ee60675ce3e330545e23e3cbc
ports:
- 19876:19876
- 12800:12800
- "19876:19876"
- "12800:12800"
healthcheck:
test: [ "CMD", "curl", "http://0.0.0.0:12800/healthCheck" ]
interval: 5s
timeout: 5s

consumer:
build:
context: .
dockerfile: ./tests/e2e/docker/Dockerfile
expose:
expose:
- 8082
command: cargo run -- --mode consumer
command: --mode consumer
depends_on:
- collector
collector:
condition: service_healthy
healthcheck:
test: [ "CMD", "curl", "http://0.0.0.0:8082/healthCheck" ]
interval: 5s
timeout: 5s

producer:
build:
context: .
dockerfile: ./tests/e2e/docker/Dockerfile
ports:
- 8081:8081
command: cargo run -- --mode producer
- "8081:8081"
command: --mode producer
depends_on:
- collector
- consumer
collector:
condition: service_healthy
consumer:
condition: service_healthy
11 changes: 7 additions & 4 deletions examples/simple_trace_report.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use std::error::Error;
tisonkun marked this conversation as resolved.
Show resolved Hide resolved

use skywalking_rust::context::trace_context::TracingContext;
use skywalking_rust::reporter::grpc::Reporter;
use tokio;

#[tokio::main]
async fn main() {
let tx = Reporter::start("http://0.0.0.0:11800").await;
async fn main() -> Result<(), Box<dyn Error>> {
let reporter = Reporter::start("http://0.0.0.0:11800").await;
let mut context = TracingContext::default("service", "instance");
{
let span = context.create_entry_span("op1").unwrap();
context.finalize_span(span);
}
let _ = tx.send(context).await;
reporter.sender().send(context).await?;
reporter.shutdown().await?;
Ok(())
}
3 changes: 3 additions & 0 deletions rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[toolchain]
channel = "1.57.0"
components = ["rustfmt", "clippy"]
2 changes: 1 addition & 1 deletion src/context/propagation/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub fn encode_propagation(context: &TracingContext, endpoint: &str, address: &st
res += "1-";
res += format!("{}-", encode(context.trace_id.to_string())).as_str();
res += format!("{}-", encode(context.trace_segment_id.to_string())).as_str();
res += format!("{}-", context.next_span_id.to_string()).as_str();
res += format!("{}-", context.next_span_id).as_str();
res += format!("{}-", encode(context.service.as_str())).as_str();
res += format!("{}-", encode(context.service_instance.as_str())).as_str();
res += format!("{}-", encode(endpoint)).as_str();
Expand Down
22 changes: 22 additions & 0 deletions src/context/trace_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::skywalking_proto::v3::{
KeyStringValuePair, Log, RefType, SegmentObject, SegmentReference, SpanLayer, SpanObject,
SpanType,
};
use std::fmt::Formatter;
use std::sync::Arc;

use super::system_time::UnixTimeStampFetcher;
Expand Down Expand Up @@ -61,6 +62,14 @@ pub struct Span {
time_fetcher: Arc<dyn TimeFetcher + Sync + Send>,
}

impl std::fmt::Debug for Span {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Span")
.field("span_internal", &self.span_internal)
.finish()
}
}

static SKYWALKING_RUST_COMPONENT_ID: i32 = 11000;

impl Span {
Expand Down Expand Up @@ -148,6 +157,19 @@ pub struct TracingContext {
segment_link: Option<PropagationContext>,
}

impl std::fmt::Debug for TracingContext {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TracingContext")
.field("trace_id", &self.trace_id)
.field("trace_segment_id", &self.trace_segment_id)
.field("service", &self.service)
.field("service_instance", &self.service_instance)
.field("next_span_id", &self.next_span_id)
.field("spans", &self.spans)
.finish()
}
}

impl TracingContext {
/// Generate a new trace context. Typically called when no context has
/// been propagated and a new trace is to be started.
Expand Down
49 changes: 40 additions & 9 deletions src/reporter/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use crate::context::trace_context::TracingContext;
use crate::skywalking_proto::v3::trace_segment_report_service_client::TraceSegmentReportServiceClient;
use crate::skywalking_proto::v3::SegmentObject;
use std::error::Error;
use tokio::sync::mpsc;
use tonic::transport::Channel;

Expand All @@ -32,12 +33,13 @@ async fn flush(client: &mut ReporterClient, context: SegmentObject) -> Result<()
}
}

pub struct Reporter {}
pub struct Reporter {
tx: mpsc::Sender<TracingContext>,
shutdown_tx: mpsc::Sender<()>,
}

static CHANNEL_BUF_SIZE: usize = 1024;

pub type ContextReporter = mpsc::Sender<TracingContext>;

impl Reporter {
/// Open gRPC client stream to send collected trace context.
/// This function generates a new async task which watch to arrive new trace context.
Expand All @@ -51,23 +53,52 @@ impl Reporter {
/// use skywalking_rust::reporter::grpc::Reporter;
///
/// #[tokio::main]
/// async fn main (){
/// let tx = Reporter::start("localhost:12800").await;
/// async fn main () -> Result<(), Box<dyn Error>> {
/// let reporter = Reporter::start("localhost:12800").await;
/// let mut context = TracingContext::default("service", "instance");
/// tx.send(context).await;
/// reporter.sender().send(context).await?;
/// reporter.shutdown().await?;
/// Ok(())
/// }
/// ```
pub async fn start(address: &str) -> ContextReporter {
pub async fn start(address: impl Into<String>) -> Self {
let (tx, mut rx): (mpsc::Sender<TracingContext>, mpsc::Receiver<TracingContext>) =
mpsc::channel(CHANNEL_BUF_SIZE);
let mut reporter = ReporterClient::connect(address.to_string()).await.unwrap();
let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);

let mut reporter = ReporterClient::connect(address.into()).await.unwrap();
tokio::spawn(async move {
loop {
tokio::select! {
message = rx.recv() => {
if let Some(message) = message {
flush(&mut reporter, message.convert_segment_object()).await.unwrap();
} else {
break;
}
},
_ = shutdown_rx.recv() => {
break;
}
}
}
rx.close();
while let Some(message) = rx.recv().await {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may not be required anymore.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. If a select chooses shutdown first, there can be outstanding messages to be processed. We close the rx so that there is no more inflight message, but the outstanding ones should be processed.

See also https://docs.rs/tokio/1.15.0/tokio/sync/mpsc/struct.Receiver.html#method.close for example.

flush(&mut reporter, message.convert_segment_object())
.await
.unwrap();
}
});
tx
Self { tx, shutdown_tx }
}

pub async fn shutdown(self) -> Result<(), Box<dyn Error>> {
self.shutdown_tx.send(()).await?;
self.shutdown_tx.closed().await;
Ok(())
}

pub fn sender(&self) -> mpsc::Sender<TracingContext> {
self.tx.clone()
}
}
4 changes: 3 additions & 1 deletion tests/e2e/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions tests/e2e/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@
name = "e2e"
version = "0.1.0"
authors = ["Shikugawa <Shikugawa@gmail.com>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
edition = "2021"

[dependencies]
skywalking_rust = { path = "../../" }
skywalking_rust = { path = "../.." }
hyper = { version = "0.14", features = ["full"] }
tokio = { version = "1", features = ["full"] }
structopt = "0.3"
10 changes: 5 additions & 5 deletions tests/e2e/docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
FROM rust:1.50.0
RUN apt update && apt install -y protobuf-compiler
RUN rustup component add rustfmt
COPY . /tmp
WORKDIR tmp/tests/e2e
FROM rust:1.57 as build
WORKDIR /build
COPY . /build/
RUN cd tests/e2e && cargo build --release --locked
ENTRYPOINT ["/build/tests/e2e/target/release/e2e"]
15 changes: 0 additions & 15 deletions tests/e2e/docker/Dockerfile.tool

This file was deleted.

3 changes: 3 additions & 0 deletions tests/e2e/rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[toolchain]
channel = "1.57.0"
components = ["rustfmt", "clippy"]
Loading