Skip to content

Commit

Permalink
feat(rust): provide async implementations of blocking entity and vaul…
Browse files Browse the repository at this point in the history
…t functions
  • Loading branch information
antoinevg committed Sep 7, 2021
1 parent a8ced80 commit 0569f85
Show file tree
Hide file tree
Showing 71 changed files with 2,102 additions and 194 deletions.
35 changes: 35 additions & 0 deletions examples/rust/get_started/examples/hello_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use ockam::{route, Context, Entity, Result, SecureChannels, TrustEveryonePolicy, Vault};

#[ockam::node]
async fn main(mut ctx: Context) -> Result<()> {
// Create a Vault to safely store secret keys for Alice and Bob.
let vault = Vault::async_create(&ctx).await?;

// Create an Entity to represent Bob.
let mut bob = Entity::async_create(&ctx, &vault).await?;

// Create a secure channel listener for Bob that will wait for requests to
// initiate an Authenticated Key Exchange.
bob.async_create_secure_channel_listener("bob", TrustEveryonePolicy).await?;

// Create an entity to represent Alice.
let mut alice = Entity::async_create(&ctx, &vault).await?;

// As Alice, connect to Bob's secure channel listener and perform an
// Authenticated Key Exchange to establish an encrypted secure channel with Bob.
let channel = alice.async_create_secure_channel("bob", TrustEveryonePolicy).await?;

// Send a message, ** THROUGH ** the secure channel,
// to the "app" worker on the other side.
//
// This message will automatically get encrypted when it enters the channel
// and decrypted just before it exits the channel.
ctx.send(route![channel, "app"], "Hello Ockam!".to_string()).await?;

// Wait to receive a message for the "app" worker and print it.
let message = ctx.receive::<String>().await?;
println!("App Received: {}", message); // should print "Hello Ockam!"

// Stop all workers, stop the node, cleanup and return.
ctx.stop().await
}
1 change: 1 addition & 0 deletions implementations/rust/ockam/ockam/src/stream/cmd.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use ockam_core::compat::vec::Vec;
use crate::{
protocols::{ParserFragment, ProtocolPayload},
Address, Any, Context, Message, ProtocolId, Result, Route, Routed, TransportMessage, Worker,
Expand Down
1 change: 1 addition & 0 deletions implementations/rust/ockam/ockam/src/stream/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::{
stream::{StreamCmdParser, StreamWorkerCmd},
DelayedEvent, Message, TransportMessage,
};
use ockam_core::compat::{boxed::Box, string::String, vec::Vec};
use crate::{Address, Any, Context, Result, Route, Routed, Worker};
use core::time::Duration;
use ockam_core::compat::{boxed::Box, string::String, vec::Vec};
Expand Down
13 changes: 7 additions & 6 deletions implementations/rust/ockam/ockam_channel/src/secure_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ use crate::{
KeyExchangeCompleted, SecureChannelListener, SecureChannelNewKeyExchanger, SecureChannelVault,
SecureChannelWorker,
};
#[cfg(not(feature = "std"))]
use ockam_core::compat::rand::random;
use ockam_core::{Address, Result, Route};
use ockam_key_exchange_core::KeyExchanger;
use ockam_node::Context;
#[cfg(feature = "std")]
use rand::random;
use serde::{Deserialize, Serialize};
use tracing::{debug, info};

#[cfg(not(feature = "std"))]
use ockam_core::compat::rand::random;
#[cfg(feature = "std")]
use rand::random;

/// SecureChannel info returned from start_initiator_channel
/// Auth hash can be used for further authentication of the channel
/// and tying it up cryptographically to some source of Trust. (e.g. Entities)
Expand Down Expand Up @@ -53,8 +54,8 @@ impl SecureChannel {
/// Create and start channel listener with given address.
pub async fn create_listener_extended<
A: Into<Address>,
N: SecureChannelNewKeyExchanger,
V: SecureChannelVault,
N: SecureChannelNewKeyExchanger + Sync,
V: SecureChannelVault + Sync,
>(
ctx: &Context,
address: A,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl CreateResponderChannelMessage {
}

#[async_trait]
impl<V: SecureChannelVault, N: SecureChannelNewKeyExchanger> Worker
impl<V: SecureChannelVault + Sync, N: SecureChannelNewKeyExchanger + Sync> Worker
for SecureChannelListener<V, N>
{
type Message = CreateResponderChannelMessage;
Expand Down Expand Up @@ -86,8 +86,8 @@ impl<V: SecureChannelVault, N: SecureChannelNewKeyExchanger> Worker
address_local.clone(),
msg.completed_callback_address().clone(),
None,
self.new_key_exchanger.responder()?,
self.vault.clone(),
self.new_key_exchanger.async_responder().await?,
self.vault.async_clone().await,
)?;

ctx.start_worker(vec![address_remote.clone(), address_local], channel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,12 @@ impl<V: SecureChannelVault, K: KeyExchanger + Send + 'static> SecureChannelWorke

let (small_nonce, nonce) = Self::convert_nonce_u16(nonce);

let mut cipher_text = self.vault.aead_aes_gcm_encrypt(
let mut cipher_text = self.vault.async_aead_aes_gcm_encrypt(
&keys.encrypt_key,
payload.as_slice(),
&nonce,
&[],
)?;
).await?;

let mut res = Vec::new();
res.extend_from_slice(&small_nonce);
Expand Down Expand Up @@ -187,7 +187,7 @@ impl<V: SecureChannelVault, K: KeyExchanger + Send + 'static> SecureChannelWorke
let nonce = Self::convert_nonce_small(&payload.as_slice()[..2])?;

self.vault
.aead_aes_gcm_decrypt(&keys.decrypt_key, &payload[2..], &nonce, &[])?
.async_aead_aes_gcm_decrypt(&keys.decrypt_key, &payload[2..], &nonce, &[]).await?
};

let mut transport_message = TransportMessage::decode(&payload)?;
Expand Down Expand Up @@ -226,10 +226,10 @@ impl<V: SecureChannelVault, K: KeyExchanger + Send + 'static> SecureChannelWorke
} else {
return Err(SecureChannelError::InvalidInternalState.into());
}
let _ = key_exchanger.handle_response(payload.as_slice())?;
let _ = key_exchanger.async_handle_response(payload.as_slice()).await?;

if !key_exchanger.is_complete() {
let payload = key_exchanger.generate_request(&[])?;
let payload = key_exchanger.async_generate_request(&[]).await?;
self.send_key_exchange_payload(ctx, payload, false).await?;
}

Expand All @@ -246,7 +246,7 @@ impl<V: SecureChannelVault, K: KeyExchanger + Send + 'static> SecureChannelWorke
} else {
return Err(SecureChannelError::InvalidInternalState.into());
}
let keys = key_exchanger.finalize()?;
let keys = key_exchanger.async_finalize().await?;

self.keys = Some(ChannelKeys {
encrypt_key: keys.encrypt_key().clone(),
Expand Down Expand Up @@ -309,7 +309,7 @@ impl<V: SecureChannelVault, K: KeyExchanger + Send + 'static> Worker for SecureC
async fn initialize(&mut self, ctx: &mut Self::Context) -> Result<()> {
if self.is_initiator {
if let Some(initiator) = self.key_exchanger.as_mut() {
let payload = initiator.generate_request(&[])?;
let payload = initiator.async_generate_request(&[]).await?;

self.send_key_exchange_payload(ctx, payload, true).await?;
} else {
Expand Down
5 changes: 3 additions & 2 deletions implementations/rust/ockam/ockam_channel/src/traits.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use ockam_core::traits::AsyncClone;
use ockam_key_exchange_core::{KeyExchanger, NewKeyExchanger};
use ockam_vault_core::SymmetricVault;

/// Vault with XX required functionality
pub trait SecureChannelVault: SymmetricVault + Clone + Send + 'static {}
pub trait SecureChannelVault: SymmetricVault + AsyncClone + Clone + Send + 'static {}

impl<D> SecureChannelVault for D where D: SymmetricVault + Clone + Send + 'static {}
impl<D> SecureChannelVault for D where D: SymmetricVault + AsyncClone + Clone + Send + 'static {}

/// Vault with XX required functionality
pub trait SecureChannelKeyExchanger: KeyExchanger + Send + 'static {}
Expand Down
12 changes: 12 additions & 0 deletions implementations/rust/ockam/ockam_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@ mod error;
mod message;
mod processor;
mod routing;
/// traits
pub mod traits {
use crate::compat::boxed::Box;
use async_trait::async_trait;

#[async_trait]
/// Async version of the Clone trait
pub trait AsyncClone: Sized {
/// Returns a copy of the value.
async fn async_clone(&self) -> Self;
}
}
mod worker;

pub use error::*;
Expand Down
24 changes: 24 additions & 0 deletions implementations/rust/ockam/ockam_entity/src/authentication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ impl Authentication {
serde_bare::to_vec(&proof).map_err(|_| EntityError::BareError.into())
}

pub(crate) async fn async_generate_proof<V: ProfileVault>(
channel_state: &[u8],
secret: &Secret,
vault: &mut V,
) -> ockam_core::Result<Vec<u8>> {
let signature = vault.async_sign(secret, channel_state).await?;

let proof = AuthenticationProof::new(signature);

serde_bare::to_vec(&proof).map_err(|_| EntityError::BareError.into())
}

pub(crate) fn verify_proof<V: ProfileVault>(
channel_state: &[u8],
responder_public_key: &PublicKey,
Expand All @@ -47,6 +59,18 @@ impl Authentication {

vault.verify(proof.signature(), responder_public_key, channel_state)
}

pub(crate) async fn async_verify_proof<V: ProfileVault>(
channel_state: &[u8],
responder_public_key: &PublicKey,
proof: &[u8],
vault: &mut V,
) -> ockam_core::Result<bool> {
let proof: AuthenticationProof =
serde_bare::from_slice(proof).map_err(|_| EntityError::BareError)?;

vault.async_verify(proof.signature(), responder_public_key, channel_state).await
}
}

#[cfg(test)]
Expand Down
98 changes: 98 additions & 0 deletions implementations/rust/ockam/ockam_entity/src/change_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,21 @@ impl ProfileChangeHistory {
}
allow()
}

pub(crate) async fn async_verify_all_existing_events(
&self,
vault: &mut impl ProfileVault,
) -> ockam_core::Result<bool> {
for i in 0..self.0.len() {
let existing_events = &self.as_ref()[..i];
let new_event = &self.as_ref()[i];
if !Self::async_verify_event(existing_events, new_event, vault).await? {
return deny();
}
}
allow()
}

/// WARNING: This function assumes all existing events in chain are verified.
/// WARNING: Correctness of events sequence is not verified here.
pub(crate) fn verify_event(
Expand Down Expand Up @@ -256,6 +271,89 @@ impl ProfileChangeHistory {
allow()
}

/// WARNING: This function assumes all existing events in chain are verified.
/// WARNING: Correctness of events sequence is not verified here.
pub(crate) async fn async_verify_event(
existing_events: &[ProfileChangeEvent],
new_change_event: &ProfileChangeEvent,
vault: &mut impl ProfileVault,
) -> ockam_core::Result<bool> {
let changes = new_change_event.changes();
let changes_binary = serde_bare::to_vec(&changes).map_err(|_| EntityError::BareError)?;

let event_id = vault.async_sha256(&changes_binary).await?;
let event_id = EventIdentifier::from_hash(event_id);

if &event_id != new_change_event.identifier() {
return deny(); // EventIdDoesntMatch
}

match new_change_event.proof() {
ProfileChangeProof::Signature(s) => match s.stype() {
SignatureType::RootSign => {
let events_to_look = if existing_events.is_empty() {
core::slice::from_ref(new_change_event)
} else {
existing_events
};
let root_public_key =
Self::get_current_profile_update_public_key(events_to_look)?;
if !vault.async_verify(s.data(), &root_public_key, event_id.as_ref()).await? {
return deny();
}
}
},
}

for change in new_change_event.changes().data() {
if !match change.change_type() {
CreateKey(c) => {
// Should have 1 self signature
let data_binary =
serde_bare::to_vec(c.data()).map_err(|_| EntityError::BareError)?;
let data_hash = vault.async_sha256(data_binary.as_slice()).await?;

// if verification failed, there is no channel back. Return bool msg?
vault.async_verify(
c.self_signature(),
&PublicKey::new(c.data().public_key().into()),
&data_hash,
).await?
}
RotateKey(c) => {
// Should have 1 self signature and 1 prev signature
let data_binary =
serde_bare::to_vec(c.data()).map_err(|_| EntityError::BareError)?;
let data_hash = vault.sha256(data_binary.as_slice())?;

if !vault.async_verify(
c.self_signature(),
&PublicKey::new(c.data().public_key().into()),
&data_hash,
).await? {
false
} else {
let prev_key_event =
Self::find_last_key_event(existing_events, c.data().key_attributes())?;
let prev_key_change = ProfileChangeHistory::find_key_change_in_event(
prev_key_event,
c.data().key_attributes(),
)
.ok_or(EntityError::InvalidInternalState)?;
let public_key =
ProfileChangeHistory::get_change_public_key(prev_key_change)?;

vault.async_verify(c.prev_signature(), &public_key, &data_hash).await?
}
}
} {
return Err(EntityError::VerifyFailed.into());
}
}

allow()
}

/// Check consistency of events that are been added
pub(crate) fn check_consistency(
existing_events: &[ProfileChangeEvent],
Expand Down
15 changes: 8 additions & 7 deletions implementations/rust/ockam/ockam_entity/src/channel.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{Identity, ProfileIdentifier};
use async_trait::async_trait;
use ockam_core::compat::boxed::Box;
use ockam_core::traits::AsyncClone;
use ockam_core::{Address, Message, Result, Route, Routed};
use ockam_node::Context;
use ockam_vault_sync_core::VaultSync;
Expand All @@ -22,30 +23,30 @@ pub trait SecureChannelTrait {
self,
ctx: &Context,
route: Route,
trust_policy: impl TrustPolicy,
trust_policy: impl TrustPolicy + Sync,
vault: &Address,
) -> Result<Address>;

async fn create_secure_channel_listener_async(
self,
ctx: &Context,
address: Address,
trust_policy: impl TrustPolicy,
trust_policy: impl TrustPolicy + Sync,
vault: &Address,
) -> Result<()>;
}

#[async_trait]
impl<P: Identity + Send + Clone> SecureChannelTrait for P {
impl<P: Identity + Send + AsyncClone + Clone + Sync> SecureChannelTrait for P {
/// Create mutually authenticated secure channel
async fn create_secure_channel_async(
self,
ctx: &Context,
route: Route,
trust_policy: impl TrustPolicy,
trust_policy: impl TrustPolicy + Sync,
vault: &Address,
) -> Result<Address> {
let vault = VaultSync::create_with_worker(ctx, vault)?;
let vault = VaultSync::async_create_with_worker(ctx, vault).await?;
SecureChannelWorker::create_initiator(ctx, route, self, trust_policy, vault).await
}

Expand All @@ -54,10 +55,10 @@ impl<P: Identity + Send + Clone> SecureChannelTrait for P {
self,
ctx: &Context,
address: Address,
trust_policy: impl TrustPolicy,
trust_policy: impl TrustPolicy + Sync,
vault: &Address,
) -> Result<()> {
let vault = VaultSync::create_with_worker(ctx, vault)?;
let vault = VaultSync::async_create_with_worker(ctx, vault).await?;
let listener = ProfileChannelListener::new(trust_policy, self, vault);
ctx.start_worker(address, listener).await
}
Expand Down
Loading

0 comments on commit 0569f85

Please sign in to comment.