Skip to content

Commit

Permalink
WIP: calculate changes hash in an integration test
Browse files Browse the repository at this point in the history
Signed-off-by: Nick Gerace <nick@systeminit.com>
  • Loading branch information
nickgerace committed Dec 18, 2024
1 parent 83cee1c commit e715c78
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 32 deletions.
6 changes: 5 additions & 1 deletion lib/dal/src/change_set/approval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use thiserror::Error;

pub use si_events::ChangeSetApprovalStatus;

use crate::{DalContext, HistoryActor, TransactionsError};
use crate::{DalContext, HistoryActor, TransactionsError, WorkspaceSnapshotError};

pub mod changes_hash;

#[derive(Debug, Error)]
pub enum ChangeSetApprovalError {
Expand All @@ -19,6 +21,8 @@ pub enum ChangeSetApprovalError {
StrumParse(#[from] strum::ParseError),
#[error("transactions error: {0}")]
Transactions(#[from] TransactionsError),
#[error("workspace snapshot error: {0}")]
WorkspaceSnapshot(#[from] WorkspaceSnapshotError),
}

type Result<T> = std::result::Result<T, ChangeSetApprovalError>;
Expand Down
27 changes: 27 additions & 0 deletions lib/dal/src/change_set/approval/changes_hash.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use si_events::ChangesHash;

use crate::{workspace_snapshot::graph::detector::Change, DalContext, WorkspaceSnapshot};

use super::ChangeSetApprovalError;

type Result<T> = std::result::Result<T, ChangeSetApprovalError>;

pub async fn changes_hash(ctx: &DalContext) -> Result<ChangesHash> {
let changes = detect_changes_from_head(ctx).await?;
let mut hasher = ChangesHash::hasher();
for change in changes {
hasher.update(change.merkle_tree_hash.as_bytes());
}
Ok(hasher.finalize())
}

pub async fn detect_changes_from_head(ctx: &DalContext) -> Result<Vec<Change>> {
let head_change_set_id = ctx.get_workspace_default_change_set_id().await?;
let head_snapshot = WorkspaceSnapshot::find_for_change_set(&ctx, head_change_set_id).await?;
let mut changes = head_snapshot
.detect_changes(&ctx.workspace_snapshot()?.clone())
.await?;

changes.sort_by_key(|c| c.id);
Ok(changes)
}
26 changes: 19 additions & 7 deletions lib/dal/src/workspace_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub mod vector_clock;
pub use traits::{schema::variant::SchemaVariantExt, socket::input::InputSocketExt};

use graph::correct_transforms::correct_transforms;
use graph::detector::Update;
use graph::detector::{Change, Update};
use graph::{RebaseBatch, WorkspaceSnapshotGraph};
use node_weight::traits::CorrectTransformsError;
use std::collections::{HashMap, HashSet};
Expand Down Expand Up @@ -713,12 +713,7 @@ impl WorkspaceSnapshot {
Ok(())
}

#[instrument(
name = "workspace_snapshot.detect_updates",
level = "debug",
skip_all,
fields()
)]
#[instrument(name = "workspace_snapshot.detect_updates", level = "debug", skip_all)]
pub async fn detect_updates(
&self,
onto_workspace_snapshot: &WorkspaceSnapshot,
Expand All @@ -735,6 +730,23 @@ impl WorkspaceSnapshot {
.await?)
}

#[instrument(name = "workspace_snapshot.detect_changes", level = "debug", skip_all)]
pub async fn detect_changes(
&self,
onto_workspace_snapshot: &WorkspaceSnapshot,
) -> WorkspaceSnapshotResult<Vec<Change>> {
let self_clone = self.clone();
let onto_clone = onto_workspace_snapshot.clone();

Ok(slow_rt::spawn(async move {
self_clone
.working_copy()
.await
.detect_changes(&*onto_clone.working_copy().await)
})?
.await?)
}

/// Gives the exact node index endpoints of an edge.
pub async fn edge_endpoints(
&self,
Expand Down
25 changes: 17 additions & 8 deletions lib/dal/src/workspace_snapshot/graph/detector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use petgraph::{
visit::{Control, DfsEvent},
};
use serde::{Deserialize, Serialize};
use si_events::ulid::Ulid;
use si_events::{merkle_tree_hash::MerkleTreeHash, ulid::Ulid};
use strum::EnumDiscriminants;
use telemetry::prelude::*;

Expand Down Expand Up @@ -36,6 +36,12 @@ pub enum Update {
},
}

#[derive(Debug)]
pub struct Change {
pub id: Ulid,
pub merkle_tree_hash: MerkleTreeHash,
}

#[derive(Clone, Debug)]
enum NodeDifference {
NewNode,
Expand Down Expand Up @@ -82,16 +88,16 @@ impl<'a, 'b> Detector<'a, 'b> {
///
/// This assumes that all graphs involved to not have any "garbage" laying around. If in doubt, perform "cleanup"
/// on both graphs before creating the [`Detector`].
pub fn detect_changes(&self) -> Vec<Ulid> {
let mut ids = Vec::new();
pub fn detect_changes(&self) -> Vec<Change> {
let mut changes = Vec::new();

petgraph::visit::depth_first_search(
self.updated_graph.graph(),
Some(self.updated_graph.root()),
|event| self.calculate_changes_dfs_event(event, &mut ids),
|event| self.calculate_changes_dfs_event(event, &mut changes),
);

ids
changes
}

fn node_diff_from_base_graph(
Expand Down Expand Up @@ -368,7 +374,7 @@ impl<'a, 'b> Detector<'a, 'b> {
fn calculate_changes_dfs_event(
&self,
event: DfsEvent<NodeIndex>,
ids: &mut Vec<Ulid>,
changes: &mut Vec<Change>,
) -> Control<()> {
if let DfsEvent::Discover(updated_graph_index, _) = event {
match self.updated_graph.get_node_weight(updated_graph_index) {
Expand All @@ -380,8 +386,11 @@ impl<'a, 'b> Detector<'a, 'b> {
}

// If either the original node weight was not found or it was found the merkle tree hashes differ,
// then we have a node ID that needs to be collected!
ids.push(updated_node_weight.id());
// then we have information that needs to be collected!
changes.push(Change {
id: updated_node_weight.id(),
merkle_tree_hash: updated_node_weight.merkle_tree_hash(),
});
}
Err(err) => error!(?err, "heat death of the universe error: updated node weight not found by updated node index from the same graph"),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod test {
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

#[test]
fn new() -> Result<()> {
fn identical_graphs() -> Result<()> {
let base_graph = WorkspaceSnapshotGraphVCurrent::new_for_unit_tests()?;
let updated_graph = base_graph.clone();
assert!(base_graph.is_acyclic_directed());
Expand Down
4 changes: 3 additions & 1 deletion lib/dal/src/workspace_snapshot/graph/v4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use crate::{
Timestamp,
};

use super::detector::Change;

pub mod component;
pub mod diagram;
pub mod schema;
Expand Down Expand Up @@ -689,7 +691,7 @@ impl WorkspaceSnapshotGraphV4 {
Detector::new(self, updated_graph).detect_updates()
}

pub fn detect_changes(&self, updated_graph: &Self) -> Vec<Ulid> {
pub fn detect_changes(&self, updated_graph: &Self) -> Vec<Change> {
Detector::new(self, updated_graph).detect_changes()
}

Expand Down
32 changes: 28 additions & 4 deletions lib/dal/tests/integration_test/change_set/approval.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
use dal::change_set::approval::{ChangeSetApproval, ChangeSetApprovalStatus};
use dal::DalContext;
use std::collections::HashSet;

use dal::change_set::approval::{self, ChangeSetApproval, ChangeSetApprovalStatus};
use dal::{DalContext, Ulid};
use dal_test::color_eyre::eyre::OptionExt;
use dal_test::helpers::{
create_component_for_default_schema_name_in_default_view, ChangeSetTestHelpers,
};
use dal_test::{test, Result};
use pretty_assertions_sorted::assert_eq;

#[test]
async fn new(ctx: &mut DalContext) -> Result<()> {
create_component_for_default_schema_name_in_default_view(ctx, "fallout", "soken").await?;
ChangeSetTestHelpers::commit_and_update_snapshot_to_visibility(ctx).await?;

let status = ChangeSetApprovalStatus::Approved;
// FIXME(nick): use a real checksum here.
let checksum = "FIXME".to_string();
let hash = approval::changes_hash::changes_hash(ctx).await?;
let checksum = hash.to_string();

let new_approval = ChangeSetApproval::new(ctx, status, checksum).await?;
assert_eq!(
Expand All @@ -30,3 +38,19 @@ async fn new(ctx: &mut DalContext) -> Result<()> {

Ok(())
}

#[test]
async fn status(ctx: &mut DalContext) -> Result<()> {
create_component_for_default_schema_name_in_default_view(ctx, "fallout", "find the flame")
.await?;
ChangeSetTestHelpers::commit_and_update_snapshot_to_visibility(ctx).await?;

let changes = approval::changes_hash::detect_changes_from_head(ctx).await?;
let seen: HashSet<Ulid> = HashSet::from_iter(changes.iter().map(|c| c.id));
assert_eq!(
changes.len(), // expected
seen.len() // actual
);

Ok(())
}
15 changes: 5 additions & 10 deletions lib/sdf-server/src/service/v2/change_set/approval_status.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
use axum::{
extract::{Host, OriginalUri, Path, State},
Json,
};
use dal::{change_set::approval::ChangeSetApproval, ChangeSet, ChangeSetId, WorkspacePk};
use axum::{extract::Path, Json};
use dal::{change_set::approval::ChangeSetApproval, ChangeSetId, WorkspacePk, WorkspaceSnapshot};

use super::{AppState, Error, Result};
use crate::{
extract::{AccessBuilder, HandlerContext, PosthogClient},
track,
};
use crate::extract::{AccessBuilder, HandlerContext};

use super::Result;

pub async fn approval_status(
HandlerContext(builder): HandlerContext,
Expand Down
4 changes: 4 additions & 0 deletions lib/si-events-rs/src/change_set_approval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ use postgres_types::ToSql;
use serde::{Deserialize, Serialize};
use strum::{AsRefStr, Display, EnumString};

use crate::create_xxhash_type;

create_xxhash_type!(ChangesHash);

#[remain::sorted]
#[derive(
AsRefStr, Deserialize, Serialize, Debug, Display, EnumString, PartialEq, Eq, Copy, Clone, ToSql,
Expand Down
1 change: 1 addition & 0 deletions lib/si-events-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub use crate::{
actor::Actor,
actor::UserPk,
cas::CasValue,
change_set_approval::ChangesHash,
change_set_approval::{ChangeSetApprovalKind, ChangeSetApprovalStatus},
change_set_status::ChangeSetStatus,
content_hash::ContentHash,
Expand Down

0 comments on commit e715c78

Please sign in to comment.