Skip to content

Commit

Permalink
Refactor management report and keep alive api. (#53)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmjoy authored Feb 1, 2023
1 parent f544829 commit 7d4b379
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 37 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/codecov.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,6 @@ jobs:
toolchain: stable
override: true
- uses: actions-rs/tarpaulin@v0.1
with:
version: 0.22.0
- uses: codecov/codecov-action@v2.1.0
17 changes: 10 additions & 7 deletions examples/simple_management_report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,16 @@ async fn main() -> Result<(), Box<dyn Error>> {

let manager = Manager::new("service", "instance", reporter);

// Report instance properties.
let mut props = Properties::default();
props.insert_os_info();
manager.report_properties(props);

// Keep alive
manager.keep_alive(Duration::from_secs(10));
// Report instance properties and keep alive.
manager.report_and_keep_alive(
|| {
let mut props = Properties::default();
props.insert_os_info();
props
},
Duration::from_secs(30),
10,
);

handle.await?;

Expand Down
101 changes: 84 additions & 17 deletions src/management/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,40 +64,107 @@ impl Manager {

/// Report instance properties.
pub fn report_properties(&self, properties: Properties) {
let props = properties
.convert_to_instance_properties(self.service_name.clone(), self.instance_name.clone());
self.reporter.report(CollectItem::Instance(Box::new(props)));
Self::reporter_report_properties(
&self.reporter,
self.service_name.clone(),
self.instance_name.clone(),
properties,
);
}

/// Do keep alive (heartbeat), with the interval, will be run in background.
pub fn keep_alive(&self, interval: Duration) -> KeepAlive {
fn reporter_report_properties(
reporter: &Arc<DynReport>,
service_name: String,
instance_name: String,
properties: Properties,
) {
let props = properties.convert_to_instance_properties(service_name, instance_name);
reporter.report(CollectItem::Instance(Box::new(props)));
}

/// Do keep alive once.
pub fn keep_alive(&self) {
Self::reporter_keep_alive(
&self.reporter,
self.service_name.clone(),
self.instance_name.clone(),
);
}

fn reporter_keep_alive(reporter: &Arc<DynReport>, service_name: String, instance_name: String) {
reporter.report(CollectItem::Ping(Box::new(
crate::skywalking_proto::v3::InstancePingPkg {
service: service_name,
service_instance: instance_name,
layer: Default::default(),
},
)));
}

/// Continuously report instance properties and keep alive. Run in
/// background.
///
/// Parameter `heartbeat_period` represents agent heartbeat report period.
///
/// Parameter `properties_report_period_factor` represents agent sends the
/// instance properties to the backend every `heartbeat_period` *
/// `properties_report_period_factor` seconds.
pub fn report_and_keep_alive(
&self,
properties: impl Fn() -> Properties + Send + 'static,
heartbeat_period: Duration,
properties_report_period_factor: usize,
) -> ReportAndKeepAlive {
let service_name = self.service_name.clone();
let instance_name = self.instance_name.clone();
let reporter = self.reporter.clone();

let handle = spawn(async move {
let mut ticker = time::interval(interval);
let mut counter = 0;

let mut ticker = time::interval(heartbeat_period);
loop {
ticker.tick().await;

reporter.report(CollectItem::Ping(Box::new(
crate::skywalking_proto::v3::InstancePingPkg {
service: service_name.clone(),
service_instance: instance_name.clone(),
layer: Default::default(),
},
)));
if counter == 0 {
Self::reporter_report_properties(
&reporter,
service_name.clone(),
instance_name.clone(),
properties(),
);
} else {
Self::reporter_keep_alive(
&reporter,
service_name.clone(),
instance_name.clone(),
);
}

counter += 1;

if counter >= properties_report_period_factor {
counter = 0;
}
}
});
KeepAlive { handle }
ReportAndKeepAlive { handle }
}
}

/// Handle of [Manager::keep_alive].
pub struct KeepAlive {
/// Handle of [Manager::report_and_keep_alive].
pub struct ReportAndKeepAlive {
handle: JoinHandle<()>,
}

impl Future for KeepAlive {
impl ReportAndKeepAlive {
/// Get the inner tokio join handle.
pub fn handle(&self) -> &JoinHandle<()> {
&self.handle
}
}

impl Future for ReportAndKeepAlive {
type Output = Result<(), JoinError>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand Down
90 changes: 77 additions & 13 deletions tests/management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,23 @@ use tokio::time::sleep;
async fn management() {
let reporter = Arc::new(MockReporter::default());
let manager = Manager::new("service_name", "instance_name", reporter.clone());
manager.keep_alive(Duration::from_secs(60));
let handling = manager.report_and_keep_alive(
|| {
let mut props = Properties::new();
props.insert_os_info();
props
},
Duration::from_millis(100),
3,
);

{
let mut props = Properties::new();
props.insert_os_info();
manager.report_properties(props);
sleep(Duration::from_secs(1)).await;

handling.handle().abort();

sleep(Duration::from_secs(1)).await;

{
let actual_props = reporter.pop_ins_props();
assert_eq!(actual_props.service, "service_name".to_owned());
assert_eq!(actual_props.service_instance, "instance_name".to_owned());
Expand All @@ -56,7 +66,6 @@ async fn management() {
}

{
sleep(Duration::from_secs(1)).await;
assert_eq!(
reporter.pop_ping(),
InstancePingPkg {
Expand All @@ -66,38 +75,93 @@ async fn management() {
}
);
}

{
reporter.pop_ping();
}

{
reporter.pop_ins_props();
}

{
reporter.pop_ping();
}

{
reporter.pop_ping();
}
}

fn kvs_get_value<'a>(kvs: &'a [KeyStringValuePair], key: &str) -> &'a str {
&kvs.iter().find(|kv| kv.key == key).unwrap().value
}

#[derive(Debug)]
enum Item {
Properties(InstanceProperties),
PingPkg(InstancePingPkg),
}

impl Item {
fn unwrap_properties(self) -> InstanceProperties {
match self {
Item::Properties(props) => props,
Item::PingPkg(_) => panic!("isn't properties"),
}
}

fn unwrap_ping_pkg(self) -> InstancePingPkg {
match self {
Item::Properties(_) => panic!("isn't ping pkg"),
Item::PingPkg(p) => p,
}
}
}

#[derive(Default, Clone)]
struct MockReporter {
props_items: Arc<Mutex<LinkedList<InstanceProperties>>>,
ping_items: Arc<Mutex<LinkedList<InstancePingPkg>>>,
items: Arc<Mutex<LinkedList<Item>>>,
}

impl MockReporter {
fn pop_ins_props(&self) -> InstanceProperties {
self.props_items.try_lock().unwrap().pop_back().unwrap()
self.items
.try_lock()
.unwrap()
.pop_front()
.unwrap()
.unwrap_properties()
}

fn pop_ping(&self) -> InstancePingPkg {
self.ping_items.try_lock().unwrap().pop_back().unwrap()
self.items
.try_lock()
.unwrap()
.pop_front()
.unwrap()
.unwrap_ping_pkg()
}
}

impl Report for MockReporter {
fn report(&self, item: CollectItem) {
match item {
CollectItem::Instance(data) => {
self.props_items.try_lock().unwrap().push_back(*data);
self.items
.try_lock()
.unwrap()
.push_back(Item::Properties(*data));
}
CollectItem::Ping(data) => {
self.ping_items.try_lock().unwrap().push_back(*data);
self.items
.try_lock()
.unwrap()
.push_back(Item::PingPkg(*data));
}
_ => {
unreachable!("unknown collect item type");
}
_ => {}
}
}
}

0 comments on commit 7d4b379

Please sign in to comment.