diff --git a/.github/workflows/codecov.yaml b/.github/workflows/codecov.yaml index 052ecaf..791d418 100644 --- a/.github/workflows/codecov.yaml +++ b/.github/workflows/codecov.yaml @@ -38,4 +38,6 @@ jobs: toolchain: stable override: true - uses: actions-rs/tarpaulin@v0.1 + with: + version: 0.22.0 - uses: codecov/codecov-action@v2.1.0 diff --git a/examples/simple_management_report.rs b/examples/simple_management_report.rs index 107aa95..71a186a 100644 --- a/examples/simple_management_report.rs +++ b/examples/simple_management_report.rs @@ -40,13 +40,16 @@ async fn main() -> Result<(), Box> { let manager = Manager::new("service", "instance", reporter); - // Report instance properties. - let mut props = Properties::default(); - props.insert_os_info(); - manager.report_properties(props); - - // Keep alive - manager.keep_alive(Duration::from_secs(10)); + // Report instance properties and keep alive. + manager.report_and_keep_alive( + || { + let mut props = Properties::default(); + props.insert_os_info(); + props + }, + Duration::from_secs(30), + 10, + ); handle.await?; diff --git a/src/management/manager.rs b/src/management/manager.rs index 9641c35..67f73c3 100644 --- a/src/management/manager.rs +++ b/src/management/manager.rs @@ -64,40 +64,107 @@ impl Manager { /// Report instance properties. pub fn report_properties(&self, properties: Properties) { - let props = properties - .convert_to_instance_properties(self.service_name.clone(), self.instance_name.clone()); - self.reporter.report(CollectItem::Instance(Box::new(props))); + Self::reporter_report_properties( + &self.reporter, + self.service_name.clone(), + self.instance_name.clone(), + properties, + ); } - /// Do keep alive (heartbeat), with the interval, will be run in background. - pub fn keep_alive(&self, interval: Duration) -> KeepAlive { + fn reporter_report_properties( + reporter: &Arc, + service_name: String, + instance_name: String, + properties: Properties, + ) { + let props = properties.convert_to_instance_properties(service_name, instance_name); + reporter.report(CollectItem::Instance(Box::new(props))); + } + + /// Do keep alive once. + pub fn keep_alive(&self) { + Self::reporter_keep_alive( + &self.reporter, + self.service_name.clone(), + self.instance_name.clone(), + ); + } + + fn reporter_keep_alive(reporter: &Arc, service_name: String, instance_name: String) { + reporter.report(CollectItem::Ping(Box::new( + crate::skywalking_proto::v3::InstancePingPkg { + service: service_name, + service_instance: instance_name, + layer: Default::default(), + }, + ))); + } + + /// Continuously report instance properties and keep alive. Run in + /// background. + /// + /// Parameter `heartbeat_period` represents agent heartbeat report period. + /// + /// Parameter `properties_report_period_factor` represents agent sends the + /// instance properties to the backend every `heartbeat_period` * + /// `properties_report_period_factor` seconds. + pub fn report_and_keep_alive( + &self, + properties: impl Fn() -> Properties + Send + 'static, + heartbeat_period: Duration, + properties_report_period_factor: usize, + ) -> ReportAndKeepAlive { let service_name = self.service_name.clone(); let instance_name = self.instance_name.clone(); let reporter = self.reporter.clone(); + let handle = spawn(async move { - let mut ticker = time::interval(interval); + let mut counter = 0; + + let mut ticker = time::interval(heartbeat_period); loop { ticker.tick().await; - reporter.report(CollectItem::Ping(Box::new( - crate::skywalking_proto::v3::InstancePingPkg { - service: service_name.clone(), - service_instance: instance_name.clone(), - layer: Default::default(), - }, - ))); + if counter == 0 { + Self::reporter_report_properties( + &reporter, + service_name.clone(), + instance_name.clone(), + properties(), + ); + } else { + Self::reporter_keep_alive( + &reporter, + service_name.clone(), + instance_name.clone(), + ); + } + + counter += 1; + + if counter >= properties_report_period_factor { + counter = 0; + } } }); - KeepAlive { handle } + ReportAndKeepAlive { handle } } } -/// Handle of [Manager::keep_alive]. -pub struct KeepAlive { +/// Handle of [Manager::report_and_keep_alive]. +pub struct ReportAndKeepAlive { handle: JoinHandle<()>, } -impl Future for KeepAlive { +impl ReportAndKeepAlive { + /// Get the inner tokio join handle. + pub fn handle(&self) -> &JoinHandle<()> { + &self.handle + } +} + +impl Future for ReportAndKeepAlive { type Output = Result<(), JoinError>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/tests/management.rs b/tests/management.rs index 5b10419..681da8e 100644 --- a/tests/management.rs +++ b/tests/management.rs @@ -31,13 +31,23 @@ use tokio::time::sleep; async fn management() { let reporter = Arc::new(MockReporter::default()); let manager = Manager::new("service_name", "instance_name", reporter.clone()); - manager.keep_alive(Duration::from_secs(60)); + let handling = manager.report_and_keep_alive( + || { + let mut props = Properties::new(); + props.insert_os_info(); + props + }, + Duration::from_millis(100), + 3, + ); - { - let mut props = Properties::new(); - props.insert_os_info(); - manager.report_properties(props); + sleep(Duration::from_secs(1)).await; + + handling.handle().abort(); + sleep(Duration::from_secs(1)).await; + + { let actual_props = reporter.pop_ins_props(); assert_eq!(actual_props.service, "service_name".to_owned()); assert_eq!(actual_props.service_instance, "instance_name".to_owned()); @@ -56,7 +66,6 @@ async fn management() { } { - sleep(Duration::from_secs(1)).await; assert_eq!( reporter.pop_ping(), InstancePingPkg { @@ -66,25 +75,72 @@ async fn management() { } ); } + + { + reporter.pop_ping(); + } + + { + reporter.pop_ins_props(); + } + + { + reporter.pop_ping(); + } + + { + reporter.pop_ping(); + } } fn kvs_get_value<'a>(kvs: &'a [KeyStringValuePair], key: &str) -> &'a str { &kvs.iter().find(|kv| kv.key == key).unwrap().value } +#[derive(Debug)] +enum Item { + Properties(InstanceProperties), + PingPkg(InstancePingPkg), +} + +impl Item { + fn unwrap_properties(self) -> InstanceProperties { + match self { + Item::Properties(props) => props, + Item::PingPkg(_) => panic!("isn't properties"), + } + } + + fn unwrap_ping_pkg(self) -> InstancePingPkg { + match self { + Item::Properties(_) => panic!("isn't ping pkg"), + Item::PingPkg(p) => p, + } + } +} + #[derive(Default, Clone)] struct MockReporter { - props_items: Arc>>, - ping_items: Arc>>, + items: Arc>>, } impl MockReporter { fn pop_ins_props(&self) -> InstanceProperties { - self.props_items.try_lock().unwrap().pop_back().unwrap() + self.items + .try_lock() + .unwrap() + .pop_front() + .unwrap() + .unwrap_properties() } fn pop_ping(&self) -> InstancePingPkg { - self.ping_items.try_lock().unwrap().pop_back().unwrap() + self.items + .try_lock() + .unwrap() + .pop_front() + .unwrap() + .unwrap_ping_pkg() } } @@ -92,12 +148,20 @@ impl Report for MockReporter { fn report(&self, item: CollectItem) { match item { CollectItem::Instance(data) => { - self.props_items.try_lock().unwrap().push_back(*data); + self.items + .try_lock() + .unwrap() + .push_back(Item::Properties(*data)); } CollectItem::Ping(data) => { - self.ping_items.try_lock().unwrap().push_back(*data); + self.items + .try_lock() + .unwrap() + .push_back(Item::PingPkg(*data)); + } + _ => { + unreachable!("unknown collect item type"); } - _ => {} } } }