Skip to content

Commit

Permalink
Add tracer. (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmjoy authored Jul 10, 2022
1 parent 2034a2f commit 699247d
Show file tree
Hide file tree
Showing 12 changed files with 351 additions and 150 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ on:
tags:
- 'v*'

env:
CARGO_TERM_COLOR: always
RUST_BACKTRACE: "1"
RUSTFLAGS: "-D warnings"

jobs:
CI:
runs-on: ubuntu-latest
Expand All @@ -48,3 +53,5 @@ jobs:
run: cargo clippy --workspace --tests -- -D warnings
- name: Run tests
run: cargo test --workspace
- name: Run docs
run: cargo rustdoc --all-features -- -D warnings
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@ repository = "https://github.com/apache/skywalking-rust"
async-stream = "0.3.3"
base64 = "0.13.0"
bytes = "1.1.0"
futures-core = "0.3.21"
futures-util = "0.3.21"
prost = "0.10.4"
prost-derive = "0.10.1"
thiserror = "1.0.31"
tokio = { version = "1.18.2", features = ["full"] }
tonic = "0.7.2"
tonic = { version = "0.7.2", features = ["codegen"] }
tracing = "0.1.35"
uuid = { version = "1.1.0", features = ["serde", "v4"] }

[build-dependencies]
Expand Down
42 changes: 28 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,46 +33,60 @@ context after the span finished.

# Example

```rust
use skywalking::context::trace_context::TracingContext;
use skywalking::reporter::grpc::Reporter;
use tokio;
```rust, no_run
use skywalking::context::tracer::Tracer;
use skywalking::reporter::grpc::GrpcReporter;
use std::error::Error;
use std::sync::Arc;
use tokio::signal;
async fn handle_request(tracer: Arc<Tracer<GrpcReporter>>) {
let mut ctx = tracer.create_trace_context();
async fn handle_request(reporter: ContextReporter) {
let mut ctx = TracingContext::default("svc", "ins");
{
// Generate an Entry Span when a request
// is received. An Entry Span is generated only once per context.
let span = ctx.create_entry_span("operation1").unwrap();
let span = ctx.create_entry_span("op1").unwrap();
// Something...
{
// Generates an Exit Span when executing an RPC.
let span2 = ctx.create_exit_span("operation2").unwrap();
let span2 = ctx.create_exit_span("op2", "remote_peer").unwrap();
// Something...
ctx.finalize_span(span2);
}
ctx.finalize_span(span);
}
reporter.send(context).await;
tracer.finalize_context(ctx);
}
#[tokio::main]
async fn main() {
let tx = Reporter::start("http://0.0.0.0:11800").await;
async fn main() -> Result<(), Box<dyn Error>> {
let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?;
let tracer = Arc::new(Tracer::new("service", "instance", reporter));
tokio::spawn(handle_request(tracer.clone()));
// Start server...
// Start to report.
let handle = tracer.reporting(async move {
let _ = signal::ctrl_c().await;
});
handle.await?;
Ok(())
}
```

# How to compile?
If you have `skywalking-(VERSION).crate`, you can unpack it with the way as follows:

```
```shell
tar -xvzf skywalking-(VERSION).crate
```

Expand Down
26 changes: 21 additions & 5 deletions e2e/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

78 changes: 44 additions & 34 deletions e2e/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,35 @@ use hyper::{Body, Client, Method, Request, Response, Server, StatusCode};
use skywalking::context::propagation::context::SKYWALKING_HTTP_CONTEXT_HEADER_KEY;
use skywalking::context::propagation::decoder::decode_propagation;
use skywalking::context::propagation::encoder::encode_propagation;
use skywalking::context::trace_context::TracingContext;
use skywalking::reporter::grpc::Reporter;
use skywalking::context::tracer::Tracer;
use skywalking::reporter::grpc::GrpcReporter;
use std::convert::Infallible;
use std::error::Error;
use std::future::pending;
use std::net::SocketAddr;
use structopt::StructOpt;
use tokio::sync::mpsc;
use tokio::sync::OnceCell;

static NOT_FOUND_MSG: &str = "not found";
static SUCCESS_MSG: &str = "Success";

static GLOBAL_TRACER: OnceCell<Tracer<GrpcReporter>> = OnceCell::const_new();

fn set_global_tracer(tracer: Tracer<GrpcReporter>) {
if GLOBAL_TRACER.set(tracer).is_err() {
panic!("TRACER has setted")
}
}

fn get_global_tracer() -> &'static Tracer<GrpcReporter> {
GLOBAL_TRACER.get().expect("TRACER haven't setted")
}

async fn handle_ping(
_req: Request<Body>,
client: Client<HttpConnector>,
tx: mpsc::Sender<TracingContext>,
) -> Result<Response<Body>, Infallible> {
let mut context = TracingContext::default("producer", "node_0");
let mut context = get_global_tracer().create_trace_context();
let span = context.create_entry_span("/ping").unwrap();
{
let span2 = context.create_exit_span("/pong", "consumer:8082").unwrap();
Expand All @@ -53,17 +65,16 @@ async fn handle_ping(
context.finalize_span(span2);
}
context.finalize_span(span);
let _ = tx.send(context).await;
get_global_tracer().finalize_context(context);
Ok(Response::new(Body::from("hoge")))
}

async fn producer_response(
_req: Request<Body>,
client: Client<HttpConnector>,
tx: mpsc::Sender<TracingContext>,
) -> Result<Response<Body>, Infallible> {
match (_req.method(), _req.uri().path()) {
(&Method::GET, "/ping") => handle_ping(_req, client, tx).await,
(&Method::GET, "/ping") => handle_ping(_req, client).await,
(&Method::GET, "/healthCheck") => Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::from(SUCCESS_MSG))
Expand All @@ -75,15 +86,14 @@ async fn producer_response(
}
}

async fn run_producer_service(host: [u8; 4], tx: mpsc::Sender<TracingContext>) {
async fn run_producer_service(host: [u8; 4]) {
let client = Client::new();
let make_svc = make_service_fn(|_| {
let tx = tx.clone();
let client = client.clone();

async {
Ok::<_, Infallible>(service_fn(move |req| {
producer_response(req, client.to_owned(), tx.to_owned())
producer_response(req, client.to_owned())
}))
}
});
Expand All @@ -95,29 +105,23 @@ async fn run_producer_service(host: [u8; 4], tx: mpsc::Sender<TracingContext>) {
}
}

async fn handle_pong(
_req: Request<Body>,
tx: mpsc::Sender<TracingContext>,
) -> Result<Response<Body>, Infallible> {
async fn handle_pong(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
let ctx = decode_propagation(
_req.headers()[SKYWALKING_HTTP_CONTEXT_HEADER_KEY]
.to_str()
.unwrap(),
)
.unwrap();
let mut context = TracingContext::from_propagation_context("consumer", "node_0", ctx);
let mut context = get_global_tracer().create_trace_context_from_propagation(ctx);
let span = context.create_entry_span("/pong").unwrap();
context.finalize_span(span);
let _ = tx.send(context).await;
get_global_tracer().finalize_context(context);
Ok(Response::new(Body::from("hoge")))
}

async fn consumer_response(
_req: Request<Body>,
tx: mpsc::Sender<TracingContext>,
) -> Result<Response<Body>, Infallible> {
async fn consumer_response(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
match (_req.method(), _req.uri().path()) {
(&Method::GET, "/pong") => handle_pong(_req, tx).await,
(&Method::GET, "/pong") => handle_pong(_req).await,
(&Method::GET, "/healthCheck") => Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::from(SUCCESS_MSG))
Expand All @@ -129,11 +133,9 @@ async fn consumer_response(
}
}

async fn run_consumer_service(host: [u8; 4], tx: mpsc::Sender<TracingContext>) {
let make_svc = make_service_fn(|_| {
let tx = tx.clone();
async { Ok::<_, Infallible>(service_fn(move |req| consumer_response(req, tx.to_owned()))) }
});
async fn run_consumer_service(host: [u8; 4]) {
let make_svc =
make_service_fn(|_| async { Ok::<_, Infallible>(service_fn(consumer_response)) });
let addr = SocketAddr::from((host, 8082));
let server = Server::bind(&addr).serve(make_svc);

Expand All @@ -153,15 +155,23 @@ struct Opt {
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let opt = Opt::from_args();
let reporter = Reporter::start("http://collector:19876").await?;
let tx = reporter.sender();
let reporter = GrpcReporter::connect("http://collector:19876").await?;

if opt.mode == "consumer" {
run_consumer_service([0, 0, 0, 0], tx).await;
let handle = if opt.mode == "consumer" {
set_global_tracer(Tracer::new("consumer", "node_0", reporter));
let handle = get_global_tracer().reporting(pending());
run_consumer_service([0, 0, 0, 0]).await;
handle
} else if opt.mode == "producer" {
run_producer_service([0, 0, 0, 0], tx).await;
}
set_global_tracer(Tracer::new("producer", "node_0", reporter));
let handle = get_global_tracer().reporting(pending());
run_producer_service([0, 0, 0, 0]).await;
handle
} else {
unreachable!()
};

handle.await?;

reporter.shutdown().await?;
Ok(())
}
Loading

0 comments on commit 699247d

Please sign in to comment.