From 2f36d36d786b330ac00558bcfa942d04a81b52ab Mon Sep 17 00:00:00 2001 From: jmjoy Date: Tue, 23 Aug 2022 19:53:35 +0800 Subject: [PATCH 1/4] Add metrics. --- Cargo.toml | 4 + README.md | 8 +- build.rs | 1 + e2e/data/expected_context.yaml | 30 +++++ e2e/src/main.rs | 46 ++++++- examples/simple_meter_report.rs | 51 ++++++++ src/common/system_time.rs | 2 + src/lib.rs | 1 + src/logging/record.rs | 3 + src/metrics/meter.rs | 87 +++++++++++++ src/metrics/mod.rs | 18 +++ src/metrics/record.rs | 213 ++++++++++++++++++++++++++++++++ src/reporter/grpc.rs | 32 ++++- src/reporter/mod.rs | 3 +- src/reporter/print.rs | 13 +- tests/metrics.rs | 167 +++++++++++++++++++++++++ 16 files changed, 668 insertions(+), 11 deletions(-) create mode 100644 examples/simple_meter_report.rs create mode 100644 src/metrics/meter.rs create mode 100644 src/metrics/mod.rs create mode 100644 src/metrics/record.rs create mode 100644 tests/metrics.rs diff --git a/Cargo.toml b/Cargo.toml index aef34c9..cfeaeb7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,6 +68,10 @@ required-features = ["mock"] name = "logging" required-features = ["mock"] +[[test]] +name = "metrics" +required-features = ["mock"] + [[example]] name = "simple_trace_report" path = "examples/simple_trace_report.rs" diff --git a/README.md b/README.md index 7cf924a..4314696 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,12 @@ the context after the span finished. LogRecord is the simple builder for the LogData, which is the Log format of Skywalking. +## Metrics + +### MeterRecord + +MeterRecord is the simple builder for the MeterData, which is the metrics format of Skywalking. + # Example ```rust, no_run @@ -167,7 +173,7 @@ For details, please refer to [prost-build:sourcing-protoc](https://docs.rs/prost # Release -The SkyWalking committer(PMC included) could follow [this doc](Release-guide.md) to release an official version. +The SkyWalking committer(PMC included) could follow [this doc](https://github.com/apache/skywalking-rust/blob/master/Release-guide.md) to release an official version. # License diff --git a/build.rs b/build.rs index fa64567..7ef9a20 100644 --- a/build.rs +++ b/build.rs @@ -24,6 +24,7 @@ fn main() -> Result<(), Box> { .compile( &[ "./skywalking-data-collect-protocol/language-agent/Tracing.proto", + "./skywalking-data-collect-protocol/language-agent/Meter.proto", "./skywalking-data-collect-protocol/logging/Logging.proto", ], &["./skywalking-data-collect-protocol"], diff --git a/e2e/data/expected_context.yaml b/e2e/data/expected_context.yaml index 9778a2a..88b152a 100644 --- a/e2e/data/expected_context.yaml +++ b/e2e/data/expected_context.yaml @@ -146,3 +146,33 @@ logItems: value: DEBUG timestamp: gt 0 serviceName: consumer + +meterItems: +- serviceName: consumer + meterSize: 3 + meters: + - meterId: + name: instance_trace_count + tags: + - name: region + value: us-west + - name: az + value: az-1 + singleValue: 100.0 + - meterId: + name: instance_trace_count + tags: + - name: region + value: us-east + - name: az + value: az-3 + singleValue: 20.0 + - meterId: + name: instance_trace_count + tags: + - name: region + value: us-north + - name: az + value: az-1 + histogramBuckets: [33.0, 55.0] + histogramValues: [33, 55] diff --git a/e2e/src/main.rs b/e2e/src/main.rs index f3ffd57..e50decd 100644 --- a/e2e/src/main.rs +++ b/e2e/src/main.rs @@ -25,6 +25,10 @@ use skywalking::{ logger::{self, Logger}, record::{LogRecord, RecordType}, }, + metrics::{ + meter::{self, Meter}, + record::MeterRecord, + }, reporter::grpc::GrpcReporter, trace::{ propagation::{ @@ -34,8 +38,9 @@ use skywalking::{ tracer::{self, Tracer}, }, }; -use std::{convert::Infallible, error::Error, net::SocketAddr}; +use std::{convert::Infallible, error::Error, net::SocketAddr, time::Duration}; use structopt::StructOpt; +use tokio::time::sleep; static NOT_FOUND_MSG: &str = "not found"; static SUCCESS_MSG: &str = "Success"; @@ -161,6 +166,8 @@ async fn consumer_response(_req: Request) -> Result, Infall } async fn run_consumer_service(host: [u8; 4]) { + run_consumer_metric().await; + let make_svc = make_service_fn(|_| async { Ok::<_, Infallible>(service_fn(consumer_response)) }); let addr = SocketAddr::from((host, 8082)); @@ -172,6 +179,40 @@ async fn run_consumer_service(host: [u8; 4]) { } } +async fn run_consumer_metric() { + meter::metric( + MeterRecord::new() + .single_value() + .name("instance_trace_count") + .add_label("region", "us-west") + .add_label("az", "az-1") + .value(100.), + ); + + sleep(Duration::from_millis(10)).await; + + meter::metric( + MeterRecord::new() + .single_value() + .name("instance_trace_count") + .add_label("region", "us-east") + .add_label("az", "az-3") + .value(20.), + ); + + sleep(Duration::from_millis(10)).await; + + meter::metric( + MeterRecord::new() + .histogram() + .name("instance_trace_count") + .add_label("region", "us-north") + .add_label("az", "az-1") + .add_value(33., 33, false) + .add_value(55., 55, true), + ); +} + #[derive(StructOpt)] #[structopt(name = "basic")] struct Opt { @@ -187,7 +228,8 @@ async fn main() -> Result<(), Box> { if opt.mode == "consumer" { tracer::set_global_tracer(Tracer::new("consumer", "node_0", reporter.clone())); - logger::set_global_logger(Logger::new("consumer", "node_0", reporter)); + logger::set_global_logger(Logger::new("consumer", "node_0", reporter.clone())); + meter::set_global_meter(Meter::new("consumer", "node_0", reporter)); run_consumer_service([0, 0, 0, 0]).await; } else if opt.mode == "producer" { tracer::set_global_tracer(Tracer::new("producer", "node_0", reporter.clone())); diff --git a/examples/simple_meter_report.rs b/examples/simple_meter_report.rs new file mode 100644 index 0000000..6fc4fa4 --- /dev/null +++ b/examples/simple_meter_report.rs @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +use skywalking::{ + metrics::{meter::Meter, record::MeterRecord}, + reporter::grpc::GrpcReporter, +}; +use std::{error::Error, future}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Connect to skywalking oap server. + let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?; + + // Do metrics. + let meter = Meter::new("service", "instance", reporter.clone()); + meter.metric( + MeterRecord::new() + .single_value() + .name("instance_trace_count") + .add_label("region", "us-west") + .add_label("az", "az-1") + .value(100.), + ); + + // Start reporting and quit immediately when have completed the existing + // collection. + reporter + .reporting() + .await + .with_graceful_shutdown(future::ready(())) + .start() + .await?; + + Ok(()) +} diff --git a/src/common/system_time.rs b/src/common/system_time.rs index ee97b4e..6513e81 100644 --- a/src/common/system_time.rs +++ b/src/common/system_time.rs @@ -19,6 +19,7 @@ use cfg_if::cfg_if; pub(crate) enum TimePeriod { Start, Log, + Metric, End, } @@ -28,6 +29,7 @@ cfg_if! { match period { TimePeriod::Start => 1, TimePeriod::Log => 10, + TimePeriod::Metric => 10, TimePeriod::End => 100, } } diff --git a/src/lib.rs b/src/lib.rs index 9cfd58b..7d4cc0c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,6 +20,7 @@ pub mod common; pub(crate) mod error; pub mod logging; +pub mod metrics; pub mod reporter; pub mod skywalking_proto; pub mod trace; diff --git a/src/logging/record.rs b/src/logging/record.rs index 87f22cf..02f5a83 100644 --- a/src/logging/record.rs +++ b/src/logging/record.rs @@ -55,16 +55,19 @@ impl LogRecord { Default::default() } + #[inline] pub fn custome_time(mut self, time: SystemTime) -> Self { self.time = Some(time); self } + #[inline] pub fn ignore_time(mut self) -> Self { self.is_ignore_time = true; self } + #[inline] pub fn endpoint(mut self, endpoint: impl ToString) -> Self { self.endpoint = endpoint.to_string(); self diff --git a/src/metrics/meter.rs b/src/metrics/meter.rs new file mode 100644 index 0000000..fe5f344 --- /dev/null +++ b/src/metrics/meter.rs @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +use crate::{ + reporter::{CollectItem, DynReport, Report}, + skywalking_proto::v3::MeterData, +}; +use std::sync::Arc; +use tokio::sync::OnceCell; + +static GLOBAL_METER: OnceCell = OnceCell::const_new(); + +/// Set the global meter. +pub fn set_global_meter(tracer: Meter) { + if GLOBAL_METER.set(tracer).is_err() { + panic!("global meter has setted") + } +} + +/// Get the global meter. +pub fn global_meter() -> &'static Meter { + GLOBAL_METER.get().expect("global meter haven't setted") +} + +/// Log by global meter. +pub fn metric(record: impl IntoMeterDataWithMeter) { + global_meter().metric(record); +} + +pub trait IntoMeterDataWithMeter { + fn into_meter_data_with_meter(self, meter: &Meter) -> MeterData; +} + +pub struct Inner { + service_name: String, + instance_name: String, + reporter: Box, +} + +#[derive(Clone)] +pub struct Meter { + inner: Arc, +} + +impl Meter { + /// New with service info and reporter. + pub fn new( + service_name: impl ToString, + instance_name: impl ToString, + reporter: impl Report + Send + Sync + 'static, + ) -> Self { + Self { + inner: Arc::new(Inner { + service_name: service_name.to_string(), + instance_name: instance_name.to_string(), + reporter: Box::new(reporter), + }), + } + } + + pub fn service_name(&self) -> &str { + &self.inner.service_name + } + + pub fn instance_name(&self) -> &str { + &self.inner.instance_name + } + + pub fn metric(&self, record: impl IntoMeterDataWithMeter) { + self.inner + .reporter + .report(CollectItem::Meter(record.into_meter_data_with_meter(self))); + } +} diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs new file mode 100644 index 0000000..bc2d071 --- /dev/null +++ b/src/metrics/mod.rs @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +pub mod meter; +pub mod record; diff --git a/src/metrics/record.rs b/src/metrics/record.rs new file mode 100644 index 0000000..2818aba --- /dev/null +++ b/src/metrics/record.rs @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +use crate::{ + common::system_time::{fetch_time, TimePeriod}, + metrics::meter::{IntoMeterDataWithMeter, Meter}, + skywalking_proto::v3::{ + meter_data::Metric, Label, MeterBucketValue, MeterData, MeterHistogram, MeterSingleValue, + }, +}; +use std::time::{SystemTime, UNIX_EPOCH}; + +#[derive(Default)] +pub struct MeterRecord { + time: Option, +} + +impl MeterRecord { + #[inline] + pub fn new() -> Self { + Default::default() + } + + #[inline] + pub fn custome_time(mut self, time: SystemTime) -> Self { + self.time = Some(time); + self + } + + #[inline] + pub fn single_value(self) -> SingleValue { + SingleValue { + record: self, + ..Default::default() + } + } + + #[inline] + pub fn histogram(self) -> Histogram { + Histogram { + record: self, + ..Default::default() + } + } +} + +impl IntoMeterDataWithMeter for MeterRecord { + fn into_meter_data_with_meter(self, meter: &Meter) -> MeterData { + MeterData { + service: meter.service_name().to_owned(), + service_instance: meter.instance_name().to_owned(), + timestamp: match self.time { + Some(time) => time + .duration_since(UNIX_EPOCH) + .map(|dur| dur.as_millis() as i64) + .unwrap_or_default(), + None => fetch_time(TimePeriod::Metric), + }, + metric: None, + } + } +} + +#[derive(Default)] +pub struct SingleValue { + record: MeterRecord, + name: String, + labels: Vec<(String, String)>, + value: f64, +} + +impl SingleValue { + #[inline] + pub fn name(mut self, name: impl ToString) -> Self { + self.name = name.to_string(); + self + } + + pub fn add_label(mut self, key: impl ToString, value: impl ToString) -> Self { + self.labels.push((key.to_string(), value.to_string())); + self + } + + pub fn add_labels(mut self, tags: I) -> Self + where + K: ToString, + V: ToString, + I: IntoIterator, + { + self.labels.extend( + tags.into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())), + ); + self + } + + #[inline] + pub fn value(mut self, value: f64) -> Self { + self.value = value; + self + } +} + +impl IntoMeterDataWithMeter for SingleValue { + fn into_meter_data_with_meter(self, meter: &Meter) -> MeterData { + MeterData { + service: meter.service_name().to_owned(), + service_instance: meter.instance_name().to_owned(), + timestamp: match self.record.time { + Some(time) => time + .duration_since(UNIX_EPOCH) + .map(|dur| dur.as_millis() as i64) + .unwrap_or_default(), + None => fetch_time(TimePeriod::Metric), + }, + metric: Some(Metric::SingleValue(MeterSingleValue { + name: self.name, + labels: self + .labels + .into_iter() + .map(|(name, value)| Label { name, value }) + .collect(), + value: self.value, + })), + } + } +} + +#[derive(Default)] +pub struct Histogram { + record: MeterRecord, + name: String, + labels: Vec<(String, String)>, + values: Vec<(f64, i64, bool)>, +} + +impl Histogram { + #[inline] + pub fn name(mut self, name: impl ToString) -> Self { + self.name = name.to_string(); + self + } + + pub fn add_label(mut self, key: impl ToString, value: impl ToString) -> Self { + self.labels.push((key.to_string(), value.to_string())); + self + } + + pub fn add_labels(mut self, tags: I) -> Self + where + K: ToString, + V: ToString, + I: IntoIterator, + { + self.labels.extend( + tags.into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())), + ); + self + } + + #[inline] + pub fn add_value(mut self, bucket: f64, count: i64, is_negative_infinity: bool) -> Self { + self.values.push((bucket, count, is_negative_infinity)); + self + } +} + +impl IntoMeterDataWithMeter for Histogram { + fn into_meter_data_with_meter(self, meter: &Meter) -> MeterData { + MeterData { + service: meter.service_name().to_owned(), + service_instance: meter.instance_name().to_owned(), + timestamp: match self.record.time { + Some(time) => time + .duration_since(UNIX_EPOCH) + .map(|dur| dur.as_millis() as i64) + .unwrap_or_default(), + None => fetch_time(TimePeriod::Metric), + }, + metric: Some(Metric::Histogram(MeterHistogram { + name: self.name, + labels: self + .labels + .into_iter() + .map(|(name, value)| Label { name, value }) + .collect(), + values: self + .values + .into_iter() + .map(|(bucket, count, is_negative_infinity)| MeterBucketValue { + bucket, + count, + is_negative_infinity, + }) + .collect(), + })), + } + } +} diff --git a/src/reporter/grpc.rs b/src/reporter/grpc.rs index eb92517..fcea63d 100644 --- a/src/reporter/grpc.rs +++ b/src/reporter/grpc.rs @@ -18,7 +18,8 @@ use crate::{ reporter::{CollectItem, Report}, skywalking_proto::v3::{ log_report_service_client::LogReportServiceClient, - trace_segment_report_service_client::TraceSegmentReportServiceClient, LogData, + meter_report_service_client::MeterReportServiceClient, + trace_segment_report_service_client::TraceSegmentReportServiceClient, LogData, MeterData, SegmentObject, }, }; @@ -102,6 +103,7 @@ impl ColletcItemConsume for mpsc::UnboundedReceiver { struct Inner { trace_client: Mutex>, log_client: Mutex>, + meter_client: Mutex>, producer: P, consumer: Mutex>, is_reporting: AtomicBool, @@ -135,7 +137,8 @@ impl GrpcReporter { Self { inner: Arc::new(Inner { trace_client: Mutex::new(TraceSegmentReportServiceClient::new(channel.clone())), - log_client: Mutex::new(LogReportServiceClient::new(channel)), + log_client: Mutex::new(LogReportServiceClient::new(channel.clone())), + meter_client: Mutex::new(MeterReportServiceClient::new(channel)), producer, consumer: Mutex::new(Some(consumer)), is_reporting: Default::default(), @@ -168,9 +171,10 @@ impl GrpcReporter { Reporting { rb: ReporterAndBuffer { inner: Arc::clone(&self.inner), + status_handle: None, trace_buffer: Default::default(), log_buffer: Default::default(), - status_handle: None, + meter_buffer: Default::default(), }, shutdown_signal: Box::pin(pending()), consumer: self.inner.consumer.lock().await.take().unwrap(), @@ -201,9 +205,10 @@ impl Report for GrpcReporter struct ReporterAndBuffer { inner: Arc>, + status_handle: Option>, trace_buffer: LinkedList, log_buffer: LinkedList, - status_handle: Option>, + meter_buffer: LinkedList, } impl ReporterAndBuffer { @@ -216,6 +221,9 @@ impl ReporterAndBuffer { CollectItem::Log(item) => { self.log_buffer.push_back(item); } + CollectItem::Meter(item) => { + self.meter_buffer.push_back(item); + } } if !self.trace_buffer.is_empty() { @@ -248,6 +256,22 @@ impl ReporterAndBuffer { } } } + + if !self.meter_buffer.is_empty() { + let buffer = take(&mut self.meter_buffer); + if let Err(e) = self + .inner + .meter_client + .lock() + .await + .collect(stream::iter(buffer)) + .await + { + if let Some(status_handle) = &self.status_handle { + status_handle(e); + } + } + } } } diff --git a/src/reporter/mod.rs b/src/reporter/mod.rs index 044afa2..79735bf 100644 --- a/src/reporter/mod.rs +++ b/src/reporter/mod.rs @@ -17,7 +17,7 @@ pub mod grpc; pub mod print; -use crate::skywalking_proto::v3::{LogData, SegmentObject}; +use crate::skywalking_proto::v3::{LogData, MeterData, SegmentObject}; use serde::{Deserialize, Serialize}; use std::{ops::Deref, sync::Arc}; use tokio::sync::OnceCell; @@ -27,6 +27,7 @@ use tokio::sync::OnceCell; pub enum CollectItem { Trace(SegmentObject), Log(LogData), + Meter(MeterData), } pub(crate) type DynReport = dyn Report + Send + Sync + 'static; diff --git a/src/reporter/print.rs b/src/reporter/print.rs index 6b355e5..e5b640f 100644 --- a/src/reporter/print.rs +++ b/src/reporter/print.rs @@ -36,11 +36,11 @@ impl PrintReporter { impl Report for PrintReporter { fn report(&self, items: CollectItem) { match items { - CollectItem::Trace(segment) => { + CollectItem::Trace(data) => { if self.use_stderr { - eprintln!("trace segment={:?}", segment); + eprintln!("trace segment={:?}", data); } else { - println!("trace segment={:?}", segment); + println!("trace segment={:?}", data); } } CollectItem::Log(data) => { @@ -50,6 +50,13 @@ impl Report for PrintReporter { println!("log data={:?}", data); } } + CollectItem::Meter(data) => { + if self.use_stderr { + eprintln!("meter data={:?}", data); + } else { + println!("meter data={:?}", data); + } + } } } } diff --git a/tests/metrics.rs b/tests/metrics.rs new file mode 100644 index 0000000..f84cef3 --- /dev/null +++ b/tests/metrics.rs @@ -0,0 +1,167 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +use skywalking::{ + metrics::{ + meter::{Meter}, + record::MeterRecord, + }, + reporter::{CollectItem, Report}, + skywalking_proto::v3::{ + meter_data::Metric, Label, + MeterBucketValue, MeterData, MeterHistogram, MeterSingleValue, + }, +}; +use std::{ + collections::LinkedList, + sync::{Arc, Mutex}, + time::{Duration, SystemTime}, +}; + +#[test] +fn metrics() { + let reporter = Arc::new(MockReporter::default()); + let meter = Meter::new("service_name", "instance_name", reporter.clone()); + + { + meter.metric(MeterRecord::new()); + assert_eq!( + reporter.pop(), + MeterData { + timestamp: 10, + service: "service_name".to_owned(), + service_instance: "instance_name".to_owned(), + metric: None, + } + ); + } + + { + meter.metric( + MeterRecord::new() + .custome_time(SystemTime::UNIX_EPOCH + Duration::from_secs(1_000_000)), + ); + assert_eq!( + reporter.pop(), + MeterData { + timestamp: 1_000_000_000, + service: "service_name".to_owned(), + service_instance: "instance_name".to_owned(), + metric: None, + } + ); + } + + { + meter.metric( + MeterRecord::new() + .single_value() + .name("instance_trace_count") + .add_label("region", "us-west") + .add_labels([("az", "az-1")]) + .value(100.), + ); + assert_eq!( + reporter.pop(), + MeterData { + timestamp: 10, + service: "service_name".to_owned(), + service_instance: "instance_name".to_owned(), + metric: Some(Metric::SingleValue(MeterSingleValue { + name: "instance_trace_count".to_owned(), + labels: vec![ + Label { + name: "region".to_owned(), + value: "us-west".to_owned() + }, + Label { + name: "az".to_owned(), + value: "az-1".to_owned() + }, + ], + value: 100. + })), + } + ); + } + + { + meter.metric( + MeterRecord::new() + .histogram() + .name("instance_trace_count") + .add_label("region", "us-west") + .add_labels([("az", "az-1")]) + .add_value(1., 100, false) + .add_value(2., 200, true), + ); + assert_eq!( + reporter.pop(), + MeterData { + timestamp: 10, + service: "service_name".to_owned(), + service_instance: "instance_name".to_owned(), + metric: Some(Metric::Histogram(MeterHistogram { + name: "instance_trace_count".to_owned(), + labels: vec![ + Label { + name: "region".to_owned(), + value: "us-west".to_owned() + }, + Label { + name: "az".to_owned(), + value: "az-1".to_owned() + }, + ], + values: vec![ + MeterBucketValue { + bucket: 1., + count: 100, + is_negative_infinity: false + }, + MeterBucketValue { + bucket: 2., + count: 200, + is_negative_infinity: true + }, + ] + })), + } + ); + } +} + +#[derive(Default, Clone)] +struct MockReporter { + items: Arc>>, +} + +impl MockReporter { + fn pop(&self) -> MeterData { + self.items.try_lock().unwrap().pop_back().unwrap() + } +} + +impl Report for MockReporter { + fn report(&self, item: CollectItem) { + match item { + CollectItem::Meter(data) => { + self.items.try_lock().unwrap().push_back(data); + } + _ => {} + } + } +} From 937133e6e6a3e6c5de1eaaddec0006286f81c7fc Mon Sep 17 00:00:00 2001 From: jmjoy Date: Tue, 23 Aug 2022 20:00:30 +0800 Subject: [PATCH 2/4] Fmt. --- tests/metrics.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/tests/metrics.rs b/tests/metrics.rs index f84cef3..c4e722b 100644 --- a/tests/metrics.rs +++ b/tests/metrics.rs @@ -15,14 +15,10 @@ // use skywalking::{ - metrics::{ - meter::{Meter}, - record::MeterRecord, - }, + metrics::{meter::Meter, record::MeterRecord}, reporter::{CollectItem, Report}, skywalking_proto::v3::{ - meter_data::Metric, Label, - MeterBucketValue, MeterData, MeterHistogram, MeterSingleValue, + meter_data::Metric, Label, MeterBucketValue, MeterData, MeterHistogram, MeterSingleValue, }, }; use std::{ From 28a2e47ec97c3a414f6962f5c1cafb7bc410fa03 Mon Sep 17 00:00:00 2001 From: jmjoy Date: Thu, 25 Aug 2022 00:05:26 +0800 Subject: [PATCH 3/4] Refer to java agent. --- Cargo.toml | 1 + e2e/data/expected_context.yaml | 6 +- e2e/src/main.rs | 58 +++--- examples/simple_meter_report.rs | 36 ++-- src/logging/logger.rs | 4 +- src/metrics/meter.rs | 348 ++++++++++++++++++++++++++++---- src/metrics/metricer.rs | 95 +++++++++ src/metrics/mod.rs | 2 +- src/metrics/record.rs | 213 ------------------- tests/metrics.rs | 81 ++++---- 10 files changed, 497 insertions(+), 347 deletions(-) create mode 100644 src/metrics/metricer.rs delete mode 100644 src/metrics/record.rs diff --git a/Cargo.toml b/Cargo.toml index cfeaeb7..b4de204 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ bytes = "1.2.1" cfg-if = "1.0.0" futures-core = "0.3.21" futures-util = "0.3.21" +portable-atomic = { version = "0.3.13", features = ["float"] } prost = "0.11.0" prost-derive = "0.11.0" serde = { version = "1.0.143", features = ["derive"] } diff --git a/e2e/data/expected_context.yaml b/e2e/data/expected_context.yaml index 88b152a..6551e9e 100644 --- a/e2e/data/expected_context.yaml +++ b/e2e/data/expected_context.yaml @@ -158,7 +158,7 @@ meterItems: value: us-west - name: az value: az-1 - singleValue: 100.0 + singleValue: 30.0 - meterId: name: instance_trace_count tags: @@ -174,5 +174,5 @@ meterItems: value: us-north - name: az value: az-1 - histogramBuckets: [33.0, 55.0] - histogramValues: [33, 55] + histogramBuckets: [10.0, 20.0, 30.0] + histogramValues: [1, 2, 0] diff --git a/e2e/src/main.rs b/e2e/src/main.rs index e50decd..ab96743 100644 --- a/e2e/src/main.rs +++ b/e2e/src/main.rs @@ -26,8 +26,8 @@ use skywalking::{ record::{LogRecord, RecordType}, }, metrics::{ - meter::{self, Meter}, - record::MeterRecord, + meter::{Counter, Gauge, Histogram}, + metricer::{self, Metricer}, }, reporter::grpc::GrpcReporter, trace::{ @@ -38,7 +38,10 @@ use skywalking::{ tracer::{self, Tracer}, }, }; -use std::{convert::Infallible, error::Error, net::SocketAddr, time::Duration}; +use std::{ + collections::HashSet, convert::Infallible, error::Error, net::SocketAddr, sync::Arc, + time::Duration, +}; use structopt::StructOpt; use tokio::time::sleep; @@ -166,8 +169,6 @@ async fn consumer_response(_req: Request) -> Result, Infall } async fn run_consumer_service(host: [u8; 4]) { - run_consumer_metric().await; - let make_svc = make_service_fn(|_| async { Ok::<_, Infallible>(service_fn(consumer_response)) }); let addr = SocketAddr::from((host, 8082)); @@ -179,38 +180,31 @@ async fn run_consumer_service(host: [u8; 4]) { } } -async fn run_consumer_metric() { - meter::metric( - MeterRecord::new() - .single_value() - .name("instance_trace_count") +fn run_consumer_metric(mut metricer: Metricer) { + let counter = metricer.register( + Counter::new("instance_trace_count") .add_label("region", "us-west") - .add_label("az", "az-1") - .value(100.), + .add_label("az", "az-1"), ); - - sleep(Duration::from_millis(10)).await; - - meter::metric( - MeterRecord::new() - .single_value() - .name("instance_trace_count") + metricer.register( + Gauge::new("instance_trace_count", || 20.) .add_label("region", "us-east") - .add_label("az", "az-3") - .value(20.), + .add_label("az", "az-3"), ); - - sleep(Duration::from_millis(10)).await; - - meter::metric( - MeterRecord::new() - .histogram() - .name("instance_trace_count") + let histogram = metricer.register( + Histogram::new("instance_trace_count", vec![10., 20., 30.]) .add_label("region", "us-north") - .add_label("az", "az-1") - .add_value(33., 33, false) - .add_value(55., 55, true), + .add_label("az", "az-1"), ); + + counter.increment(10.); + counter.increment(20.); + + histogram.add_value(10.); + histogram.add_value(29.); + histogram.add_value(20.); + + metricer.boot(); } #[derive(StructOpt)] @@ -229,7 +223,7 @@ async fn main() -> Result<(), Box> { if opt.mode == "consumer" { tracer::set_global_tracer(Tracer::new("consumer", "node_0", reporter.clone())); logger::set_global_logger(Logger::new("consumer", "node_0", reporter.clone())); - meter::set_global_meter(Meter::new("consumer", "node_0", reporter)); + run_consumer_metric(Metricer::new("consumer", "node_0", reporter)); run_consumer_service([0, 0, 0, 0]).await; } else if opt.mode == "producer" { tracer::set_global_tracer(Tracer::new("producer", "node_0", reporter.clone())); diff --git a/examples/simple_meter_report.rs b/examples/simple_meter_report.rs index 6fc4fa4..4f1c57b 100644 --- a/examples/simple_meter_report.rs +++ b/examples/simple_meter_report.rs @@ -17,35 +17,39 @@ // use skywalking::{ - metrics::{meter::Meter, record::MeterRecord}, + metrics::{meter::Counter, metricer::Metricer}, reporter::grpc::GrpcReporter, }; use std::{error::Error, future}; +use tokio::signal; #[tokio::main] async fn main() -> Result<(), Box> { // Connect to skywalking oap server. let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?; + // Spawn the reporting in background, with listening the graceful shutdown + // signal. + let handle = reporter + .reporting() + .await + .with_graceful_shutdown(async move { + signal::ctrl_c().await.expect("failed to listen for event"); + }) + .spawn(); + // Do metrics. - let meter = Meter::new("service", "instance", reporter.clone()); - meter.metric( - MeterRecord::new() - .single_value() - .name("instance_trace_count") + let mut metricer = Metricer::new("service", "instance", reporter.clone()); + let counter = metricer.register( + Counter::new("instance_trace_count") .add_label("region", "us-west") - .add_label("az", "az-1") - .value(100.), + .add_label("az", "az-1"), ); - // Start reporting and quit immediately when have completed the existing - // collection. - reporter - .reporting() - .await - .with_graceful_shutdown(future::ready(())) - .start() - .await?; + counter.increment(1.); + + metricer.boot().await; + handle.await; Ok(()) } diff --git a/src/logging/logger.rs b/src/logging/logger.rs index 2571ea7..f1bb2dc 100644 --- a/src/logging/logger.rs +++ b/src/logging/logger.rs @@ -22,8 +22,8 @@ use tokio::sync::OnceCell; static GLOBAL_LOGGER: OnceCell = OnceCell::const_new(); /// Set the global logger. -pub fn set_global_logger(tracer: Logger) { - if GLOBAL_LOGGER.set(tracer).is_err() { +pub fn set_global_logger(logger: Logger) { + if GLOBAL_LOGGER.set(logger).is_err() { panic!("global logger has setted") } } diff --git a/src/metrics/meter.rs b/src/metrics/meter.rs index fe5f344..a5b8a50 100644 --- a/src/metrics/meter.rs +++ b/src/metrics/meter.rs @@ -15,73 +15,333 @@ // use crate::{ - reporter::{CollectItem, DynReport, Report}, - skywalking_proto::v3::MeterData, + common::system_time::{fetch_time, TimePeriod}, + metrics::metricer::Metricer, + skywalking_proto::v3::{ + meter_data::Metric, Label, MeterBucketValue, MeterData, MeterHistogram, MeterSingleValue, + }, }; -use std::sync::Arc; -use tokio::sync::OnceCell; +use portable_atomic::AtomicF64; +use std::{ + cmp::Ordering::Equal, + collections::HashSet, + sync::atomic::{self, AtomicI64, AtomicUsize, Ordering}, + time::{SystemTime, UNIX_EPOCH}, +}; +use tokio::sync::Mutex; + +pub trait Transform: Send + Sync { + fn meter_id(&self) -> MeterId; + + fn transform(&self, metricer: &Metricer) -> MeterData; +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub(crate) enum MeterType { + COUNTER, + GAUGE, + HISTOGRAM, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct MeterId { + name: String, + typ: MeterType, + labels: Vec<(String, String)>, +} -static GLOBAL_METER: OnceCell = OnceCell::const_new(); +impl MeterId { + fn add_label(mut self, key: impl ToString, value: impl ToString) -> Self { + self.labels.push((key.to_string(), value.to_string())); + self + } -/// Set the global meter. -pub fn set_global_meter(tracer: Meter) { - if GLOBAL_METER.set(tracer).is_err() { - panic!("global meter has setted") + fn add_labels(mut self, tags: I) -> Self + where + K: ToString, + V: ToString, + I: IntoIterator, + { + self.labels.extend( + tags.into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())), + ); + self } } -/// Get the global meter. -pub fn global_meter() -> &'static Meter { - GLOBAL_METER.get().expect("global meter haven't setted") +/// Counter mode. +pub enum CounterMode { + /// INCREMENT mode represents reporting the latest value. + INCREMENT, + + /// RATE mode represents reporting the increment rate. Value = latest value + /// - last reported value. + RATE, } -/// Log by global meter. -pub fn metric(record: impl IntoMeterDataWithMeter) { - global_meter().metric(record); +pub struct Counter { + id: MeterId, + mode: CounterMode, + count: AtomicF64, + previous_count: AtomicF64, } -pub trait IntoMeterDataWithMeter { - fn into_meter_data_with_meter(self, meter: &Meter) -> MeterData; +impl Counter { + #[inline] + pub fn new(name: impl ToString) -> Self { + Self { + id: MeterId { + name: name.to_string(), + typ: MeterType::COUNTER, + labels: vec![], + }, + mode: CounterMode::INCREMENT, + count: AtomicF64::new(0.), + previous_count: AtomicF64::new(0.), + } + } + + #[inline] + pub fn add_label(mut self, key: impl ToString, value: impl ToString) -> Self { + self.id = self.id.add_label(key, value); + self + } + + #[inline] + pub fn add_labels(mut self, tags: I) -> Self + where + K: ToString, + V: ToString, + I: IntoIterator, + { + self.id = self.id.add_labels(tags); + self + } + + #[inline] + pub fn mode(mut self, mode: CounterMode) -> Self { + self.mode = mode; + self + } + + pub fn increment(&self, count: f64) { + self.count.fetch_add(count, Ordering::Acquire); + } + + pub fn get(&self) -> f64 { + self.count.load(Ordering::Acquire) + } } -pub struct Inner { - service_name: String, - instance_name: String, - reporter: Box, +impl Transform for Counter { + fn meter_id(&self) -> MeterId { + self.id.clone() + } + + fn transform(&self, metricer: &Metricer) -> MeterData { + MeterData { + service: metricer.service_name().to_owned(), + service_instance: metricer.instance_name().to_owned(), + timestamp: fetch_time(TimePeriod::Metric), + metric: Some(Metric::SingleValue(MeterSingleValue { + name: self.id.name.to_owned(), + labels: self + .id + .labels + .iter() + .map(|(name, value)| Label { + name: name.clone(), + value: value.clone(), + }) + .collect(), + value: match self.mode { + CounterMode::INCREMENT => self.get(), + CounterMode::RATE => { + let current_count = self.get(); + let previous_count = + self.previous_count.swap(current_count, Ordering::Acquire); + current_count - previous_count + } + }, + })), + } + } } -#[derive(Clone)] -pub struct Meter { - inner: Arc, +pub struct Gauge { + id: MeterId, + getter: G, } -impl Meter { - /// New with service info and reporter. - pub fn new( - service_name: impl ToString, - instance_name: impl ToString, - reporter: impl Report + Send + Sync + 'static, - ) -> Self { +impl f64> Gauge { + #[inline] + pub fn new(name: impl ToString, getter: G) -> Self { Self { - inner: Arc::new(Inner { - service_name: service_name.to_string(), - instance_name: instance_name.to_string(), - reporter: Box::new(reporter), - }), + id: MeterId { + name: name.to_string(), + typ: MeterType::GAUGE, + labels: vec![], + }, + getter, } } - pub fn service_name(&self) -> &str { - &self.inner.service_name + #[inline] + pub fn add_label(mut self, key: impl ToString, value: impl ToString) -> Self { + self.id = self.id.add_label(key, value); + self + } + + #[inline] + pub fn add_labels(mut self, tags: I) -> Self + where + K: ToString, + V: ToString, + I: IntoIterator, + { + self.id = self.id.add_labels(tags); + self + } + + pub fn get(&self) -> f64 { + (self.getter)() } +} - pub fn instance_name(&self) -> &str { - &self.inner.instance_name +impl f64 + Send + Sync> Transform for Gauge { + fn meter_id(&self) -> MeterId { + self.id.clone() } - pub fn metric(&self, record: impl IntoMeterDataWithMeter) { - self.inner - .reporter - .report(CollectItem::Meter(record.into_meter_data_with_meter(self))); + fn transform(&self, metricer: &Metricer) -> MeterData { + MeterData { + service: metricer.service_name().to_owned(), + service_instance: metricer.instance_name().to_owned(), + timestamp: fetch_time(TimePeriod::Metric), + metric: Some(Metric::SingleValue(MeterSingleValue { + name: self.id.name.to_owned(), + labels: self + .id + .labels + .iter() + .map(|(name, value)| Label { + name: name.clone(), + value: value.clone(), + }) + .collect(), + value: self.get(), + })), + } + } +} + +struct Bucket { + bucket: f64, + count: AtomicI64, +} + +impl Bucket { + fn new(bucker: f64) -> Self { + Self { + bucket: bucker, + count: Default::default(), + } + } +} + +pub struct Histogram { + id: MeterId, + buckets: Vec, +} + +impl Histogram { + pub fn new(name: impl ToString, mut steps: Vec) -> Self { + Self { + id: MeterId { + name: name.to_string(), + typ: MeterType::HISTOGRAM, + labels: vec![], + }, + buckets: { + steps.sort_by(|a, b| a.partial_cmp(&b).unwrap_or(Equal)); + steps.dedup(); + steps.into_iter().map(Bucket::new).collect() + }, + } + } + + #[inline] + pub fn add_label(mut self, key: impl ToString, value: impl ToString) -> Self { + self.id = self.id.add_label(key, value); + self + } + + #[inline] + pub fn add_labels(mut self, tags: I) -> Self + where + K: ToString, + V: ToString, + I: IntoIterator, + { + self.id = self.id.add_labels(tags); + self + } + + pub fn add_value(&self, value: f64) { + if let Some(index) = self.find_bucket(value) { + self.buckets[index].count.fetch_add(1, Ordering::Acquire); + } + } + + fn find_bucket(&self, value: f64) -> Option { + match self + .buckets + .binary_search_by(|bucket| bucket.bucket.partial_cmp(&value).unwrap_or(Equal)) + { + Ok(i) => Some(i), + Err(i) => { + if i >= 1 { + Some(i - 1) + } else { + None + } + } + } + } +} + +impl Transform for Histogram { + fn meter_id(&self) -> MeterId { + self.id.clone() + } + + fn transform(&self, metricer: &Metricer) -> MeterData { + MeterData { + service: metricer.service_name().to_owned(), + service_instance: metricer.instance_name().to_owned(), + timestamp: fetch_time(TimePeriod::Metric), + metric: Some(Metric::Histogram(MeterHistogram { + name: self.id.name.to_owned(), + labels: self + .id + .labels + .iter() + .map(|(name, value)| Label { + name: name.clone(), + value: value.clone(), + }) + .collect(), + values: self + .buckets + .iter() + .map(|bucket| MeterBucketValue { + bucket: bucket.bucket, + count: bucket.count.load(Ordering::Acquire), + is_negative_infinity: false, + }) + .collect(), + })), + } } } diff --git a/src/metrics/metricer.rs b/src/metrics/metricer.rs new file mode 100644 index 0000000..dabacfc --- /dev/null +++ b/src/metrics/metricer.rs @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +use super::meter::{MeterId, Transform}; +use crate::{ + reporter::{CollectItem, DynReport, Report}, + skywalking_proto::v3::MeterData, +}; +use std::{collections::HashMap, sync::Arc, time::Duration}; +use tokio::{ + runtime::Handle, + spawn, + sync::{Mutex, OnceCell}, + task::{block_in_place, spawn_blocking, spawn_local, JoinHandle}, + time::interval, +}; + +pub struct Metricer { + service_name: String, + instance_name: String, + reporter: Box, + meter_map: HashMap>, + report_interval: Duration, +} + +impl Metricer { + /// New with service info and reporter. + pub fn new( + service_name: impl ToString, + instance_name: impl ToString, + reporter: impl Report + Send + Sync + 'static, + ) -> Self { + Self { + service_name: service_name.to_string(), + instance_name: instance_name.to_string(), + reporter: Box::new(reporter), + meter_map: Default::default(), + report_interval: Duration::from_secs(20), + } + } + + pub fn service_name(&self) -> &str { + &self.service_name + } + + pub fn instance_name(&self) -> &str { + &self.instance_name + } + + pub fn set_report_interval(&mut self, report_interval: Duration) { + self.report_interval = report_interval; + } + + pub fn register(&mut self, transform: T) -> Arc { + let transform = Arc::new(transform); + self.meter_map + .insert(transform.meter_id(), transform.clone()); + transform + } + + // TODO Shutdownable. + pub fn boot(self) -> JoinHandle<()> { + spawn(async move { + let mut ticker = interval(self.report_interval); + let metricer = Arc::new(self); + loop { + let metricer_ = metricer.clone(); + let _ = spawn_blocking(move || { + for (_, trans) in &metricer_.meter_map { + metricer_ + .reporter + .report(CollectItem::Meter(trans.transform(&metricer_))); + } + }) + .await; + ticker.tick().await; + } + }) + } +} + +pub struct Booting {} diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs index bc2d071..3c67e08 100644 --- a/src/metrics/mod.rs +++ b/src/metrics/mod.rs @@ -15,4 +15,4 @@ // pub mod meter; -pub mod record; +pub mod metricer; diff --git a/src/metrics/record.rs b/src/metrics/record.rs deleted file mode 100644 index 2818aba..0000000 --- a/src/metrics/record.rs +++ /dev/null @@ -1,213 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one or more -// contributor license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright ownership. -// The ASF licenses this file to You under the Apache License, Version 2.0 -// (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -use crate::{ - common::system_time::{fetch_time, TimePeriod}, - metrics::meter::{IntoMeterDataWithMeter, Meter}, - skywalking_proto::v3::{ - meter_data::Metric, Label, MeterBucketValue, MeterData, MeterHistogram, MeterSingleValue, - }, -}; -use std::time::{SystemTime, UNIX_EPOCH}; - -#[derive(Default)] -pub struct MeterRecord { - time: Option, -} - -impl MeterRecord { - #[inline] - pub fn new() -> Self { - Default::default() - } - - #[inline] - pub fn custome_time(mut self, time: SystemTime) -> Self { - self.time = Some(time); - self - } - - #[inline] - pub fn single_value(self) -> SingleValue { - SingleValue { - record: self, - ..Default::default() - } - } - - #[inline] - pub fn histogram(self) -> Histogram { - Histogram { - record: self, - ..Default::default() - } - } -} - -impl IntoMeterDataWithMeter for MeterRecord { - fn into_meter_data_with_meter(self, meter: &Meter) -> MeterData { - MeterData { - service: meter.service_name().to_owned(), - service_instance: meter.instance_name().to_owned(), - timestamp: match self.time { - Some(time) => time - .duration_since(UNIX_EPOCH) - .map(|dur| dur.as_millis() as i64) - .unwrap_or_default(), - None => fetch_time(TimePeriod::Metric), - }, - metric: None, - } - } -} - -#[derive(Default)] -pub struct SingleValue { - record: MeterRecord, - name: String, - labels: Vec<(String, String)>, - value: f64, -} - -impl SingleValue { - #[inline] - pub fn name(mut self, name: impl ToString) -> Self { - self.name = name.to_string(); - self - } - - pub fn add_label(mut self, key: impl ToString, value: impl ToString) -> Self { - self.labels.push((key.to_string(), value.to_string())); - self - } - - pub fn add_labels(mut self, tags: I) -> Self - where - K: ToString, - V: ToString, - I: IntoIterator, - { - self.labels.extend( - tags.into_iter() - .map(|(k, v)| (k.to_string(), v.to_string())), - ); - self - } - - #[inline] - pub fn value(mut self, value: f64) -> Self { - self.value = value; - self - } -} - -impl IntoMeterDataWithMeter for SingleValue { - fn into_meter_data_with_meter(self, meter: &Meter) -> MeterData { - MeterData { - service: meter.service_name().to_owned(), - service_instance: meter.instance_name().to_owned(), - timestamp: match self.record.time { - Some(time) => time - .duration_since(UNIX_EPOCH) - .map(|dur| dur.as_millis() as i64) - .unwrap_or_default(), - None => fetch_time(TimePeriod::Metric), - }, - metric: Some(Metric::SingleValue(MeterSingleValue { - name: self.name, - labels: self - .labels - .into_iter() - .map(|(name, value)| Label { name, value }) - .collect(), - value: self.value, - })), - } - } -} - -#[derive(Default)] -pub struct Histogram { - record: MeterRecord, - name: String, - labels: Vec<(String, String)>, - values: Vec<(f64, i64, bool)>, -} - -impl Histogram { - #[inline] - pub fn name(mut self, name: impl ToString) -> Self { - self.name = name.to_string(); - self - } - - pub fn add_label(mut self, key: impl ToString, value: impl ToString) -> Self { - self.labels.push((key.to_string(), value.to_string())); - self - } - - pub fn add_labels(mut self, tags: I) -> Self - where - K: ToString, - V: ToString, - I: IntoIterator, - { - self.labels.extend( - tags.into_iter() - .map(|(k, v)| (k.to_string(), v.to_string())), - ); - self - } - - #[inline] - pub fn add_value(mut self, bucket: f64, count: i64, is_negative_infinity: bool) -> Self { - self.values.push((bucket, count, is_negative_infinity)); - self - } -} - -impl IntoMeterDataWithMeter for Histogram { - fn into_meter_data_with_meter(self, meter: &Meter) -> MeterData { - MeterData { - service: meter.service_name().to_owned(), - service_instance: meter.instance_name().to_owned(), - timestamp: match self.record.time { - Some(time) => time - .duration_since(UNIX_EPOCH) - .map(|dur| dur.as_millis() as i64) - .unwrap_or_default(), - None => fetch_time(TimePeriod::Metric), - }, - metric: Some(Metric::Histogram(MeterHistogram { - name: self.name, - labels: self - .labels - .into_iter() - .map(|(name, value)| Label { name, value }) - .collect(), - values: self - .values - .into_iter() - .map(|(bucket, count, is_negative_infinity)| MeterBucketValue { - bucket, - count, - is_negative_infinity, - }) - .collect(), - })), - } - } -} diff --git a/tests/metrics.rs b/tests/metrics.rs index c4e722b..a3e675a 100644 --- a/tests/metrics.rs +++ b/tests/metrics.rs @@ -15,7 +15,10 @@ // use skywalking::{ - metrics::{meter::Meter, record::MeterRecord}, + metrics::{ + meter::{Counter, Gauge, Histogram}, + metricer::Metricer, + }, reporter::{CollectItem, Report}, skywalking_proto::v3::{ meter_data::Metric, Label, MeterBucketValue, MeterData, MeterHistogram, MeterSingleValue, @@ -30,46 +33,49 @@ use std::{ #[test] fn metrics() { let reporter = Arc::new(MockReporter::default()); - let meter = Meter::new("service_name", "instance_name", reporter.clone()); { - meter.metric(MeterRecord::new()); - assert_eq!( - reporter.pop(), - MeterData { - timestamp: 10, - service: "service_name".to_owned(), - service_instance: "instance_name".to_owned(), - metric: None, - } - ); - } - - { - meter.metric( - MeterRecord::new() - .custome_time(SystemTime::UNIX_EPOCH + Duration::from_secs(1_000_000)), + let mut metricer = Metricer::new("service_name", "instance_name", reporter.clone()); + let counter = metricer.register( + Counter::new("instance_trace_count") + .add_label("region", "us-west") + .add_labels([("az", "az-1")]), ); + counter.increment(100.); + let handle = metricer.boot(); assert_eq!( reporter.pop(), MeterData { - timestamp: 1_000_000_000, + timestamp: 10, service: "service_name".to_owned(), service_instance: "instance_name".to_owned(), - metric: None, + metric: Some(Metric::SingleValue(MeterSingleValue { + name: "instance_trace_count".to_owned(), + labels: vec![ + Label { + name: "region".to_owned(), + value: "us-west".to_owned() + }, + Label { + name: "az".to_owned(), + value: "az-1".to_owned() + }, + ], + value: 100. + })), } ); + handle.abort(); } { - meter.metric( - MeterRecord::new() - .single_value() - .name("instance_trace_count") + let mut metricer = Metricer::new("service_name", "instance_name", reporter.clone()); + metricer.register( + Gauge::new("instance_trace_count", || 100.) .add_label("region", "us-west") - .add_labels([("az", "az-1")]) - .value(100.), + .add_labels([("az", "az-1")]), ); + let handle = metricer.boot(); assert_eq!( reporter.pop(), MeterData { @@ -92,18 +98,20 @@ fn metrics() { })), } ); + handle.abort(); } { - meter.metric( - MeterRecord::new() - .histogram() - .name("instance_trace_count") + let mut metricer = Metricer::new("service_name", "instance_name", reporter.clone()); + let histogram = metricer.register( + Histogram::new("instance_trace_count", vec![1., 2.]) .add_label("region", "us-west") - .add_labels([("az", "az-1")]) - .add_value(1., 100, false) - .add_value(2., 200, true), + .add_labels([("az", "az-1")]), ); + histogram.add_value(1.); + histogram.add_value(1.5); + histogram.add_value(2.); + let handle = metricer.boot(); assert_eq!( reporter.pop(), MeterData { @@ -125,18 +133,19 @@ fn metrics() { values: vec![ MeterBucketValue { bucket: 1., - count: 100, + count: 2, is_negative_infinity: false }, MeterBucketValue { bucket: 2., - count: 200, - is_negative_infinity: true + count: 1, + is_negative_infinity: false }, ] })), } ); + handle.abort(); } } From 5e87a2011cb00df946e51229a5a2c860e6f73345 Mon Sep 17 00:00:00 2001 From: jmjoy Date: Thu, 25 Aug 2022 10:13:17 +0800 Subject: [PATCH 4/4] Update. --- README.md | 24 +++++++- e2e/src/main.rs | 8 +-- ...eter_report.rs => simple_metric_report.rs} | 6 +- src/metrics/meter.rs | 19 +++--- src/metrics/metricer.rs | 60 ++++++++++++++----- tests/metrics.rs | 20 ++++--- 6 files changed, 90 insertions(+), 47 deletions(-) rename examples/{simple_meter_report.rs => simple_metric_report.rs} (95%) diff --git a/README.md b/README.md index 4314696..97211d4 100644 --- a/README.md +++ b/README.md @@ -49,9 +49,11 @@ LogRecord is the simple builder for the LogData, which is the Log format of Skyw ## Metrics -### MeterRecord +### Meter -MeterRecord is the simple builder for the MeterData, which is the metrics format of Skywalking. +- **Counter** API represents a single monotonically increasing counter which automatically collects data and reports to the backend. +- **Gauge** API represents a single numerical value. +- **Histogram** API represents a summary sample observations with customized buckets. # Example @@ -60,6 +62,7 @@ use skywalking::{ logging::{logger::Logger, record::{LogRecord, RecordType}}, reporter::grpc::GrpcReporter, trace::tracer::Tracer, + metrics::{meter::Counter, metricer::Metricer}, }; use std::error::Error; use tokio::signal; @@ -100,6 +103,18 @@ async fn handle_request(tracer: Tracer, logger: Logger) { // Auto report ctx when dropped. } +async fn handle_metric(mut metricer: Metricer) { + let counter = metricer.register( + Counter::new("instance_trace_count") + .add_label("region", "us-west") + .add_label("az", "az-1"), + ); + + metricer.boot().await; + + counter.increment(10.); +} + #[tokio::main] async fn main() -> Result<(), Box> { // Connect to skywalking oap server. @@ -115,7 +130,10 @@ async fn main() -> Result<(), Box> { .spawn(); let tracer = Tracer::new("service", "instance", reporter.clone()); - let logger = Logger::new("service", "instance", reporter); + let logger = Logger::new("service", "instance", reporter.clone()); + let metricer = Metricer::new("service", "instance", reporter); + + handle_metric(metricer).await; handle_request(tracer, logger).await; diff --git a/e2e/src/main.rs b/e2e/src/main.rs index ab96743..dc88364 100644 --- a/e2e/src/main.rs +++ b/e2e/src/main.rs @@ -27,7 +27,7 @@ use skywalking::{ }, metrics::{ meter::{Counter, Gauge, Histogram}, - metricer::{self, Metricer}, + metricer::Metricer, }, reporter::grpc::GrpcReporter, trace::{ @@ -38,12 +38,8 @@ use skywalking::{ tracer::{self, Tracer}, }, }; -use std::{ - collections::HashSet, convert::Infallible, error::Error, net::SocketAddr, sync::Arc, - time::Duration, -}; +use std::{convert::Infallible, error::Error, net::SocketAddr}; use structopt::StructOpt; -use tokio::time::sleep; static NOT_FOUND_MSG: &str = "not found"; static SUCCESS_MSG: &str = "Success"; diff --git a/examples/simple_meter_report.rs b/examples/simple_metric_report.rs similarity index 95% rename from examples/simple_meter_report.rs rename to examples/simple_metric_report.rs index 4f1c57b..ef4f9c8 100644 --- a/examples/simple_meter_report.rs +++ b/examples/simple_metric_report.rs @@ -20,7 +20,7 @@ use skywalking::{ metrics::{meter::Counter, metricer::Metricer}, reporter::grpc::GrpcReporter, }; -use std::{error::Error, future}; +use std::error::Error; use tokio::signal; #[tokio::main] @@ -48,8 +48,8 @@ async fn main() -> Result<(), Box> { counter.increment(1.); - metricer.boot().await; - handle.await; + metricer.boot().await.unwrap(); + handle.await.unwrap(); Ok(()) } diff --git a/src/metrics/meter.rs b/src/metrics/meter.rs index a5b8a50..ea97852 100644 --- a/src/metrics/meter.rs +++ b/src/metrics/meter.rs @@ -24,11 +24,8 @@ use crate::{ use portable_atomic::AtomicF64; use std::{ cmp::Ordering::Equal, - collections::HashSet, - sync::atomic::{self, AtomicI64, AtomicUsize, Ordering}, - time::{SystemTime, UNIX_EPOCH}, + sync::atomic::{AtomicI64, Ordering}, }; -use tokio::sync::Mutex; pub trait Transform: Send + Sync { fn meter_id(&self) -> MeterId; @@ -38,9 +35,9 @@ pub trait Transform: Send + Sync { #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub(crate) enum MeterType { - COUNTER, - GAUGE, - HISTOGRAM, + Counter, + Gauge, + Histogram, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -93,7 +90,7 @@ impl Counter { Self { id: MeterId { name: name.to_string(), - typ: MeterType::COUNTER, + typ: MeterType::Counter, labels: vec![], }, mode: CounterMode::INCREMENT, @@ -180,7 +177,7 @@ impl f64> Gauge { Self { id: MeterId { name: name.to_string(), - typ: MeterType::GAUGE, + typ: MeterType::Gauge, labels: vec![], }, getter, @@ -260,11 +257,11 @@ impl Histogram { Self { id: MeterId { name: name.to_string(), - typ: MeterType::HISTOGRAM, + typ: MeterType::Histogram, labels: vec![], }, buckets: { - steps.sort_by(|a, b| a.partial_cmp(&b).unwrap_or(Equal)); + steps.sort_by(|a, b| a.partial_cmp(b).unwrap_or(Equal)); steps.dedup(); steps.into_iter().map(Bucket::new).collect() }, diff --git a/src/metrics/metricer.rs b/src/metrics/metricer.rs index dabacfc..81b8351 100644 --- a/src/metrics/metricer.rs +++ b/src/metrics/metricer.rs @@ -15,16 +15,19 @@ // use super::meter::{MeterId, Transform}; -use crate::{ - reporter::{CollectItem, DynReport, Report}, - skywalking_proto::v3::MeterData, +use crate::reporter::{CollectItem, DynReport, Report}; +use std::{ + collections::HashMap, + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll}, + time::Duration, }; -use std::{collections::HashMap, sync::Arc, time::Duration}; use tokio::{ - runtime::Handle, - spawn, - sync::{Mutex, OnceCell}, - task::{block_in_place, spawn_blocking, spawn_local, JoinHandle}, + select, spawn, + sync::mpsc, + task::{spawn_blocking, JoinError, JoinHandle}, time::interval, }; @@ -71,25 +74,52 @@ impl Metricer { transform } - // TODO Shutdownable. - pub fn boot(self) -> JoinHandle<()> { - spawn(async move { + pub fn boot(self) -> Booting { + let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1); + + let handle = spawn(async move { let mut ticker = interval(self.report_interval); let metricer = Arc::new(self); loop { let metricer_ = metricer.clone(); let _ = spawn_blocking(move || { - for (_, trans) in &metricer_.meter_map { + for trans in metricer_.meter_map.values() { metricer_ .reporter .report(CollectItem::Meter(trans.transform(&metricer_))); } }) .await; - ticker.tick().await; + + select! { + _ = ticker.tick() => {} + _ = shutdown_rx.recv() => { return; } + } } - }) + }); + Booting { + handle, + shutdown_tx, + } } } -pub struct Booting {} +pub struct Booting { + handle: JoinHandle<()>, + shutdown_tx: mpsc::Sender<()>, +} + +impl Booting { + pub async fn shutdown(self) -> crate::Result<()> { + self.shutdown_tx.send(()).await.unwrap(); + Ok(self.await?) + } +} + +impl Future for Booting { + type Output = Result<(), JoinError>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.handle).poll(cx) + } +} diff --git a/tests/metrics.rs b/tests/metrics.rs index a3e675a..f1d95d0 100644 --- a/tests/metrics.rs +++ b/tests/metrics.rs @@ -27,11 +27,10 @@ use skywalking::{ use std::{ collections::LinkedList, sync::{Arc, Mutex}, - time::{Duration, SystemTime}, }; -#[test] -fn metrics() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn metrics() { let reporter = Arc::new(MockReporter::default()); { @@ -42,7 +41,9 @@ fn metrics() { .add_labels([("az", "az-1")]), ); counter.increment(100.); - let handle = metricer.boot(); + + metricer.boot().shutdown().await.unwrap(); + assert_eq!( reporter.pop(), MeterData { @@ -65,7 +66,6 @@ fn metrics() { })), } ); - handle.abort(); } { @@ -75,7 +75,9 @@ fn metrics() { .add_label("region", "us-west") .add_labels([("az", "az-1")]), ); - let handle = metricer.boot(); + + metricer.boot().shutdown().await.unwrap(); + assert_eq!( reporter.pop(), MeterData { @@ -98,7 +100,6 @@ fn metrics() { })), } ); - handle.abort(); } { @@ -111,7 +112,9 @@ fn metrics() { histogram.add_value(1.); histogram.add_value(1.5); histogram.add_value(2.); - let handle = metricer.boot(); + + metricer.boot().shutdown().await.unwrap(); + assert_eq!( reporter.pop(), MeterData { @@ -145,7 +148,6 @@ fn metrics() { })), } ); - handle.abort(); } }