From aa492beb49300b2f9f65651d6c76847ae74a1378 Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Mon, 2 Dec 2024 16:33:23 -0500 Subject: [PATCH] speed up sync init --- bindings_ffi/src/mls.rs | 19 ++-- bindings_node/src/signatures.rs | 2 +- bindings_wasm/src/signatures.rs | 2 +- examples/cli/cli-client.rs | 4 +- examples/cli/debug.rs | 2 +- xmtp_mls/Cargo.toml | 5 +- xmtp_mls/benches/sync.rs | 7 +- xmtp_mls/src/api/mls.rs | 6 +- xmtp_mls/src/builder.rs | 6 +- xmtp_mls/src/client.rs | 56 ++++++----- xmtp_mls/src/groups/device_sync.rs | 94 ++++++++++++++----- .../src/groups/device_sync/consent_sync.rs | 27 ++++-- .../src/groups/device_sync/message_sync.rs | 16 +++- xmtp_mls/src/groups/mls_sync.rs | 28 ++++-- xmtp_mls/src/groups/mod.rs | 6 +- xmtp_mls/src/groups/scoped_client.rs | 8 ++ xmtp_mls/src/identity_updates.rs | 10 +- xmtp_mls/src/intents.rs | 2 +- xmtp_mls/src/lib.rs | 45 +++++++-- .../storage/encrypted_store/refresh_state.rs | 2 +- xmtp_mls/src/subscriptions.rs | 2 +- xmtp_mls/src/utils/test/mod.rs | 68 ++++++++++++-- 22 files changed, 302 insertions(+), 115 deletions(-) diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 1dc95f2a7..93f534aea 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -285,7 +285,7 @@ impl FfiXmtpClient { } pub fn installation_id(&self) -> Vec { - self.inner_client.installation_public_key() + self.inner_client.installation_public_key().to_vec() } pub fn release_db_connection(&self) -> Result<(), GenericError> { @@ -380,7 +380,7 @@ impl FfiXmtpClient { signature_bytes: Vec, ) -> Result<(), GenericError> { let inner = self.inner_client.as_ref(); - let public_key = inner.installation_public_key(); + let public_key = inner.installation_public_key().to_vec(); self.verify_signed_with_public_key(signature_text, signature_bytes, public_key) } @@ -454,12 +454,8 @@ impl FfiXmtpClient { return Ok(()); } - let provider = self - .inner_client - .mls_provider() - .map_err(GenericError::from_error)?; self.inner_client - .start_sync_worker(&provider) + .start_sync_worker() .await .map_err(GenericError::from_error)?; @@ -537,7 +533,7 @@ impl FfiXmtpClient { let other_installation_ids = inbox_state .installation_ids() .into_iter() - .filter(|id| id != &installation_id) + .filter(|id| id != installation_id) .collect(); let signature_request = self @@ -911,7 +907,8 @@ impl FfiConversations { pub fn get_sync_group(&self) -> Result { let inner = self.inner_client.as_ref(); - let sync_group = inner.get_sync_group()?; + let conn = inner.store().conn()?; + let sync_group = inner.get_sync_group(&conn)?; Ok(sync_group.into()) } @@ -2096,7 +2093,7 @@ mod tests { .unwrap(); register_client(&ffi_inbox_owner, &client_a).await; - let installation_pub_key = client_a.inner_client.installation_public_key(); + let installation_pub_key = client_a.inner_client.installation_public_key().to_vec(); drop(client_a); let client_b = create_client( @@ -2114,7 +2111,7 @@ mod tests { .await .unwrap(); - let other_installation_pub_key = client_b.inner_client.installation_public_key(); + let other_installation_pub_key = client_b.inner_client.installation_public_key().to_vec(); drop(client_b); assert!( diff --git a/bindings_node/src/signatures.rs b/bindings_node/src/signatures.rs index d32ae8e5f..246ae5325 100644 --- a/bindings_node/src/signatures.rs +++ b/bindings_node/src/signatures.rs @@ -97,7 +97,7 @@ impl Client { let other_installation_ids = inbox_state .installation_ids() .into_iter() - .filter(|id| id != &installation_id) + .filter(|id| id != installation_id) .collect(); let signature_request = self .inner_client() diff --git a/bindings_wasm/src/signatures.rs b/bindings_wasm/src/signatures.rs index 8265c88d0..9a660b947 100644 --- a/bindings_wasm/src/signatures.rs +++ b/bindings_wasm/src/signatures.rs @@ -101,7 +101,7 @@ impl Client { let other_installation_ids = inbox_state .installation_ids() .into_iter() - .filter(|id| id != &installation_id) + .filter(|id| id != installation_id) .collect(); let signature_request = self .inner_client() diff --git a/examples/cli/cli-client.rs b/examples/cli/cli-client.rs index 709c7a8a6..df7fa4c42 100755 --- a/examples/cli/cli-client.rs +++ b/examples/cli/cli-client.rs @@ -443,7 +443,7 @@ async fn main() -> color_eyre::eyre::Result<()> { let conn = client.store().conn().unwrap(); let provider = client.mls_provider().unwrap(); client.sync_welcomes(&conn).await.unwrap(); - client.start_sync_worker(&provider).await.unwrap(); + client.start_sync_worker().await.unwrap(); client .send_sync_request(&provider, DeviceSyncKind::MessageHistory) .await @@ -453,7 +453,7 @@ async fn main() -> color_eyre::eyre::Result<()> { Commands::ListHistorySyncMessages {} => { let conn = client.store().conn()?; client.sync_welcomes(&conn).await?; - let group = client.get_sync_group()?; + let group = client.get_sync_group(&conn)?; let group_id_str = hex::encode(group.group_id.clone()); group.sync().await?; let messages = group diff --git a/examples/cli/debug.rs b/examples/cli/debug.rs index 80311e7ff..c177911dc 100644 --- a/examples/cli/debug.rs +++ b/examples/cli/debug.rs @@ -71,7 +71,7 @@ pub async fn debug_welcome_messages( ) -> Result<(), String> { let api_client = client.api(); let envelopes = api_client - .query_welcome_messages(installation_id, None) + .query_welcome_messages(&installation_id, None) .await .unwrap(); for envelope in envelopes { diff --git a/xmtp_mls/Cargo.toml b/xmtp_mls/Cargo.toml index 11f9582b0..e900fcf95 100644 --- a/xmtp_mls/Cargo.toml +++ b/xmtp_mls/Cargo.toml @@ -95,6 +95,8 @@ tracing-subscriber = { workspace = true, features = [ "env-filter", "fmt", "ansi", + "json", + "registry" ], optional = true } @@ -151,6 +153,7 @@ tracing-subscriber = { workspace = true, features = [ "env-filter", "fmt", "ansi", + "json", ] } xmtp_api_grpc = { path = "../xmtp_api_grpc", features = ["test-utils"] } xmtp_api_http = { path = "../xmtp_api_http", features = ["test-utils"] } @@ -163,7 +166,7 @@ diesel-wasm-sqlite = { workspace = true, features = [ ] } ethers = { workspace = true, features = ["rustls"] } openmls = { workspace = true, features = ["js"] } -tracing-subscriber = { workspace = true, features = ["env-filter"] } +tracing-subscriber = { workspace = true, features = ["env-filter", "json"] } tracing-wasm = { version = "0.2" } wasm-bindgen-test.workspace = true xmtp_api_http = { path = "../xmtp_api_http", features = ["test-utils"] } diff --git a/xmtp_mls/benches/sync.rs b/xmtp_mls/benches/sync.rs index 3dd215f88..970cfaecd 100644 --- a/xmtp_mls/benches/sync.rs +++ b/xmtp_mls/benches/sync.rs @@ -29,14 +29,13 @@ fn start_sync_worker(c: &mut Criterion) { || { bench_async_setup(|| async { let client = clients::new_client(true).await; - let provider = client.mls_provider().unwrap(); // set history sync URL - (client, provider, span.clone()) + (client, span.clone()) }) }, - |(client, provider, span)| async move { + |(client, span)| async move { client - .start_sync_worker(&provider) + .start_sync_worker() .instrument(span) .await .unwrap() diff --git a/xmtp_mls/src/api/mls.rs b/xmtp_mls/src/api/mls.rs index 826f84325..4000467e6 100644 --- a/xmtp_mls/src/api/mls.rs +++ b/xmtp_mls/src/api/mls.rs @@ -117,11 +117,11 @@ where #[tracing::instrument(level = "trace", skip_all)] pub async fn query_welcome_messages( &self, - installation_id: Vec, + installation_id: &[u8], id_cursor: Option, ) -> Result, ApiError> { tracing::debug!( - installation_id = hex::encode(&installation_id), + installation_id = hex::encode(installation_id), cursor = id_cursor, inbox_id = self.inbox_id, "query welcomes" @@ -135,7 +135,7 @@ where (async { self.api_client .query_welcome_messages(QueryWelcomeMessagesRequest { - installation_key: installation_id.clone(), + installation_key: installation_id.to_vec(), paging_info: Some(PagingInfo { id_cursor: id_cursor.unwrap_or(0), limit: page_size, diff --git a/xmtp_mls/src/builder.rs b/xmtp_mls/src/builder.rs index e8303e1ec..450b84666 100644 --- a/xmtp_mls/src/builder.rs +++ b/xmtp_mls/src/builder.rs @@ -709,7 +709,7 @@ pub(crate) mod tests { register_client(&client_a, wallet).await; assert!(client_a.identity().is_ready()); - let keybytes_a = client_a.installation_public_key(); + let keybytes_a = client_a.installation_public_key().to_vec(); drop(client_a); // Reload the existing store and wallet @@ -729,7 +729,7 @@ pub(crate) mod tests { .build_with_verifier() .await .unwrap(); - let keybytes_b = client_b.installation_public_key(); + let keybytes_b = client_b.installation_public_key().to_vec(); drop(client_b); // Ensure the persistence was used to store the generated keys @@ -762,7 +762,7 @@ pub(crate) mod tests { .build_with_verifier() .await .unwrap(); - assert_eq!(client_d.installation_public_key(), keybytes_a); + assert_eq!(client_d.installation_public_key().to_vec(), keybytes_a); } /// anvil cannot be used in WebAssembly diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index 055252832..8efd0da2d 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -172,8 +172,8 @@ pub struct XmtpMlsLocalContext { impl XmtpMlsLocalContext { /// The installation public key is the primary identifier for an installation - pub fn installation_public_key(&self) -> Vec { - self.identity.installation_keys.public_slice().to_vec() + pub fn installation_public_key(&self) -> &[u8; 32] { + self.identity.installation_keys.public_bytes() } /// Get the account address of the blockchain account associated with this client @@ -259,7 +259,7 @@ where V: SmartContractSignatureVerifier, { /// Retrieves the client's installation public key, sometimes also called `installation_id` - pub fn installation_public_key(&self) -> Vec { + pub fn installation_public_key(&self) -> &[u8; 32] { self.context.installation_public_key() } /// Retrieves the client's inbox ID @@ -560,20 +560,25 @@ where Ok(group) } - pub(crate) fn create_sync_group(&self) -> Result, ClientError> { + pub(crate) fn create_sync_group( + &self, + provider: &XmtpOpenMlsProvider, + ) -> Result, ClientError> { tracing::info!("creating sync group"); - let sync_group = MlsGroup::create_and_insert_sync_group(Arc::new(self.clone()))?; + let sync_group = MlsGroup::create_and_insert_sync_group(Arc::new(self.clone()), provider)?; Ok(sync_group) } - /** - * Look up a group by its ID - * - * Returns a [`MlsGroup`] if the group exists, or an error if it does not - */ - pub fn group(&self, group_id: Vec) -> Result, ClientError> { - let conn = &mut self.store().conn()?; + /// Look up a group by its ID + /// + /// Returns a [`MlsGroup`] if the group exists, or an error if it does not + /// + pub fn group_with_conn( + &self, + conn: &DbConnection, + group_id: Vec, + ) -> Result, ClientError> { let stored_group: Option = conn.fetch(&group_id)?; match stored_group { Some(group) => Ok(MlsGroup::new(self.clone(), group.id, group.created_at_ns)), @@ -584,6 +589,15 @@ where } } + /// Look up a group by its ID + /// + /// Returns a [`MlsGroup`] if the group exists, or an error if it does not + /// + pub fn group(&self, group_id: Vec) -> Result, ClientError> { + let conn = &mut self.store().conn()?; + self.group_with_conn(conn, group_id) + } + /** * Look up a DM group by the target's inbox_id. * @@ -697,7 +711,7 @@ where conn: &DbConnection, ) -> Result, ClientError> { let installation_id = self.installation_public_key(); - let id_cursor = conn.get_last_cursor_for_id(&installation_id, EntityKind::Welcome)?; + let id_cursor = conn.get_last_cursor_for_id(installation_id, EntityKind::Welcome)?; let welcomes = self .api_client @@ -747,7 +761,7 @@ where (async { let welcome_v1 = &welcome_v1; self.intents.process_for_id( - &id, + id, EntityKind::Welcome, welcome_v1.id, |provider| async move { @@ -1021,7 +1035,7 @@ pub(crate) mod tests { // Get original KeyPackage. let kp1 = client - .get_key_packages_for_installation_ids(vec![client.installation_public_key()]) + .get_key_packages_for_installation_ids(vec![client.installation_public_key().to_vec()]) .await .unwrap(); assert_eq!(kp1.len(), 1); @@ -1031,7 +1045,7 @@ pub(crate) mod tests { client.rotate_key_package().await.unwrap(); let kp2 = client - .get_key_packages_for_installation_ids(vec![client.installation_public_key()]) + .get_key_packages_for_installation_ids(vec![client.installation_public_key().to_vec()]) .await .unwrap(); assert_eq!(kp2.len(), 1); @@ -1299,9 +1313,9 @@ pub(crate) mod tests { let bo_store = bo.store(); let alix_original_init_key = - get_key_package_init_key(&alix, &alix.installation_public_key()).await; + get_key_package_init_key(&alix, alix.installation_public_key()).await; let bo_original_init_key = - get_key_package_init_key(&bo, &bo.installation_public_key()).await; + get_key_package_init_key(&bo, bo.installation_public_key()).await; // Bo's original key should be deleted let bo_original_from_db = bo_store @@ -1320,19 +1334,19 @@ pub(crate) mod tests { bo.sync_welcomes(&bo.store().conn().unwrap()).await.unwrap(); - let bo_new_key = get_key_package_init_key(&bo, &bo.installation_public_key()).await; + let bo_new_key = get_key_package_init_key(&bo, bo.installation_public_key()).await; // Bo's key should have changed assert_ne!(bo_original_init_key, bo_new_key); bo.sync_welcomes(&bo.store().conn().unwrap()).await.unwrap(); - let bo_new_key_2 = get_key_package_init_key(&bo, &bo.installation_public_key()).await; + let bo_new_key_2 = get_key_package_init_key(&bo, bo.installation_public_key()).await; // Bo's key should not have changed syncing the second time. assert_eq!(bo_new_key, bo_new_key_2); alix.sync_welcomes(&alix.store().conn().unwrap()) .await .unwrap(); - let alix_key_2 = get_key_package_init_key(&alix, &alix.installation_public_key()).await; + let alix_key_2 = get_key_package_init_key(&alix, alix.installation_public_key()).await; // Alix's key should not have changed at all assert_eq!(alix_original_init_key, alix_key_2); diff --git a/xmtp_mls/src/groups/device_sync.rs b/xmtp_mls/src/groups/device_sync.rs index d9c442daf..8f887f81c 100644 --- a/xmtp_mls/src/groups/device_sync.rs +++ b/xmtp_mls/src/groups/device_sync.rs @@ -118,25 +118,43 @@ where ApiClient: XmtpApi + Send + Sync + 'static, V: SmartContractSignatureVerifier + Send + Sync + 'static, { + // TODO: Should we ensure that only one sync worker is running at a time? #[instrument(level = "trace", skip_all)] - pub async fn start_sync_worker( - &self, - provider: &XmtpOpenMlsProvider, - ) -> Result<(), DeviceSyncError> { - self.sync_init(provider).await?; - + pub async fn start_sync_worker(&self) -> Result<(), DeviceSyncError> { crate::spawn(None, { let client = self.clone(); - + tracing::debug!( + inbox_id = client.inbox_id(), + installation_id = hex::encode(client.installation_public_key()), + "starting sync worker" + ); let receiver = client.local_events.subscribe(); let sync_stream = receiver.stream_sync_messages(); async move { pin_mut!(sync_stream); + let inbox_id = client.inbox_id(); + let installation_id = hex::encode(client.installation_public_key()); + // scope ensures the provider is dropped once init is finished, and not + // held for entirety of sync. + let res = async { + let provider = client.mls_provider()?; + client.sync_init(&provider).await?; + Ok::<_, DeviceSyncError>(()) + }; + + if let Err(e) = res.await { + tracing::error!( + inbox_id, + installation_id, + "sync worker failed to init error = {e}" + ); + } while let Err(err) = client.sync_worker(&mut sync_stream).await { - tracing::error!("Sync worker error: {err}"); + tracing::error!(inbox_id, installation_id, "Sync worker error: {err}"); } + Ok::<_, DeviceSyncError>(()) } }); @@ -184,7 +202,11 @@ where }; if let Err(err) = self.process_sync_reply(&provider, reply).await { - tracing::warn!("Sync worker error: {err}"); + tracing::warn!( + inbox_id = self.inbox_id(), + installation_id = hex::encode(self.installation_public_key()), + "Sync worker error: {err}" + ); } } SyncMessage::Request { message_id } => { @@ -207,7 +229,11 @@ where }; if let Err(err) = self.reply_to_sync_request(&provider, request).await { - tracing::warn!("Sync worker error: {err}"); + tracing::warn!( + inbox_id = self.inbox_id(), + installation_id = hex::encode(self.installation_public_key()), + "Sync worker error: {err}" + ); } } }, @@ -235,10 +261,12 @@ where #[instrument(level = "trace", skip_all)] pub async fn sync_init(&self, provider: &XmtpOpenMlsProvider) -> Result<(), DeviceSyncError> { tracing::info!( + inbox_id = self.inbox_id(), + installation_id = hex::encode(self.installation_public_key()), "Initializing device sync... url: {:?}", self.history_sync_url ); - if self.get_sync_group().is_err() { + if self.get_sync_group(provider.conn_ref()).is_err() { self.ensure_sync_group(provider).await?; self.send_sync_request(provider, DeviceSyncKind::Consent) @@ -246,7 +274,11 @@ where self.send_sync_request(provider, DeviceSyncKind::MessageHistory) .await?; } - tracing::info!("Device sync initialized."); + tracing::info!( + inbox_id = self.inbox_id(), + installation_id = hex::encode(self.installation_public_key()), + "Device sync initialized." + ); Ok(()) } @@ -256,9 +288,9 @@ where &self, provider: &XmtpOpenMlsProvider, ) -> Result, GroupError> { - let sync_group = match self.get_sync_group() { + let sync_group = match self.get_sync_group(provider.conn_ref()) { Ok(group) => group, - Err(_) => self.create_sync_group()?, + Err(_) => self.create_sync_group(provider)?, }; sync_group .maybe_update_installations(provider, None) @@ -274,11 +306,15 @@ where provider: &XmtpOpenMlsProvider, kind: DeviceSyncKind, ) -> Result { - tracing::info!("Sending a sync request for {kind:?}"); + tracing::info!( + inbox_id = self.inbox_id(), + installation_id = hex::encode(self.installation_public_key()), + "Sending a sync request for {kind:?}" + ); let request = DeviceSyncRequest::new(kind); // find the sync group - let sync_group = self.get_sync_group()?; + let sync_group = self.get_sync_group(provider.conn_ref())?; // sync the group sync_group.sync_with_conn(provider).await?; @@ -306,7 +342,12 @@ where // publish the intent if let Err(err) = sync_group.publish_intents(provider).await { - tracing::error!("error publishing sync group intents: {:?}", err); + tracing::error!( + inbox_id = self.inbox_id(), + installation_id = hex::encode(self.installation_public_key()), + "error publishing sync group intents: {:?}", + err + ); } Ok(request) @@ -342,7 +383,7 @@ where ) -> Result<(), DeviceSyncError> { let conn = provider.conn_ref(); // find the sync group - let sync_group = self.get_sync_group()?; + let sync_group = self.get_sync_group(provider.conn_ref())?; // sync the group sync_group.sync_with_conn(provider).await?; @@ -383,7 +424,7 @@ where provider: &XmtpOpenMlsProvider, kind: DeviceSyncKind, ) -> Result<(StoredGroupMessage, DeviceSyncRequestProto), DeviceSyncError> { - let sync_group = self.get_sync_group()?; + let sync_group = self.get_sync_group(provider.conn_ref())?; sync_group.sync_with_conn(provider).await?; let messages = sync_group @@ -416,7 +457,7 @@ where provider: &XmtpOpenMlsProvider, kind: DeviceSyncKind, ) -> Result, DeviceSyncError> { - let sync_group = self.get_sync_group()?; + let sync_group = self.get_sync_group(provider.conn_ref())?; sync_group.sync_with_conn(provider).await?; let messages = sync_group @@ -500,7 +541,11 @@ where return Err(DeviceSyncError::MissingHistorySyncUrl); }; let upload_url = format!("{url}/upload"); - tracing::info!("Using upload url {upload_url}"); + tracing::info!( + inbox_id = self.inbox_id(), + installation_id = hex::encode(self.installation_public_key()), + "Using upload url {upload_url}" + ); let response = reqwest::Client::new() .post(upload_url) @@ -510,6 +555,8 @@ where if !response.status().is_success() { tracing::error!( + inbox_id = self.inbox_id(), + installation_id = hex::encode(self.installation_public_key()), "Failed to upload file. Status code: {} Response: {response:?}", response.status() ); @@ -590,13 +637,12 @@ where } #[instrument(level = "trace", skip_all)] - pub fn get_sync_group(&self) -> Result, GroupError> { - let conn = self.store().conn()?; + pub fn get_sync_group(&self, conn: &DbConnection) -> Result, GroupError> { let sync_group_id = conn .latest_sync_group()? .ok_or(GroupError::GroupNotFound)? .id; - let sync_group = self.group(sync_group_id.clone())?; + let sync_group = self.group_with_conn(conn, sync_group_id.clone())?; Ok(sync_group) } diff --git a/xmtp_mls/src/groups/device_sync/consent_sync.rs b/xmtp_mls/src/groups/device_sync/consent_sync.rs index cd23e10b0..82142f863 100644 --- a/xmtp_mls/src/groups/device_sync/consent_sync.rs +++ b/xmtp_mls/src/groups/device_sync/consent_sync.rs @@ -18,7 +18,12 @@ where provider: &XmtpOpenMlsProvider, record: &StoredConsentRecord, ) -> Result<(), DeviceSyncError> { - tracing::info!("Streaming consent update. {:?}", record); + tracing::info!( + inbox_id = self.inbox_id(), + installation_id = hex::encode(self.installation_public_key()), + "Streaming consent update. {:?}", + record + ); let conn = provider.conn_ref(); let consent_update_proto = ConsentUpdateProto { @@ -75,6 +80,7 @@ pub(crate) mod tests { builder::ClientBuilder, groups::scoped_client::LocalScopedGroupClient, storage::consent_record::{ConsentState, ConsentType}, + utils::test::wait_for_min_intents, }; use xmtp_cryptography::utils::generate_local_wallet; use xmtp_id::InboxOwner; @@ -106,21 +112,30 @@ pub(crate) mod tests { let amal_b = ClientBuilder::new_test_client_with_history(&wallet, &history_sync_url).await; let amal_b_provider = amal_b.mls_provider().unwrap(); let amal_b_conn = amal_b_provider.conn_ref(); - let consent_records_b = amal_b.syncable_consent_records(amal_b_conn).unwrap(); assert_eq!(consent_records_b.len(), 0); - let old_group_id = amal_a.get_sync_group().unwrap().group_id; + // make sure amal's worker has time to sync + // 3 Intents: + // 1.) UpdateGroupMembership Intent for new sync group + // 2.) Device Sync Request + // 3.) MessageHistory Sync Request + wait_for_min_intents(amal_b_conn, 3).await; + tracing::info!("Waiting for intents published"); + + let old_group_id = amal_a.get_sync_group(amal_a_conn).unwrap().group_id; + tracing::info!("Old Group Id: {}", hex::encode(&old_group_id)); // Check for new welcomes to new groups in the first installation (should be welcomed to a new sync group from amal_b). amal_a.sync_welcomes(amal_a_conn).await.unwrap(); - let new_group_id = amal_a.get_sync_group().unwrap().group_id; + let new_group_id = amal_a.get_sync_group(amal_a_conn).unwrap().group_id; + tracing::info!("New Group Id: {}", hex::encode(&new_group_id)); // group id should have changed to the new sync group created by the second installation assert_ne!(old_group_id, new_group_id); let consent_a = amal_a.syncable_consent_records(amal_a_conn).unwrap().len(); // Have amal_a receive the message (and auto-process) - let amal_a_sync_group = amal_a.get_sync_group().unwrap(); + let amal_a_sync_group = amal_a.get_sync_group(amal_a_conn).unwrap(); assert_ok!(amal_a_sync_group.sync_with_conn(&amal_a_provider).await); // Wait for up to 3 seconds for the reply on amal_b (usually is almost instant) @@ -148,7 +163,7 @@ pub(crate) mod tests { } // Test consent streaming - let amal_b_sync_group = amal_b.get_sync_group().unwrap(); + let amal_b_sync_group = amal_b.get_sync_group(amal_b_conn).unwrap(); let bo_wallet = generate_local_wallet(); // Ensure bo is not consented with amal_b diff --git a/xmtp_mls/src/groups/device_sync/message_sync.rs b/xmtp_mls/src/groups/device_sync/message_sync.rs index 3edab260f..c2e4976a7 100644 --- a/xmtp_mls/src/groups/device_sync/message_sync.rs +++ b/xmtp_mls/src/groups/device_sync/message_sync.rs @@ -50,7 +50,7 @@ pub(crate) mod tests { use super::*; use crate::{ assert_ok, builder::ClientBuilder, groups::GroupMetadataOptions, - utils::test::HISTORY_SYNC_URL, + utils::test::wait_for_min_intents, utils::test::HISTORY_SYNC_URL, }; use std::time::{Duration, Instant}; use xmtp_cryptography::utils::generate_local_wallet; @@ -92,13 +92,21 @@ pub(crate) mod tests { let groups_b = amal_b.syncable_groups(amal_b_conn).unwrap(); assert_eq!(groups_b.len(), 0); - let old_group_id = amal_a.get_sync_group().unwrap().group_id; + // make sure amal's worker has time to sync + // 3 Intents: + // 1.) UpdateGroupMembership Intent for new sync group + // 2.) Device Sync Request + // 3.) MessageHistory Sync Request + wait_for_min_intents(amal_b_conn, 3).await; + tracing::info!("Waiting for intents published"); + + let old_group_id = amal_a.get_sync_group(amal_a_conn).unwrap().group_id; // Check for new welcomes to new groups in the first installation (should be welcomed to a new sync group from amal_b). amal_a .sync_welcomes(amal_a_conn) .await .expect("sync_welcomes"); - let new_group_id = amal_a.get_sync_group().unwrap().group_id; + let new_group_id = amal_a.get_sync_group(amal_a_conn).unwrap().group_id; // group id should have changed to the new sync group created by the second installation assert_ne!(old_group_id, new_group_id); @@ -109,7 +117,7 @@ pub(crate) mod tests { .unwrap(); // Have amal_a receive the message (and auto-process) - let amal_a_sync_group = amal_a.get_sync_group().unwrap(); + let amal_a_sync_group = amal_a.get_sync_group(amal_a_conn).unwrap(); assert_ok!(amal_a_sync_group.sync_with_conn(&amal_a_provider).await); // Wait for up to 3 seconds for the reply on amal_b (usually is almost instant) diff --git a/xmtp_mls/src/groups/mls_sync.rs b/xmtp_mls/src/groups/mls_sync.rs index 778c41c4a..a47ec3f9a 100644 --- a/xmtp_mls/src/groups/mls_sync.rs +++ b/xmtp_mls/src/groups/mls_sync.rs @@ -58,7 +58,6 @@ use std::{ mem::{discriminant, Discriminant}, }; use thiserror::Error; -use tracing::debug; use xmtp_id::{InboxId, InboxIdRef}; use xmtp_proto::xmtp::mls::{ api::v1::{ @@ -364,6 +363,7 @@ where let group_epoch = openmls_group.epoch(); debug!( inbox_id = self.client.inbox_id(), + installation_id = hex::encode(self.client.installation_id()), group_id = hex::encode(&self.group_id), current_epoch = openmls_group.epoch().as_u64(), msg_id, @@ -973,9 +973,10 @@ where openmls_group.epoch().as_u64() as i64, )?; tracing::debug!( + inbox_id = self.client.inbox_id(), + installation_id = hex::encode(self.client.installation_id()), intent.id, intent.kind = %intent.kind, - inbox_id = self.client.inbox_id(), group_id = hex::encode(&self.group_id), "client [{}] set stored intent [{}] to state `published`", self.client.inbox_id(), @@ -991,6 +992,7 @@ where intent.id, intent.kind = %intent.kind, inbox_id = self.client.inbox_id(), + installation_id = hex::encode(self.client.installation_id()), group_id = hex::encode(&self.group_id), "[{}] published intent [{}] of type [{}]", self.client.inbox_id(), @@ -1003,7 +1005,11 @@ where } } Ok(None) => { - tracing::info!("Skipping intent because no publish data returned"); + tracing::info!( + inbox_id = self.client.inbox_id(), + installation_id = hex::encode(self.client.installation_id()), + "Skipping intent because no publish data returned" + ); let deleter: &dyn Delete = provider.conn_ref(); deleter.delete(intent.id)?; } @@ -1140,7 +1146,12 @@ where for intent in intents { if let Some(post_commit_data) = intent.post_commit_data { - tracing::debug!(intent.id, intent.kind = %intent.kind, "taking post commit action"); + tracing::debug!( + inbox_id = self.client.inbox_id(), + installation_id = hex::encode(self.client.installation_id()), + intent.id, + intent.kind = %intent.kind, "taking post commit action" + ); let post_commit_action = PostCommitAction::from_bytes(post_commit_data.as_slice())?; match post_commit_action { @@ -1203,7 +1214,12 @@ where return Ok(()); } - debug!("Adding missing installations {:?}", intent_data); + debug!( + inbox_id = self.client.inbox_id(), + installation_id = hex::encode(self.client.installation_id()), + "Adding missing installations {:?}", + intent_data + ); let intent = self.queue_intent_with_conn( provider.conn_ref(), @@ -1392,7 +1408,7 @@ async fn apply_update_group_membership_intent( let mut new_key_packages: Vec = vec![]; if !installation_diff.added_installations.is_empty() { - let my_installation_id = &client.context().installation_public_key(); + let my_installation_id = &client.context().installation_public_key().to_vec(); // Go to the network and load the key packages for any new installation let key_packages = client .get_key_packages_for_installation_ids( diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index bd0b3807b..8060040df 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -546,10 +546,10 @@ impl MlsGroup { pub(crate) fn create_and_insert_sync_group( client: Arc, + provider: &XmtpOpenMlsProvider, ) -> Result, GroupError> { let context = client.context(); let creator_inbox_id = context.inbox_id(); - let provider = client.mls_provider()?; let protected_metadata = build_protected_metadata_extension(creator_inbox_id, ConversationType::Sync)?; @@ -687,7 +687,7 @@ impl MlsGroup { decrypted_message_bytes: message.to_vec(), sent_at_ns: now, kind: GroupMessageKind::Application, - sender_installation_id: self.context().installation_public_key(), + sender_installation_id: self.context().installation_public_key().into(), sender_inbox_id: self.context().inbox_id().to_string(), delivery_status: DeliveryStatus::Unpublished, }; @@ -1668,7 +1668,7 @@ pub(crate) mod tests { let serialized_welcome = welcome.tls_serialize_detached().unwrap(); let send_welcomes_action = SendWelcomesAction::new( vec![Installation { - installation_key: new_member_client.installation_public_key(), + installation_key: new_member_client.installation_public_key().into(), hpke_public_key: hpke_init_key, }], serialized_welcome, diff --git a/xmtp_mls/src/groups/scoped_client.rs b/xmtp_mls/src/groups/scoped_client.rs index 75700d432..808fee0e0 100644 --- a/xmtp_mls/src/groups/scoped_client.rs +++ b/xmtp_mls/src/groups/scoped_client.rs @@ -37,6 +37,10 @@ pub trait LocalScopedGroupClient: Send + Sync + Sized { self.context_ref().inbox_id() } + fn installation_id(&self) -> &[u8] { + self.context_ref().installation_public_key() + } + fn mls_provider(&self) -> Result { self.context_ref().mls_provider() } @@ -101,6 +105,10 @@ pub trait ScopedGroupClient: Sized { self.context_ref().inbox_id() } + fn installation_id(&self) -> &[u8] { + self.context_ref().installation_public_key() + } + fn mls_provider(&self) -> Result { self.context_ref().mls_provider() } diff --git a/xmtp_mls/src/identity_updates.rs b/xmtp_mls/src/identity_updates.rs index 7c7220aa9..e4a6cef10 100644 --- a/xmtp_mls/src/identity_updates.rs +++ b/xmtp_mls/src/identity_updates.rs @@ -812,8 +812,8 @@ pub(crate) mod tests { let client_2 = ClientBuilder::new_test_client(&wallet_2).await; let client_3 = ClientBuilder::new_test_client(&wallet_3).await; - let client_2_installation_key = client_2.installation_public_key(); - let client_3_installation_key = client_3.installation_public_key(); + let client_2_installation_key = client_2.installation_public_key().to_vec(); + let client_3_installation_key = client_3.installation_public_key().to_vec(); let mut inbox_ids: Vec = vec![]; @@ -902,11 +902,11 @@ pub(crate) mod tests { assert_eq!(installation_diff.added_installations.len(), 1); assert!(installation_diff .added_installations - .contains(&client_3_installation_key),); + .contains(&client_3_installation_key.to_vec()),); assert_eq!(installation_diff.removed_installations.len(), 1); assert!(installation_diff .removed_installations - .contains(&client_2_installation_key)); + .contains(&client_2_installation_key.to_vec())); } #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)] @@ -979,7 +979,7 @@ pub(crate) mod tests { // Now revoke the second client let mut revoke_installation_request = client1 - .revoke_installations(vec![client2.installation_public_key()]) + .revoke_installations(vec![client2.installation_public_key().to_vec()]) .await .unwrap(); add_wallet_signature(&mut revoke_installation_request, &wallet).await; diff --git a/xmtp_mls/src/intents.rs b/xmtp_mls/src/intents.rs index b7b06ef06..7f54daa6d 100644 --- a/xmtp_mls/src/intents.rs +++ b/xmtp_mls/src/intents.rs @@ -52,7 +52,7 @@ impl Intents { /// apply the update after the provided `ProcessingFn` has completed successfully. pub(crate) async fn process_for_id( &self, - entity_id: &Vec, + entity_id: &[u8], entity_kind: EntityKind, cursor: u64, process_envelope: ProcessingFn, diff --git a/xmtp_mls/src/lib.rs b/xmtp_mls/src/lib.rs index 6384cc5de..fb21ba10b 100644 --- a/xmtp_mls/src/lib.rs +++ b/xmtp_mls/src/lib.rs @@ -28,6 +28,9 @@ pub use xmtp_openmls_provider::XmtpOpenMlsProvider; pub use xmtp_id::InboxOwner; pub use xmtp_proto::api_client::trait_impls::*; +#[macro_use] +extern crate tracing; + /// Global Marker trait for WebAssembly #[cfg(target_arch = "wasm32")] pub trait Wasm {} @@ -119,15 +122,45 @@ pub(crate) mod tests { #[cfg_attr(not(target_arch = "wasm32"), ctor::ctor)] #[cfg(not(target_arch = "wasm32"))] fn _setup() { - use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; + use tracing_subscriber::{ + fmt::{self, format}, + layer::SubscriberExt, + util::SubscriberInitExt, + EnvFilter, Layer, + }; - let filter = EnvFilter::builder() - .with_default_directive(tracing::metadata::LevelFilter::INFO.into()) - .from_env_lossy(); + let structured = std::env::var("STRUCTURED"); + let is_structured = matches!(structured, Ok(s) if s == "true" || s == "1"); + + let filter = || { + EnvFilter::builder() + .with_default_directive(tracing::metadata::LevelFilter::INFO.into()) + .from_env_lossy() + }; tracing_subscriber::registry() - .with(fmt::layer().pretty()) - .with(filter) + // structured JSON logger + .with(is_structured.then(|| { + tracing_subscriber::fmt::layer() + .json() + .flatten_event(true) + .with_level(true) + .with_filter(filter()) + })) + // default logger + .with((!is_structured).then(|| { + fmt::layer() + .compact() + .fmt_fields({ + format::debug_fn(move |writer, field, value| { + if field.name() == "message" { + write!(writer, "{:?}", value)?; + } + Ok(()) + }) + }) + .with_filter(filter()) + })) .init(); } diff --git a/xmtp_mls/src/storage/encrypted_store/refresh_state.rs b/xmtp_mls/src/storage/encrypted_store/refresh_state.rs index 19b6b28cc..34efa6220 100644 --- a/xmtp_mls/src/storage/encrypted_store/refresh_state.rs +++ b/xmtp_mls/src/storage/encrypted_store/refresh_state.rs @@ -92,7 +92,7 @@ impl DbConnection { pub fn update_cursor( &self, - entity_id: &Vec, + entity_id: &[u8], entity_kind: EntityKind, cursor: i64, ) -> Result { diff --git a/xmtp_mls/src/subscriptions.rs b/xmtp_mls/src/subscriptions.rs index d5c9dbf6a..67606dae8 100644 --- a/xmtp_mls/src/subscriptions.rs +++ b/xmtp_mls/src/subscriptions.rs @@ -305,7 +305,7 @@ where tracing::info!(inbox_id = self.inbox_id(), "Setting up conversation stream"); let subscription = self .api_client - .subscribe_welcome_messages(installation_key, Some(id_cursor)) + .subscribe_welcome_messages(installation_key.into(), Some(id_cursor)) .await?; let stream = subscription diff --git a/xmtp_mls/src/utils/test/mod.rs b/xmtp_mls/src/utils/test/mod.rs index 47b56c2bb..aded954b2 100755 --- a/xmtp_mls/src/utils/test/mod.rs +++ b/xmtp_mls/src/utils/test/mod.rs @@ -1,5 +1,13 @@ #![allow(clippy::unwrap_used)] +use crate::storage::DbConnection; +use crate::{ + builder::ClientBuilder, + identity::IdentityStrategy, + storage::{EncryptedMessageStore, StorageOption}, + types::Address, + Client, InboxOwner, XmtpApi, +}; use rand::{ distributions::{Alphanumeric, DistString}, Rng, RngCore, @@ -16,14 +24,6 @@ use xmtp_id::{ }; use xmtp_proto::api_client::XmtpTestClient; -use crate::{ - builder::ClientBuilder, - identity::IdentityStrategy, - storage::{EncryptedMessageStore, StorageOption}, - types::Address, - Client, InboxOwner, XmtpApi, -}; - #[cfg(not(target_arch = "wasm32"))] pub mod traced_test; #[cfg(not(target_arch = "wasm32"))] @@ -109,6 +109,7 @@ impl ClientBuilder { ) } } + impl ClientBuilder { pub async fn new_test_client(owner: &impl InboxOwner) -> FullXmtpClient { let api_client = ::create_local().await; @@ -235,8 +236,7 @@ where register_client(&client, owner).await; if client.history_sync_url.is_some() { - let provider = client.mls_provider().unwrap(); - client.start_sync_worker(&provider).await.unwrap(); + client.start_sync_worker().await.unwrap(); } client @@ -298,3 +298,51 @@ pub async fn register_client( client.register_identity(signature_request).await.unwrap(); } + +/// waits for all intents to finish +/// TODO: Should wrap with a timeout +pub async fn wait_for_all_intents_published(conn: &DbConnection) { + use crate::storage::group_intent::IntentState; + use crate::storage::group_intent::StoredGroupIntent; + use crate::storage::schema::group_intents::dsl; + use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl}; + + let mut query = dsl::group_intents.into_boxed(); + let states = vec![IntentState::ToPublish]; + query = query.filter(dsl::state.eq_any(states)); + query = query.order(dsl::id.asc()); + + let intents = conn + .raw_query(|conn| query.load::(conn)) + .unwrap(); + + tracing::info!("{} intents left", intents.len()); + if intents.is_empty() { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + Box::pin(wait_for_all_intents_published(conn)).await + } +} + +/// wait for a minimum amount of intent +/// TODO: Should wrap with a timeout +pub async fn wait_for_min_intents(conn: &DbConnection, n: usize) { + use crate::storage::group_intent::IntentState; + use crate::storage::group_intent::StoredGroupIntent; + use crate::storage::schema::group_intents::dsl; + use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl}; + + let mut query = dsl::group_intents.into_boxed(); + let states = vec![IntentState::Published]; + query = query.filter(dsl::state.eq_any(states)); + query = query.order(dsl::id.asc()); + + let intents = conn + .raw_query(|conn| query.load::(conn)) + .unwrap(); + + tracing::info!("{} intents left", intents.len()); + if intents.len() < n { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + Box::pin(wait_for_min_intents(conn, n - intents.len())).await + } +}