Skip to content
This repository has been archived by the owner on Jul 25, 2022. It is now read-only.

Commit

Permalink
Filter identical record changes from warp-drive
Browse files Browse the repository at this point in the history
Filter identical record changes from warp-drive on update to not spam
the browser.

Signed-off-by: Joe Grund <jgrund@whamcloud.io>
  • Loading branch information
jgrund committed Dec 5, 2020
1 parent 7c733ea commit ad9bb35
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 47 deletions.
16 changes: 8 additions & 8 deletions iml-services/iml-device/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use iml_service_queue::service_queue::consume_data;
use iml_tracing::tracing;
use iml_wire_types::Fqdn;
use std::{
collections::{BTreeMap, HashMap},
collections::{BTreeMap, BTreeSet, HashMap},
iter::FromIterator,
sync::Arc,
};
Expand Down Expand Up @@ -180,16 +180,16 @@ async fn main() -> Result<(), ImlDeviceError> {
vec![],
),
|mut acc, x| {
let host_ids = BTreeSet::from_iter(x.host_ids)
.into_iter()
.map(|x: i32| x.to_string())
.collect::<Vec<_>>()
.join(",");

acc.0.push(x.state);
acc.1.push(x.name);
acc.2.push(x.active_host_id);
acc.3.push(
x.host_ids
.into_iter()
.map(|x| x.to_string())
.collect::<Vec<String>>()
.join(","),
);
acc.3.push(host_ids);
acc.4.push(x.filesystems.join(","));
acc.5.push(x.uuid);
acc.6.push(x.mount_path);
Expand Down
28 changes: 21 additions & 7 deletions iml-warp-drive/src/listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async fn handle_record_change(
RecordChange::Delete(r) => {
tracing::debug!("LISTEN / NOTIFY Delete record: {:?}", r);

let removed = api_cache_state.lock().await.remove_record(r);
let removed = api_cache_state.lock().await.remove_record(r).is_some();

if removed {
users::send_message(
Expand All @@ -49,13 +49,27 @@ async fn handle_record_change(
RecordChange::Update(r) => {
tracing::debug!("LISTEN / NOTIFY Update record: {:?}", r);

api_cache_state.lock().await.insert_record(r);
let record_id = (&r).into();

users::send_message(
Message::RecordChange(record_change),
Arc::clone(&user_state),
)
.await;
let mut cache_state = api_cache_state.lock().await;

let old_record = cache_state.remove_record(record_id);

if old_record.as_ref() != Some(&r) {
tracing::debug!(
"LISTEN / NOTIFY Update change. Old: {:?} New {:?}",
old_record,
r
);

cache_state.insert_record(r);

users::send_message(
Message::RecordChange(record_change),
Arc::clone(&user_state),
)
.await;
}
}
};
}
Expand Down
119 changes: 87 additions & 32 deletions iml-wire-types/src/warp_drive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,38 +151,60 @@ pub struct ArcCache {

impl Cache {
/// Removes the record from the cache
pub fn remove_record(&mut self, x: RecordId) -> bool {
pub fn remove_record(&mut self, x: RecordId) -> Option<Record> {
match x {
RecordId::ActiveAlert(id) => self.active_alert.remove(&id).is_some(),
RecordId::CorosyncConfiguration(id) => {
self.corosync_configuration.remove(&id).is_some()
}
RecordId::Filesystem(id) => self.filesystem.remove(&id).is_some(),
RecordId::Group(id) => self.group.remove(&id).is_some(),
RecordId::Host(id) => self.host.remove(&id).is_some(),
RecordId::LnetConfiguration(id) => self.lnet_configuration.remove(&id).is_some(),
RecordId::ContentType(id) => self.content_type.remove(&id).is_some(),
RecordId::OstPool(id) => self.ost_pool.remove(&id).is_some(),
RecordId::OstPoolOsts(id) => self.ost_pool_osts.remove(&id).is_some(),
RecordId::PacemakerConfiguration(id) => {
self.pacemaker_configuration.remove(&id).is_some()
}
RecordId::SfaDiskDrive(id) => self.sfa_disk_drive.remove(&id).is_some(),
RecordId::SfaEnclosure(id) => self.sfa_enclosure.remove(&id).is_some(),
RecordId::SfaStorageSystem(id) => self.sfa_storage_system.remove(&id).is_some(),
RecordId::SfaJob(id) => self.sfa_job.remove(&id).is_some(),
RecordId::SfaPowerSupply(id) => self.sfa_power_supply.remove(&id).is_some(),
RecordId::SfaController(id) => self.sfa_controller.remove(&id).is_some(),
RecordId::StratagemConfig(id) => self.stratagem_config.remove(&id).is_some(),
RecordId::Snapshot(id) => self.snapshot.remove(&id).is_some(),
RecordId::SnapshotInterval(id) => self.snapshot_interval.remove(&id).is_some(),
RecordId::SnapshotRetention(id) => self.snapshot_retention.remove(&id).is_some(),
RecordId::Target(id) => self.target.remove(&id).is_some(),
RecordId::TargetRecord(id) => self.target_record.remove(&id).is_some(),
RecordId::User(id) => self.user.remove(&id).is_some(),
RecordId::UserGroup(id) => self.user_group.remove(&id).is_some(),
RecordId::Volume(id) => self.volume.remove(&id).is_some(),
RecordId::VolumeNode(id) => self.volume_node.remove(&id).is_some(),
RecordId::ActiveAlert(id) => self.active_alert.remove(&id).map(Record::ActiveAlert),
RecordId::CorosyncConfiguration(id) => self
.corosync_configuration
.remove(&id)
.map(Record::CorosyncConfiguration),
RecordId::Filesystem(id) => self.filesystem.remove(&id).map(Record::Filesystem),
RecordId::Group(id) => self.group.remove(&id).map(Record::Group),
RecordId::Host(id) => self.host.remove(&id).map(Record::Host),
RecordId::LnetConfiguration(id) => self
.lnet_configuration
.remove(&id)
.map(Record::LnetConfiguration),
RecordId::ContentType(id) => self.content_type.remove(&id).map(Record::ContentType),
RecordId::OstPool(id) => self.ost_pool.remove(&id).map(Record::OstPool),
RecordId::OstPoolOsts(id) => self.ost_pool_osts.remove(&id).map(Record::OstPoolOsts),
RecordId::PacemakerConfiguration(id) => self
.pacemaker_configuration
.remove(&id)
.map(Record::PacemakerConfiguration),
RecordId::SfaDiskDrive(id) => self.sfa_disk_drive.remove(&id).map(Record::SfaDiskDrive),
RecordId::SfaEnclosure(id) => self.sfa_enclosure.remove(&id).map(Record::SfaEnclosure),
RecordId::SfaStorageSystem(id) => self
.sfa_storage_system
.remove(&id)
.map(Record::SfaStorageSystem),
RecordId::SfaJob(id) => self.sfa_job.remove(&id).map(Record::SfaJob),
RecordId::SfaPowerSupply(id) => self
.sfa_power_supply
.remove(&id)
.map(Record::SfaPowerSupply),
RecordId::SfaController(id) => {
self.sfa_controller.remove(&id).map(Record::SfaController)
}
RecordId::StratagemConfig(id) => self
.stratagem_config
.remove(&id)
.map(Record::StratagemConfig),
RecordId::Snapshot(id) => self.snapshot.remove(&id).map(Record::Snapshot),
RecordId::SnapshotInterval(id) => self
.snapshot_interval
.remove(&id)
.map(Record::SnapshotInterval),
RecordId::SnapshotRetention(id) => self
.snapshot_retention
.remove(&id)
.map(Record::SnapshotRetention),
RecordId::Target(id) => self.target.remove(&id).map(Record::Target),
RecordId::TargetRecord(id) => self.target_record.remove(&id).map(Record::TargetRecord),
RecordId::User(id) => self.user.remove(&id).map(Record::User),
RecordId::UserGroup(id) => self.user_group.remove(&id).map(Record::UserGroup),
RecordId::Volume(id) => self.volume.remove(&id).map(Record::Volume),
RecordId::VolumeNode(id) => self.volume_node.remove(&id).map(Record::VolumeNode),
}
}
/// Inserts the record into the cache
Expand Down Expand Up @@ -497,7 +519,7 @@ impl From<&ArcCache> for Cache {
}

#[allow(clippy::large_enum_variant)]
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)]
#[serde(tag = "tag", content = "payload")]
pub enum Record {
ActiveAlert(Alert),
Expand Down Expand Up @@ -622,6 +644,39 @@ pub enum RecordId {
VolumeNode(i32),
}

impl From<&Record> for RecordId {
fn from(record: &Record) -> Self {
match record {
Record::ActiveAlert(x) => RecordId::ActiveAlert(x.id),
Record::ContentType(x) => RecordId::ContentType(x.id),
Record::CorosyncConfiguration(x) => RecordId::CorosyncConfiguration(x.id),
Record::Filesystem(x) => RecordId::Filesystem(x.id),
Record::Group(x) => RecordId::Group(x.id),
Record::Host(x) => RecordId::Host(x.id),
Record::LnetConfiguration(x) => RecordId::LnetConfiguration(x.id),
Record::OstPool(x) => RecordId::OstPool(x.id),
Record::OstPoolOsts(x) => RecordId::OstPoolOsts(x.id),
Record::PacemakerConfiguration(x) => RecordId::PacemakerConfiguration(x.id),
Record::SfaDiskDrive(x) => RecordId::SfaDiskDrive(x.id),
Record::SfaEnclosure(x) => RecordId::SfaEnclosure(x.id),
Record::SfaStorageSystem(x) => RecordId::SfaStorageSystem(x.id),
Record::SfaJob(x) => RecordId::SfaJob(x.id),
Record::SfaPowerSupply(x) => RecordId::SfaPowerSupply(x.id),
Record::SfaController(x) => RecordId::SfaController(x.id),
Record::StratagemConfig(x) => RecordId::StratagemConfig(x.id),
Record::Snapshot(x) => RecordId::Snapshot(x.id),
Record::SnapshotInterval(x) => RecordId::SnapshotInterval(x.id),
Record::SnapshotRetention(x) => RecordId::SnapshotRetention(x.id),
Record::Target(x) => RecordId::Target(x.id),
Record::TargetRecord(x) => RecordId::TargetRecord(x.id),
Record::User(x) => RecordId::User(x.id),
Record::UserGroup(x) => RecordId::UserGroup(x.id),
Record::Volume(x) => RecordId::Volume(x.id),
Record::VolumeNode(x) => RecordId::VolumeNode(x.id),
}
}
}

impl Deref for RecordId {
type Target = i32;

Expand Down

0 comments on commit ad9bb35

Please sign in to comment.