Skip to content

Commit

Permalink
Track snapshot metadata and fail restoring if missing (#2529)
Browse files Browse the repository at this point in the history
* Track snapshot metadata and fail restoring if missing

* Simplify reading via `StreamExt::collect`

* Remove logging output
  • Loading branch information
TimDiekmann authored May 8, 2023
1 parent 3491e4e commit 228a100
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 12 deletions.
19 changes: 17 additions & 2 deletions apps/hash-graph/lib/graph/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod ontology;
mod restore;

use async_trait::async_trait;
use error_stack::{Context, IntoReport, Report, Result, ResultExt};
use error_stack::{ensure, Context, IntoReport, Report, Result, ResultExt};
use futures::{stream, SinkExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use hash_status::StatusCode;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -211,7 +211,7 @@ impl<C: AsClient> SnapshotStore<C> {
) -> Result<(), SnapshotRestoreError> {
tracing::info!("snapshot restore started");

let (snapshot_record_tx, snapshot_record_rx) = restore::channel(chunk_size);
let (snapshot_record_tx, snapshot_record_rx, metadata_rx) = restore::channel(chunk_size);

let read_thread = tokio::spawn(
snapshot
Expand Down Expand Up @@ -278,6 +278,21 @@ impl<C: AsClient> SnapshotStore<C> {
.into_report()
.change_context(SnapshotRestoreError::Read)??;

let mut found_metadata = false;
for metadata in metadata_rx.collect::<Vec<SnapshotMetadata>>().await {
if found_metadata {
tracing::warn!("found more than one metadata record in the snapshot");
}
found_metadata = true;

ensure!(
metadata.block_protocol_module_versions.graph == semver::Version::new(0, 3, 0),
SnapshotRestoreError::Unsupported
);
}

ensure!(found_metadata, SnapshotRestoreError::MissingMetadata);

tracing::info!("snapshot restore finished");

Ok(())
Expand Down
2 changes: 2 additions & 0 deletions apps/hash-graph/lib/graph/src/snapshot/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ impl Error for SnapshotDumpError {}
#[derive(Debug)]
pub enum SnapshotRestoreError {
Unsupported,
MissingMetadata,
Read,
Buffer,
Write,
Expand All @@ -31,6 +32,7 @@ impl fmt::Display for SnapshotRestoreError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Unsupported => write!(f, "The snapshot contains unsupported entries"),
Self::MissingMetadata => write!(f, "The snapshot does not contain metadata"),
Self::Read => write!(f, "could not read a snapshot entry"),
Self::Buffer => write!(f, "could not buffer a snapshot entry"),
Self::Write => write!(f, "could not write a snapshot entry into the store"),
Expand Down
42 changes: 32 additions & 10 deletions apps/hash-graph/lib/graph/src/snapshot/restore/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use std::{
task::{ready, Context, Poll},
};

use error_stack::{ensure, Report, ResultExt};
use error_stack::{IntoReport, Report, ResultExt};
use futures::{
channel::mpsc::{self, UnboundedReceiver, UnboundedSender},
stream::{select_all, BoxStream, SelectAll},
Sink, SinkExt, Stream, StreamExt,
};
Expand All @@ -15,11 +16,12 @@ use crate::snapshot::{
entity::{self, EntitySender},
ontology::{self, DataTypeSender, EntityTypeSender, PropertyTypeSender},
restore::batch::SnapshotRecordBatch,
SnapshotEntry, SnapshotRestoreError,
SnapshotEntry, SnapshotMetadata, SnapshotRestoreError,
};

#[derive(Debug, Clone)]
pub struct SnapshotRecordSender {
metadata: UnboundedSender<SnapshotMetadata>,
data_type: DataTypeSender,
property_type: PropertyTypeSender,
entity_type: EntityTypeSender,
Expand All @@ -33,6 +35,10 @@ impl Sink<SnapshotEntry> for SnapshotRecordSender {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<StdResult<(), Self::Error>> {
ready!(self.metadata.poll_ready_unpin(cx))
.into_report()
.change_context(SnapshotRestoreError::Read)
.attach_printable("could not poll metadata sender")?;
ready!(self.data_type.poll_ready_unpin(cx))
.attach_printable("could not poll data type sender")?;
ready!(self.property_type.poll_ready_unpin(cx))
Expand All @@ -47,13 +53,12 @@ impl Sink<SnapshotEntry> for SnapshotRecordSender {

fn start_send(mut self: Pin<&mut Self>, entity: SnapshotEntry) -> StdResult<(), Self::Error> {
match entity {
SnapshotEntry::Snapshot(snapshot) => {
ensure!(
snapshot.block_protocol_module_versions.graph == semver::Version::new(0, 3, 0),
SnapshotRestoreError::Unsupported
);
Ok(())
}
SnapshotEntry::Snapshot(snapshot) => self
.metadata
.start_send_unpin(snapshot)
.into_report()
.change_context(SnapshotRestoreError::Read)
.attach_printable("could not send snapshot metadata"),
SnapshotEntry::DataType(data_type) => self
.data_type
.start_send_unpin(data_type)
Expand All @@ -77,6 +82,10 @@ impl Sink<SnapshotEntry> for SnapshotRecordSender {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<StdResult<(), Self::Error>> {
ready!(self.metadata.poll_flush_unpin(cx))
.into_report()
.change_context(SnapshotRestoreError::Read)
.attach_printable("could not flush metadata sender")?;
ready!(self.data_type.poll_flush_unpin(cx))
.attach_printable("could not flush data type sender")?;
ready!(self.property_type.poll_flush_unpin(cx))
Expand All @@ -93,6 +102,10 @@ impl Sink<SnapshotEntry> for SnapshotRecordSender {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<StdResult<(), Self::Error>> {
ready!(self.metadata.poll_close_unpin(cx))
.into_report()
.change_context(SnapshotRestoreError::Read)
.attach_printable("could not close metadata sender")?;
ready!(self.data_type.poll_close_unpin(cx))
.attach_printable("could not close data type sender")?;
ready!(self.property_type.poll_close_unpin(cx))
Expand All @@ -118,7 +131,14 @@ impl Stream for SnapshotRecordReceiver {
}
}

pub fn channel(chunk_size: usize) -> (SnapshotRecordSender, SnapshotRecordReceiver) {
pub fn channel(
chunk_size: usize,
) -> (
SnapshotRecordSender,
SnapshotRecordReceiver,
UnboundedReceiver<SnapshotMetadata>,
) {
let (metadata_tx, metadata_rx) = mpsc::unbounded();
let (account_tx, account_rx) = account::channel(chunk_size);
let (ontology_metadata_tx, ontology_metadata_rx) =
ontology::ontology_metadata_channel(chunk_size, account_tx.clone());
Expand All @@ -132,6 +152,7 @@ pub fn channel(chunk_size: usize) -> (SnapshotRecordSender, SnapshotRecordReceiv

(
SnapshotRecordSender {
metadata: metadata_tx,
data_type: data_type_tx,
property_type: property_type_tx,
entity_type: entity_type_tx,
Expand All @@ -151,5 +172,6 @@ pub fn channel(chunk_size: usize) -> (SnapshotRecordSender, SnapshotRecordReceiv
entity_rx.map(SnapshotRecordBatch::Entities).boxed(),
]),
},
metadata_rx,
)
}

0 comments on commit 228a100

Please sign in to comment.