Skip to content

Commit

Permalink
Deterministic ordering of competing messages (#506)
Browse files Browse the repository at this point in the history
* implement milestone sorting

* Add DIDNotFound error variant

Co-authored-by: Craig Bester <craig.bester@iota.org>
  • Loading branch information
abdulmth and cycraig authored Nov 29, 2021
1 parent 0b72612 commit 59d1bca
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 60 deletions.
1 change: 1 addition & 0 deletions identity-iota/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ futures = { version = "0.3" }
identity-core = { version = "=0.4.0", path = "../identity-core" }
identity-credential = { version = "=0.4.0", path = "../identity-credential" }
identity-did = { version = "=0.4.0", path = "../identity-did" }
itertools = { version = "0.10" }
lazy_static = { version = "1.4", default-features = false }
log = { version = "0.4", default-features = false }
num-derive = { version = "0.3", default-features = false }
Expand Down
66 changes: 34 additions & 32 deletions identity-iota/src/chain/diff_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ use serde::Serialize;

use identity_core::convert::ToJson;

use crate::chain::milestone::sort_by_milestone;
use crate::chain::IntegrationChain;
use crate::did::IotaDID;
use crate::document::DiffMessage;
use crate::document::IotaDocument;
use crate::error::Error;
use crate::error::Result;
use crate::tangle::Client;
use crate::tangle::Message;
use crate::tangle::MessageExt;
use crate::tangle::MessageId;
Expand All @@ -34,7 +36,11 @@ pub struct DiffChain {

impl DiffChain {
/// Constructs a new [`DiffChain`] for the given [`IntegrationChain`] from a slice of [`Messages`][Message].
pub fn try_from_messages(integration_chain: &IntegrationChain, messages: &[Message]) -> Result<Self> {
pub async fn try_from_messages(
integration_chain: &IntegrationChain,
messages: &[Message],
client: &Client,
) -> Result<Self> {
let did: &IotaDID = integration_chain.current().id();

let index: MessageIndex<DiffMessage> = messages
Expand All @@ -44,36 +50,50 @@ impl DiffChain {

log::debug!("[Diff] Valid Messages = {}/{}", messages.len(), index.len());

Self::try_from_index(integration_chain, index)
Ok(Self::try_from_index(integration_chain, index, client).await?)
}

/// Constructs a new [`DiffChain`] for the given [`IntegrationChain`] from the given [`MessageIndex`].
pub fn try_from_index(integration_chain: &IntegrationChain, index: MessageIndex<DiffMessage>) -> Result<Self> {
pub async fn try_from_index(
integration_chain: &IntegrationChain,
index: MessageIndex<DiffMessage>,
client: &Client,
) -> Result<Self> {
log::trace!("[Diff] Message Index = {:#?}", index);
Self::try_from_index_with_document(integration_chain.current(), index)
Self::try_from_index_with_document(integration_chain.current(), index, client).await
}

/// Constructs a new [`DiffChain`] from the given [`MessageIndex`], using an integration document
/// to validate.
pub(in crate::chain) fn try_from_index_with_document(
pub(in crate::chain) async fn try_from_index_with_document(
integration_document: &IotaDocument,
mut index: MessageIndex<DiffMessage>,
client: &Client,
) -> Result<Self> {
if index.is_empty() {
return Ok(Self::new());
}

let mut this: Self = Self::new();
while let Some(mut list) = index.remove(
while let Some(diffs) = index.remove(
this
.current_message_id()
.unwrap_or_else(|| integration_document.message_id()),
) {
'inner: while let Some(next_diff) = list.pop() {
if this.try_push_inner(next_diff, integration_document).is_ok() {
break 'inner;
}
// Extract valid diffs.
let expected_prev_message_id: &MessageId = this
.current_message_id()
.unwrap_or_else(|| integration_document.message_id());
let valid_diffs: Vec<DiffMessage> = diffs
.into_iter()
.filter(|diff| Self::check_valid_addition(diff, integration_document, expected_prev_message_id).is_ok())
.collect();

// Sort and push the diff referenced by the oldest milestone.
if let Some(next) = sort_by_milestone(valid_diffs, client).await?.into_iter().next() {
this.push_unchecked(next); // checked by check_valid_addition above
}
// If no diff is appended, the chain ends.
}

Ok(this)
Expand Down Expand Up @@ -117,34 +137,16 @@ impl DiffChain {
/// references within the diff are invalid.
pub fn try_push(&mut self, diff: DiffMessage, integration_chain: &IntegrationChain) -> Result<()> {
let document: &IotaDocument = integration_chain.current();
self.try_push_inner(diff, document)
}

/// Adds a new diff to the [`DiffChain`].
///
/// # Errors
///
/// Fails if the diff signature is invalid or the Tangle message
/// references within the diff are invalid.
fn try_push_inner(&mut self, diff: DiffMessage, document: &IotaDocument) -> Result<()> {
let expected_prev_message_id: &MessageId = self.current_message_id().unwrap_or_else(|| document.message_id());
Self::check_valid_addition(&diff, document, expected_prev_message_id)?;

// SAFETY: we performed the necessary validation in `check_validity`.
unsafe {
self.push_unchecked(diff);
}
self.push_unchecked(diff);

Ok(())
}

/// Adds a new diff to the [`DiffChain`] with performing validation checks.
///
/// # Safety
///
/// This function is unsafe because it does not check the validity of
/// the signature or Tangle references of the [`DiffMessage`].
pub unsafe fn push_unchecked(&mut self, diff: DiffMessage) {
/// Adds a new diff to the [`DiffChain`] without performing validation checks on the signature or Tangle references
/// of the [`DiffMessage`].
fn push_unchecked(&mut self, diff: DiffMessage) {
self.inner.push(diff);
}

Expand Down
8 changes: 4 additions & 4 deletions identity-iota/src/chain/document_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ impl DocumentHistory {
pub async fn read(client: &Client, did: &IotaDID) -> Result<Self> {
// Fetch and parse the integration chain
let integration_messages: Vec<Message> = client.read_messages(did.tag()).await?;
let integration_chain = IntegrationChain::try_from_messages(did, &integration_messages)?;
let integration_chain = IntegrationChain::try_from_messages(did, &integration_messages, client).await?;

// Fetch and parse the diff chain for the last integration message
let diff_index: String = IotaDocument::diff_index(integration_chain.current_message_id())?;
let diff_messages: Vec<Message> = client.read_messages(&diff_index).await?;
let diff_chain: DiffChain = DiffChain::try_from_messages(&integration_chain, &diff_messages)?;
let diff_chain: DiffChain = DiffChain::try_from_messages(&integration_chain, &diff_messages, client).await?;

let integration_chain_history: ChainHistory<IotaDocument> =
ChainHistory::from((integration_chain, integration_messages.deref()));
Expand Down Expand Up @@ -100,14 +100,14 @@ impl ChainHistory<DiffMessage> {
///
/// This is useful for constructing histories of old diff chains no longer at the end of an
/// integration chain.
pub fn try_from_raw_messages(document: &IotaDocument, messages: &[Message]) -> Result<Self> {
pub async fn try_from_raw_messages(document: &IotaDocument, messages: &[Message], client: &Client) -> Result<Self> {
let did: &IotaDID = document.did();
let index: MessageIndex<DiffMessage> = messages
.iter()
.flat_map(|message| message.try_extract_diff(did))
.collect();

let diff_chain = DiffChain::try_from_index_with_document(document, index)?;
let diff_chain = DiffChain::try_from_index_with_document(document, index, client).await?;
Ok(Self::from((diff_chain, messages)))
}
}
Expand Down
64 changes: 43 additions & 21 deletions identity-iota/src/chain/integration_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ use serde::Serialize;

use identity_core::convert::ToJson;

use crate::chain::milestone::sort_by_milestone;
use crate::did::IotaDID;
use crate::document::IotaDocument;
use crate::error::Error;
use crate::error::Result;
use crate::tangle::Client;
use crate::tangle::Message;
use crate::tangle::MessageExt;
use crate::tangle::MessageId;
Expand All @@ -37,40 +39,56 @@ pub struct IntegrationChain {

impl IntegrationChain {
/// Constructs a new [`IntegrationChain`] from a slice of [`Message`]s.
pub fn try_from_messages(did: &IotaDID, messages: &[Message]) -> Result<Self> {
pub async fn try_from_messages(did: &IotaDID, messages: &[Message], client: &Client) -> Result<Self> {
let index: MessageIndex<IotaDocument> = messages
.iter()
.flat_map(|message| message.try_extract_document(did))
.collect();

log::debug!("[Int] Valid Messages = {}/{}", messages.len(), index.len());

Self::try_from_index(index)
Self::try_from_index(index, client).await
}

/// Constructs a new [`IntegrationChain`] from the given [`MessageIndex`].
pub fn try_from_index(mut index: MessageIndex<IotaDocument>) -> Result<Self> {
pub async fn try_from_index(mut index: MessageIndex<IotaDocument>, client: &Client) -> Result<Self> {
log::trace!("[Int] Message Index = {:#?}", index);

// Extract root document.
let current: IotaDocument = index
.remove_where(&MessageId::null(), |doc| {
IotaDocument::verify_root_document(doc).is_ok()
})
.ok_or(Error::ChainError {
error: "Invalid Root Document",
})?;

// Construct the document chain.
let mut this: Self = Self::new(current)?;
while let Some(mut list) = index.remove(this.current_message_id()) {
'inner: while let Some(document) = list.pop() {
if this.try_push(document).is_ok() {
break 'inner;
}
let root_document: IotaDocument = {
let valid_root_documents: Vec<IotaDocument> = index
.remove(&MessageId::null())
.ok_or(Error::DIDNotFound("DID not found or pruned"))?
.into_iter()
.filter(|doc| IotaDocument::verify_root_document(doc).is_ok())
.collect();

if valid_root_documents.is_empty() {
return Err(Error::DIDNotFound("no valid root document found"));
}
}

let sorted_root_documents: Vec<IotaDocument> = sort_by_milestone(valid_root_documents, client).await?;
sorted_root_documents
.into_iter()
.next()
.ok_or(Error::DIDNotFound("no root document confirmed by a milestone found"))?
};

// Construct the rest of the integration chain.
let mut this: Self = Self::new(root_document)?;
while let Some(documents) = index.remove(this.current_message_id()) {
// Extract valid documents.
let valid_documents: Vec<IotaDocument> = documents
.into_iter()
.filter(|document| this.check_valid_addition(document).is_ok())
.collect();

// Sort and push the one referenced by the oldest milestone.
if let Some(next) = sort_by_milestone(valid_documents, client).await?.into_iter().next() {
this.push_unchecked(next); // checked above
}
// If no document is appended, the chain ends.
}
Ok(this)
}

Expand Down Expand Up @@ -120,13 +138,17 @@ impl IntegrationChain {
/// See [`IntegrationChain::check_valid_addition`].
pub fn try_push(&mut self, document: IotaDocument) -> Result<()> {
self.check_valid_addition(&document)?;
self.push_unchecked(document);

Ok(())
}

/// Adds a new [`IotaDocument`] to this [`IntegrationChain`] without validating it.
fn push_unchecked(&mut self, document: IotaDocument) {
self
.history
.get_or_insert_with(Vec::new)
.push(mem::replace(&mut self.current, document));

Ok(())
}

/// Returns `true` if the [`IotaDocument`] can be added to this [`IntegrationChain`].
Expand Down
112 changes: 112 additions & 0 deletions identity-iota/src/chain/milestone.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2020-2021 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::cmp::Ordering;

use futures::stream::FuturesUnordered;
use futures::TryStreamExt;
use itertools::Itertools;

use crate::error::Result;
use crate::tangle::Client;
use crate::tangle::TangleRef;

/// Fetches the milestones of messages and sorts them in ascending order of the milestone
/// index that references them. Messages not referenced by a milestone are filtered out.
/// If multiple messages are referenced by the same milestone, they will be sorted
/// by [`MessageId`](crate::tangle::MessageId).
///
/// NOTE: this will NOT fetch milestones if only one message is present.
///
/// # Errors
///
/// [`ClientError`](crate::error::Error::ClientError) if fetching a milestone fails.
pub(crate) async fn sort_by_milestone<T: TangleRef>(messages: Vec<T>, client: &Client) -> Result<Vec<T>> {
if messages.len() == 1 || messages.is_empty() {
return Ok(messages);
}

// Fetch metadata from the Tangle.
let milestones: Vec<(Option<u32>, T)> = messages
.into_iter()
.map(|message| async {
client
.client
.get_message()
.metadata(message.message_id())
.await
.map(|metadata| (metadata.referenced_by_milestone_index, message))
})
.collect::<FuturesUnordered<_>>()
.try_collect()
.await?;

let sorted: Vec<T> = sort_by_milestone_index(milestones);
Ok(sorted)
}

/// Sort by milestone index in ascending order, breaking ties by `message_id`.
fn sort_by_milestone_index<T: TangleRef>(messages_milestones: Vec<(Option<u32>, T)>) -> Vec<T> {
messages_milestones
.into_iter()
.filter_map(|(milestone_index, message)|
// Ignore any messages not referenced by a milestone.
milestone_index.map(|index| (index, message)))
.sorted_unstable_by(|(a_milestone, a_message), (b_milestone, b_message)| {
let milestone_cmp: Ordering = a_milestone.cmp(b_milestone);
if milestone_cmp == Ordering::Equal {
// Sort by message_id when both are referenced by the same milestone.
a_message.message_id().cmp(b_message.message_id())
} else {
milestone_cmp
}
})
.map(|(_, message)| message)
.collect()
}

#[cfg(test)]
mod tests {
use crate::did::IotaDID;
use crate::tangle::MessageId;

use super::*;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct FakeTangleRef(MessageId);

impl TangleRef for FakeTangleRef {
fn did(&self) -> &IotaDID {
unimplemented!()
}

fn message_id(&self) -> &MessageId {
&self.0
}

fn set_message_id(&mut self, _message_id: MessageId) {
unimplemented!()
}

fn previous_message_id(&self) -> &MessageId {
unimplemented!()
}

fn set_previous_message_id(&mut self, _message_id: MessageId) {
unimplemented!()
}
}

#[test]
fn test_sort_by_milestone_index() {
let m0 = FakeTangleRef(MessageId::new([0_u8; 32]));
let m1 = FakeTangleRef(MessageId::new([1_u8; 32]));
let m2 = FakeTangleRef(MessageId::new([2_u8; 32]));
let m3 = FakeTangleRef(MessageId::new([3_u8; 32]));
let m4 = FakeTangleRef(MessageId::new([4_u8; 32]));

let unsorted = vec![(Some(1), m4), (Some(1), m0), (Some(0), m1), (None, m2), (Some(0), m3)];
let sorted = sort_by_milestone_index(unsorted);
assert_eq!(sorted, vec![m1, m3, m0, m4]);
}
}
1 change: 1 addition & 0 deletions identity-iota/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ mod diff_chain;
mod document_chain;
mod document_history;
mod integration_chain;
mod milestone;
Loading

0 comments on commit 59d1bca

Please sign in to comment.