Skip to content

Commit

Permalink
Fix metrics_per_day view and improve test by building the real Metric…
Browse files Browse the repository at this point in the history
…EventMessage json
  • Loading branch information
sfauvel committed Oct 31, 2024
1 parent 10703b4 commit b77559c
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 31 deletions.
2 changes: 1 addition & 1 deletion mithril-aggregator/src/event_store/database/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ create table if not exists event (
r#"
create view if not exists metrics_per_day as select metric_date as date, action as counter_name, sum(counter) value from
(
select action, json_extract(content, '$.content.counter') counter, date(json_extract(content, '$.content.date')) metric_date
select action, json_extract(content, '$.content.value') counter, date(json_extract(content, '$.content.date')) metric_date
from event
where source='Metrics'
)
Expand Down
33 changes: 16 additions & 17 deletions mithril-aggregator/src/event_store/database/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,18 @@ mod tests {
mod metrics_per_day_view {
use std::time::Duration;

use crate::event_store::database::test_helper::event_store_db_connection;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use crate::{
event_store::{database::test_helper::event_store_db_connection, TransmitterService},
services::UsageReporter,
test_tools::TestLogger,
};
use chrono::DateTime;

use mithril_common::StdResult;

use sqlite::ConnectionThreadSafe;

use super::*;
#[derive(Serialize, Deserialize)]
struct MetricMessage {
counter: i64,
duration: Duration,
date: DateTime<Utc>,
}

fn get_all_metrics(
connection: Arc<ConnectionThreadSafe>,
Expand Down Expand Up @@ -106,16 +103,18 @@ mod tests {
let metric_date =
DateTime::parse_from_str(&format!("{date} +0000"), "%Y-%m-%d %H:%M:%S %z").unwrap();

let message = EventMessage::new(
"Metrics",
metric_name,
serde_json::json!(MetricMessage {
counter: value,
duration: Duration::from_secs(3),
date: metric_date.into(),
}),
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<EventMessage>();
let transmitter_service = Arc::new(TransmitterService::new(tx, TestLogger::stdout()));
let _result = UsageReporter::send_metric_event(
&transmitter_service,
metric_name.to_string(),
value,
Duration::from_secs(5),
metric_date.into(),
);

let message: EventMessage = rx.try_recv().unwrap();

let _event = persister.persist(message).unwrap();
}

Expand Down
35 changes: 22 additions & 13 deletions mithril-aggregator/src/services/usage_reporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,26 @@ impl UsageReporter {
.collect()
}

pub(crate) fn send_metric_event(
transmitter: &TransmitterService<EventMessage>,
name: String,
value: i64,
period: Duration,
date: DateTime<Utc>,
) -> Result<(), String> {
transmitter.send_event_message::<MetricEventMessage>(
"Metrics",
&name.clone(),
&MetricEventMessage {
name,
value,
period,
date,
},
vec![],
)
}

fn send_metrics(&mut self, duration: &Duration) {
let metrics = self.metrics_service.export_metrics_map();
let delta = Self::compute_metrics_delta(&self.last_reported_metrics, &metrics);
Expand All @@ -67,19 +87,8 @@ impl UsageReporter {
self.last_reported_metrics = metrics;

for (name, value) in delta {
let _result = self
.transmitter_service
.send_event_message::<MetricEventMessage>(
"Metrics",
&name.clone(),
&MetricEventMessage {
name,
value,
period: *duration,
date,
},
vec![],
);
let _result =
Self::send_metric_event(&self.transmitter_service, name, value, *duration, date);
}
}

Expand Down

0 comments on commit b77559c

Please sign in to comment.