Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reshare Megolm session after the other party unwedges #3604

Merged
merged 13 commits into from
Jun 28, 2024
2 changes: 1 addition & 1 deletion crates/matrix-sdk-crypto/src/gossiping/machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ impl GossipMachine {
// information is recorded there.
} else if let Some(outbound) = outbound_session {
match outbound.is_shared_with(&device.inner) {
ShareState::Shared(message_index) => Ok(Some(message_index)),
ShareState::Shared(message_index, _) => Ok(Some(message_index)),
ShareState::SharedButChangedSenderKey => Err(KeyForwardDecision::ChangedSenderKey),
ShareState::NotShared => Err(KeyForwardDecision::OutboundSessionNotShared),
}
Expand Down
12 changes: 11 additions & 1 deletion crates/matrix-sdk-crypto/src/identities/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ pub struct ReadOnlyDevice {
/// Default to epoch for migration purpose.
#[serde(default = "default_timestamp")]
first_time_seen_ts: MilliSecondsSinceUnixEpoch,
/// The number of times the device has tried to unwedge Olm sessions with
/// us.
#[serde(default)]
pub(crate) olm_wedging_index: u32,
}

fn default_timestamp() -> MilliSecondsSinceUnixEpoch {
Expand Down Expand Up @@ -580,6 +584,7 @@ impl ReadOnlyDevice {
deleted: Arc::new(AtomicBool::new(false)),
withheld_code_sent: Arc::new(AtomicBool::new(false)),
first_time_seen_ts: MilliSecondsSinceUnixEpoch::now(),
olm_wedging_index: 0,
}
}

Expand Down Expand Up @@ -833,7 +838,11 @@ impl ReadOnlyDevice {

match self.encrypt(store, event_type, content).await {
Ok((session, encrypted)) => Ok(MaybeEncryptedRoomKey::Encrypted {
share_info: ShareInfo::new_shared(session.sender_key().to_owned(), message_index),
share_info: ShareInfo::new_shared(
session.sender_key().to_owned(),
message_index,
self.olm_wedging_index,
),
used_session: session,
message: encrypted.cast(),
}),
Expand Down Expand Up @@ -970,6 +979,7 @@ impl TryFrom<&DeviceKeys> for ReadOnlyDevice {
trust_state: Arc::new(RwLock::new(LocalTrust::Unset)),
withheld_code_sent: Arc::new(AtomicBool::new(false)),
first_time_seen_ts: MilliSecondsSinceUnixEpoch::now(),
olm_wedging_index: 0,
};

device.verify_device_keys(device_keys)?;
Expand Down
24 changes: 17 additions & 7 deletions crates/matrix-sdk-crypto/src/olm/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ use crate::{
error::{EventError, OlmResult, SessionCreationError},
identities::ReadOnlyDevice,
requests::UploadSigningKeysRequest,
store::{Changes, Store},
store::{Changes, DeviceChanges, Store},
types::{
events::{
olm_v1::AnyDecryptedOlmEvent,
Expand Down Expand Up @@ -1305,12 +1305,22 @@ impl Account {
// we might try to create the same session again.
// TODO: separate the session cache from the storage so we only add
// it to the cache but don't store it.
store
.save_changes(Changes {
sessions: vec![result.session.clone()],
..Default::default()
})
.await?;
let mut changes =
Changes { sessions: vec![result.session.clone()], ..Default::default() };
uhoreg marked this conversation as resolved.
Show resolved Hide resolved
// Any new Olm session will bump the Olm wedging index for the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be also wrapped manually, though it doesn't look as comical as the other doc comment, wouldn't hurt to let cargo fmt format this as well.

// sender's device, if we have their device, which will cause
// us to re-send existing Megolm sessions to them the next time
// we use the session. If we don't have their device, this
// means that we haven't tried to send them any Megolm sessions
// yet, so we don't need to worry about it.
if let Some(device) = store.get_device_from_curve_key(sender, sender_key).await? {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This section is clustered together as well, please add some whitespace to make this easier to read.

let mut read_only_device = device.inner;
read_only_device.olm_wedging_index += 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically this can panic, what do you think about using our SequenceNumber type for this. Not that it's terribly important but since we have such a type, seems like we could just use it for correctness sake:

/// A numeric type that can represent an infinite ordered sequence.
///
/// It uses wrapping arithmetic to make sure we never run out of numbers. (2**64
/// should be enough for anyone, but it's easy enough just to make it wrap.)
//
/// Internally it uses a *signed* counter so that we can compare values via a
/// subtraction. For example, suppose we've just overflowed from i64::MAX to
/// i64::MIN. (i64::MAX.wrapping_sub(i64::MIN)) is -1, which tells us that
/// i64::MAX comes before i64::MIN in the sequence.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) struct SequenceNumber(i64);
impl Display for SequenceNumber {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl PartialOrd for SequenceNumber {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.0.wrapping_sub(other.0).cmp(&0))
}
}
impl Ord for SequenceNumber {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.0.wrapping_sub(other.0).cmp(&0)
}
}
impl SequenceNumber {
fn increment(&mut self) {
self.0 = self.0.wrapping_add(1)
}
fn previous(&self) -> Self {
Self(self.0.wrapping_sub(1))
}
}

You would need to derive Serialize and Deserialize for it with the transparent attribute, but that's about it.

debug!(read_only_device.olm_wedging_index, "Olm wedging index");
changes.devices =
DeviceChanges { changed: vec![read_only_device], ..Default::default() };
}
store.save_changes(changes).await?;

Ok((SessionType::New(result.session), result.plaintext))
}
Expand Down
21 changes: 14 additions & 7 deletions crates/matrix-sdk-crypto/src/olm/group_sessions/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ const ROTATION_MESSAGES: u64 = 100;
pub(crate) enum ShareState {
NotShared,
SharedButChangedSenderKey,
Shared(u32),
Shared(u32, u32),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could take some docs, a single u32 could be figured out relatively easily, two of them double the confusion. It probably makes sense to convert the enum variant from a tuple variant to one with named fields, i.e.:

Shared {
    megolm_index: u32,
    wedging_foo: u32,
}

}

/// Settings for an encrypted room.
Expand Down Expand Up @@ -168,8 +168,12 @@ pub enum ShareInfo {

impl ShareInfo {
/// Helper to create a SharedWith info
pub fn new_shared(sender_key: Curve25519PublicKey, message_index: u32) -> Self {
ShareInfo::Shared(SharedWith { sender_key, message_index })
pub fn new_shared(
sender_key: Curve25519PublicKey,
message_index: u32,
olm_wedging_index: u32,
) -> Self {
ShareInfo::Shared(SharedWith { sender_key, message_index, olm_wedging_index })
}

/// Helper to create a Withheld info
Expand All @@ -184,6 +188,9 @@ pub struct SharedWith {
pub sender_key: Curve25519PublicKey,
/// The message index that the device received.
pub message_index: u32,
/// The Olm wedging index of the device at the time the session was shared.
#[serde(default)]
pub olm_wedging_index: u32,
}

impl OutboundGroupSession {
Expand Down Expand Up @@ -526,7 +533,7 @@ impl OutboundGroupSession {
d.get(device.device_id()).map(|s| match s {
ShareInfo::Shared(s) => {
if device.curve25519_key() == Some(s.sender_key) {
ShareState::Shared(s.message_index)
ShareState::Shared(s.message_index, s.olm_wedging_index)
} else {
ShareState::SharedButChangedSenderKey
}
Expand All @@ -550,7 +557,7 @@ impl OutboundGroupSession {
Some(match info {
ShareInfo::Shared(info) => {
if device.curve25519_key() == Some(info.sender_key) {
ShareState::Shared(info.message_index)
ShareState::Shared(info.message_index, info.olm_wedging_index)
} else {
ShareState::SharedButChangedSenderKey
}
Expand Down Expand Up @@ -602,7 +609,7 @@ impl OutboundGroupSession {
.unwrap()
.entry(user_id.to_owned())
.or_default()
.insert(device_id.to_owned(), ShareInfo::new_shared(sender_key, index));
.insert(device_id.to_owned(), ShareInfo::new_shared(sender_key, index, 0));
}

/// Mark the session as shared with the given user/device pair, starting
Expand All @@ -614,7 +621,7 @@ impl OutboundGroupSession {
device_id: &DeviceId,
sender_key: Curve25519PublicKey,
) {
let share_info = ShareInfo::new_shared(sender_key, self.message_index().await);
let share_info = ShareInfo::new_shared(sender_key, self.message_index().await, 0);
self.shared_with_set
.write()
.unwrap()
Expand Down
199 changes: 191 additions & 8 deletions crates/matrix-sdk-crypto/src/session_manager/group_sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -797,8 +797,19 @@ impl GroupSessionManager {
let devices: Vec<_> = devices
.into_iter()
.flat_map(|(_, d)| {
d.into_iter()
.filter(|d| matches!(outbound.is_shared_with(d), ShareState::NotShared))
d.into_iter().filter(|d| match outbound.is_shared_with(d) {
ShareState::NotShared => true,
ShareState::Shared(_msg_index, olm_wedging_index) => {
// If the recipient device's Olm wedging index is
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be wrapped at a really small width, we wrap at 80 chars, cargo fmt will do this automatically for you if the lines are too long.

// higher than the value that we stored with the
// session, that means that they tried to unwedge the
// session since we last shared the room key. So we
// re-share it with them in case they weren't able to
// decrypt the room key the last time we shared it.
olm_wedging_index < d.olm_wedging_index
}
_ => false,
})
})
.collect();

Expand Down Expand Up @@ -851,13 +862,19 @@ impl GroupSessionManager {

#[cfg(test)]
mod tests {
use std::{collections::BTreeSet, iter, ops::Deref, sync::Arc};
use std::{
collections::{BTreeMap, BTreeSet},
iter,
ops::Deref,
sync::Arc,
};

use assert_matches2::assert_let;
use matrix_sdk_test::{async_test, response_from_file};
use ruma::{
api::{
client::{
keys::{claim_keys, get_keys},
keys::{claim_keys, get_keys, upload_keys},
to_device::send_event_to_device::v3::Response as ToDeviceResponse,
},
IncomingResponse,
Expand All @@ -866,17 +883,23 @@ mod tests {
events::room::history_visibility::HistoryVisibility,
room_id,
to_device::DeviceIdOrAllDevices,
user_id, DeviceId, TransactionId, UserId,
user_id, DeviceId, DeviceKeyAlgorithm, TransactionId, UInt, UserId,
};
use serde_json::{json, Value};

use crate::{
identities::ReadOnlyDevice,
machine::EncryptionSyncChanges,
olm::Account,
session_manager::group_sessions::CollectRecipientsResult,
types::{
events::room_key_withheld::{
RoomKeyWithheldContent, RoomKeyWithheldContent::MegolmV1AesSha2, WithheldCode,
events::{
room::encrypted::EncryptedToDeviceEvent,
room_key_withheld::{
RoomKeyWithheldContent, RoomKeyWithheldContent::MegolmV1AesSha2, WithheldCode,
},
},
EventEncryptionAlgorithm,
DeviceKeys, EventEncryptionAlgorithm,
},
EncryptionSettings, LocalTrust, OlmMachine, ToDeviceRequest,
};
Expand Down Expand Up @@ -1407,4 +1430,164 @@ mod tests {

assert!(device.was_withheld_code_sent());
}

#[async_test]
async fn test_resend_session_after_unwedging() {
let machine = OlmMachine::new(alice_id(), alice_device_id()).await;
assert_let!(Ok(Some((txn_id, device_keys_request))) = machine.upload_device_keys().await);
let device_keys_response = upload_keys::v3::Response::new(BTreeMap::from([(
DeviceKeyAlgorithm::SignedCurve25519,
UInt::new(device_keys_request.one_time_keys.len() as u64).unwrap(),
)]));
machine.mark_request_as_sent(&txn_id, &device_keys_response).await.unwrap();

let room_id = room_id!("!test:localhost");

let bob_id = user_id!("@bob:localhost");
let bob_account = Account::new(bob_id);
let keys_query_data = json!({
"device_keys": {
"@bob:localhost": {
bob_account.device_id.clone(): bob_account.device_keys()
}
}
});
let keys_query =
get_keys::v3::Response::try_from_http_response(response_from_file(&keys_query_data))
.unwrap();
let txn_id = TransactionId::new();
machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();

let alice_device_keys =
device_keys_request.device_keys.unwrap().deserialize_as::<DeviceKeys>().unwrap();
let mut alice_otks = device_keys_request.one_time_keys.iter();
let alice_device = ReadOnlyDevice::new(alice_device_keys, LocalTrust::Unset);

{
// Bob creates an Olm session with Alice and encrypts a message to her
let (alice_otk_id, alice_otk) = alice_otks.next().unwrap();
let mut session = bob_account
.create_outbound_session(
&alice_device,
&BTreeMap::from([(alice_otk_id.clone(), alice_otk.clone())]),
)
.unwrap();
let content = session.encrypt(&alice_device, "m.dummy", json!({}), None).await.unwrap();

let to_device =
EncryptedToDeviceEvent::new(bob_id.to_owned(), content.deserialize().unwrap());

// Alice decrypts the message
let sync_changes = EncryptionSyncChanges {
to_device_events: vec![crate::utilities::json_convert(&to_device).unwrap()],
changed_devices: &Default::default(),
one_time_keys_counts: &Default::default(),
unused_fallback_keys: None,
next_batch_token: None,
};
let (decrypted, _) = machine.receive_sync_changes(sync_changes).await.unwrap();

assert_eq!(1, decrypted.len());
}

// Alice shares the room key with Bob
{
let requests = machine
.share_room_key(room_id, [bob_id].into_iter(), EncryptionSettings::default())
.await
.unwrap();

// We should have had one to-device event
let event_count: usize = requests
.iter()
.filter(|r| r.event_type == "m.room.encrypted".into())
.map(|r| r.message_count())
.sum();
assert_eq!(event_count, 1);

let response = ToDeviceResponse::new();
for request in requests {
machine.mark_request_as_sent(&request.txn_id, &response).await.unwrap();
}
}

// When Alice shares the room key again, there shouldn't be any
// to-device events, since we already shared with Bob
{
let requests = machine
.share_room_key(room_id, [bob_id].into_iter(), EncryptionSettings::default())
.await
.unwrap();

let event_count: usize = requests
.iter()
.filter(|r| r.event_type == "m.room.encrypted".into())
.map(|r| r.message_count())
.sum();
assert_eq!(event_count, 0);
}

// Pretend that Bob wasn't able to decrypt, so he tries to unwedge
{
let (alice_otk_id, alice_otk) = alice_otks.next().unwrap();
let mut session = bob_account
.create_outbound_session(
&alice_device,
&BTreeMap::from([(alice_otk_id.clone(), alice_otk.clone())]),
)
.unwrap();
let content = session.encrypt(&alice_device, "m.dummy", json!({}), None).await.unwrap();

let to_device =
EncryptedToDeviceEvent::new(bob_id.to_owned(), content.deserialize().unwrap());

// Alice decrypts the unwedge message
let sync_changes = EncryptionSyncChanges {
to_device_events: vec![crate::utilities::json_convert(&to_device).unwrap()],
changed_devices: &Default::default(),
one_time_keys_counts: &Default::default(),
unused_fallback_keys: None,
next_batch_token: None,
};
let (decrypted, _) = machine.receive_sync_changes(sync_changes).await.unwrap();

assert_eq!(1, decrypted.len());
}

// When Alice shares the room key again, it should be re-shared with Bob
{
let requests = machine
.share_room_key(room_id, [bob_id].into_iter(), EncryptionSettings::default())
.await
.unwrap();

let event_count: usize = requests
.iter()
.filter(|r| r.event_type == "m.room.encrypted".into())
.map(|r| r.message_count())
.sum();
assert_eq!(event_count, 1);

let response = ToDeviceResponse::new();
for request in requests {
machine.mark_request_as_sent(&request.txn_id, &response).await.unwrap();
}
}

// When Alice shares the room key yet again, there shouldn't be any
// to-device events
{
let requests = machine
.share_room_key(room_id, [bob_id].into_iter(), EncryptionSettings::default())
.await
.unwrap();

let event_count: usize = requests
.iter()
.filter(|r| r.event_type == "m.room.encrypted".into())
.map(|r| r.message_count())
.sum();
assert_eq!(event_count, 0);
}
}
}
Loading