diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 1dc95f2a7..41fcf1b13 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -454,12 +454,8 @@ impl FfiXmtpClient { return Ok(()); } - let provider = self - .inner_client - .mls_provider() - .map_err(GenericError::from_error)?; self.inner_client - .start_sync_worker(&provider) + .start_sync_worker() .await .map_err(GenericError::from_error)?; @@ -911,7 +907,8 @@ impl FfiConversations { pub fn get_sync_group(&self) -> Result { let inner = self.inner_client.as_ref(); - let sync_group = inner.get_sync_group()?; + let conn = inner.store().conn()?; + let sync_group = inner.get_sync_group(&conn)?; Ok(sync_group.into()) } diff --git a/examples/cli/cli-client.rs b/examples/cli/cli-client.rs index 709c7a8a6..df7fa4c42 100755 --- a/examples/cli/cli-client.rs +++ b/examples/cli/cli-client.rs @@ -443,7 +443,7 @@ async fn main() -> color_eyre::eyre::Result<()> { let conn = client.store().conn().unwrap(); let provider = client.mls_provider().unwrap(); client.sync_welcomes(&conn).await.unwrap(); - client.start_sync_worker(&provider).await.unwrap(); + client.start_sync_worker().await.unwrap(); client .send_sync_request(&provider, DeviceSyncKind::MessageHistory) .await @@ -453,7 +453,7 @@ async fn main() -> color_eyre::eyre::Result<()> { Commands::ListHistorySyncMessages {} => { let conn = client.store().conn()?; client.sync_welcomes(&conn).await?; - let group = client.get_sync_group()?; + let group = client.get_sync_group(&conn)?; let group_id_str = hex::encode(group.group_id.clone()); group.sync().await?; let messages = group diff --git a/xmtp_mls/Cargo.toml b/xmtp_mls/Cargo.toml index 11f9582b0..e900fcf95 100644 --- a/xmtp_mls/Cargo.toml +++ b/xmtp_mls/Cargo.toml @@ -95,6 +95,8 @@ tracing-subscriber = { workspace = true, features = [ "env-filter", "fmt", "ansi", + "json", + "registry" ], optional = true } @@ -151,6 +153,7 @@ tracing-subscriber = { workspace = true, features = [ "env-filter", "fmt", "ansi", + "json", ] } xmtp_api_grpc = { path = "../xmtp_api_grpc", features = ["test-utils"] } xmtp_api_http = { path = "../xmtp_api_http", features = ["test-utils"] } @@ -163,7 +166,7 @@ diesel-wasm-sqlite = { workspace = true, features = [ ] } ethers = { workspace = true, features = ["rustls"] } openmls = { workspace = true, features = ["js"] } -tracing-subscriber = { workspace = true, features = ["env-filter"] } +tracing-subscriber = { workspace = true, features = ["env-filter", "json"] } tracing-wasm = { version = "0.2" } wasm-bindgen-test.workspace = true xmtp_api_http = { path = "../xmtp_api_http", features = ["test-utils"] } diff --git a/xmtp_mls/benches/sync.rs b/xmtp_mls/benches/sync.rs index 3dd215f88..970cfaecd 100644 --- a/xmtp_mls/benches/sync.rs +++ b/xmtp_mls/benches/sync.rs @@ -29,14 +29,13 @@ fn start_sync_worker(c: &mut Criterion) { || { bench_async_setup(|| async { let client = clients::new_client(true).await; - let provider = client.mls_provider().unwrap(); // set history sync URL - (client, provider, span.clone()) + (client, span.clone()) }) }, - |(client, provider, span)| async move { + |(client, span)| async move { client - .start_sync_worker(&provider) + .start_sync_worker() .instrument(span) .await .unwrap() diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index 055252832..9005ab752 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -173,7 +173,7 @@ pub struct XmtpMlsLocalContext { impl XmtpMlsLocalContext { /// The installation public key is the primary identifier for an installation pub fn installation_public_key(&self) -> Vec { - self.identity.installation_keys.public_slice().to_vec() + self.identity.installation_keys.public_bytes().to_vec() } /// Get the account address of the blockchain account associated with this client @@ -560,20 +560,25 @@ where Ok(group) } - pub(crate) fn create_sync_group(&self) -> Result, ClientError> { + pub(crate) fn create_sync_group( + &self, + provider: &XmtpOpenMlsProvider, + ) -> Result, ClientError> { tracing::info!("creating sync group"); - let sync_group = MlsGroup::create_and_insert_sync_group(Arc::new(self.clone()))?; + let sync_group = MlsGroup::create_and_insert_sync_group(Arc::new(self.clone()), provider)?; Ok(sync_group) } - /** - * Look up a group by its ID - * - * Returns a [`MlsGroup`] if the group exists, or an error if it does not - */ - pub fn group(&self, group_id: Vec) -> Result, ClientError> { - let conn = &mut self.store().conn()?; + /// Look up a group by its ID + /// + /// Returns a [`MlsGroup`] if the group exists, or an error if it does not + /// + pub fn group_with_conn( + &self, + conn: &DbConnection, + group_id: Vec, + ) -> Result, ClientError> { let stored_group: Option = conn.fetch(&group_id)?; match stored_group { Some(group) => Ok(MlsGroup::new(self.clone(), group.id, group.created_at_ns)), @@ -584,6 +589,15 @@ where } } + /// Look up a group by its ID + /// + /// Returns a [`MlsGroup`] if the group exists, or an error if it does not + /// + pub fn group(&self, group_id: Vec) -> Result, ClientError> { + let conn = &mut self.store().conn()?; + self.group_with_conn(conn, group_id) + } + /** * Look up a DM group by the target's inbox_id. * diff --git a/xmtp_mls/src/groups/device_sync.rs b/xmtp_mls/src/groups/device_sync.rs index d9c442daf..0f1a4a4dc 100644 --- a/xmtp_mls/src/groups/device_sync.rs +++ b/xmtp_mls/src/groups/device_sync.rs @@ -118,25 +118,47 @@ where ApiClient: XmtpApi + Send + Sync + 'static, V: SmartContractSignatureVerifier + Send + Sync + 'static, { + // TODO: Should we ensure that only one sync worker is running at a time? #[instrument(level = "trace", skip_all)] - pub async fn start_sync_worker( - &self, - provider: &XmtpOpenMlsProvider, - ) -> Result<(), DeviceSyncError> { - self.sync_init(provider).await?; - + pub async fn start_sync_worker(&self) -> Result<(), DeviceSyncError> { crate::spawn(None, { let client = self.clone(); - + let installation_id = client.installation_public_key(); + tracing::debug!( + inbox_id = client.inbox_id(), + installation_id = hex::encode(&installation_id), + "starting sync worker" + ); let receiver = client.local_events.subscribe(); let sync_stream = receiver.stream_sync_messages(); async move { pin_mut!(sync_stream); + let inbox_id = client.inbox_id(); + // scope ensures the provider is dropped once init is finished, and not + // held for entirety of sync. + let res = async { + let provider = client.mls_provider()?; + client.sync_init(&provider).await?; + Ok::<_, DeviceSyncError>(()) + }; + + if let Err(e) = res.await { + tracing::error!( + inbox_id, + installation_id = hex::encode(&installation_id), + "sync worker failed to init error = {e}" + ); + } while let Err(err) = client.sync_worker(&mut sync_stream).await { - tracing::error!("Sync worker error: {err}"); + tracing::error!( + inbox_id, + installation_id = hex::encode(&installation_id), + "Sync worker error: {err}" + ); } + Ok::<_, DeviceSyncError>(()) } }); @@ -184,7 +206,11 @@ where }; if let Err(err) = self.process_sync_reply(&provider, reply).await { - tracing::warn!("Sync worker error: {err}"); + tracing::warn!( + inbox_id = self.inbox_id(), + installation_id = hex::encode(self.installation_public_key()), + "Sync worker error: {err}" + ); } } SyncMessage::Request { message_id } => { @@ -207,7 +233,11 @@ where }; if let Err(err) = self.reply_to_sync_request(&provider, request).await { - tracing::warn!("Sync worker error: {err}"); + tracing::warn!( + inbox_id = self.inbox_id(), + installation_id = hex::encode(self.installation_public_key()), + "Sync worker error: {err}" + ); } } }, @@ -235,10 +265,12 @@ where #[instrument(level = "trace", skip_all)] pub async fn sync_init(&self, provider: &XmtpOpenMlsProvider) -> Result<(), DeviceSyncError> { tracing::info!( + inbox_id = self.inbox_id(), + installation_id = hex::encode(self.installation_public_key()), "Initializing device sync... url: {:?}", self.history_sync_url ); - if self.get_sync_group().is_err() { + if self.get_sync_group(provider.conn_ref()).is_err() { self.ensure_sync_group(provider).await?; self.send_sync_request(provider, DeviceSyncKind::Consent) @@ -246,7 +278,11 @@ where self.send_sync_request(provider, DeviceSyncKind::MessageHistory) .await?; } - tracing::info!("Device sync initialized."); + tracing::info!( + inbox_id = self.inbox_id(), + installation_id = hex::encode(self.installation_public_key()), + "Device sync initialized." + ); Ok(()) } @@ -256,9 +292,9 @@ where &self, provider: &XmtpOpenMlsProvider, ) -> Result, GroupError> { - let sync_group = match self.get_sync_group() { + let sync_group = match self.get_sync_group(provider.conn_ref()) { Ok(group) => group, - Err(_) => self.create_sync_group()?, + Err(_) => self.create_sync_group(provider)?, }; sync_group .maybe_update_installations(provider, None) @@ -274,11 +310,15 @@ where provider: &XmtpOpenMlsProvider, kind: DeviceSyncKind, ) -> Result { - tracing::info!("Sending a sync request for {kind:?}"); + tracing::info!( + inbox_id = self.inbox_id(), + installation_id = hex::encode(self.installation_public_key()), + "Sending a sync request for {kind:?}" + ); let request = DeviceSyncRequest::new(kind); // find the sync group - let sync_group = self.get_sync_group()?; + let sync_group = self.get_sync_group(provider.conn_ref())?; // sync the group sync_group.sync_with_conn(provider).await?; @@ -306,7 +346,12 @@ where // publish the intent if let Err(err) = sync_group.publish_intents(provider).await { - tracing::error!("error publishing sync group intents: {:?}", err); + tracing::error!( + inbox_id = self.inbox_id(), + installation_id = hex::encode(self.installation_public_key()), + "error publishing sync group intents: {:?}", + err + ); } Ok(request) @@ -342,7 +387,7 @@ where ) -> Result<(), DeviceSyncError> { let conn = provider.conn_ref(); // find the sync group - let sync_group = self.get_sync_group()?; + let sync_group = self.get_sync_group(provider.conn_ref())?; // sync the group sync_group.sync_with_conn(provider).await?; @@ -383,7 +428,7 @@ where provider: &XmtpOpenMlsProvider, kind: DeviceSyncKind, ) -> Result<(StoredGroupMessage, DeviceSyncRequestProto), DeviceSyncError> { - let sync_group = self.get_sync_group()?; + let sync_group = self.get_sync_group(provider.conn_ref())?; sync_group.sync_with_conn(provider).await?; let messages = sync_group @@ -416,7 +461,7 @@ where provider: &XmtpOpenMlsProvider, kind: DeviceSyncKind, ) -> Result, DeviceSyncError> { - let sync_group = self.get_sync_group()?; + let sync_group = self.get_sync_group(provider.conn_ref())?; sync_group.sync_with_conn(provider).await?; let messages = sync_group @@ -500,7 +545,11 @@ where return Err(DeviceSyncError::MissingHistorySyncUrl); }; let upload_url = format!("{url}/upload"); - tracing::info!("Using upload url {upload_url}"); + tracing::info!( + inbox_id = self.inbox_id(), + installation_id = hex::encode(self.installation_public_key()), + "Using upload url {upload_url}" + ); let response = reqwest::Client::new() .post(upload_url) @@ -510,6 +559,8 @@ where if !response.status().is_success() { tracing::error!( + inbox_id = self.inbox_id(), + installation_id = hex::encode(self.installation_public_key()), "Failed to upload file. Status code: {} Response: {response:?}", response.status() ); @@ -590,13 +641,12 @@ where } #[instrument(level = "trace", skip_all)] - pub fn get_sync_group(&self) -> Result, GroupError> { - let conn = self.store().conn()?; + pub fn get_sync_group(&self, conn: &DbConnection) -> Result, GroupError> { let sync_group_id = conn .latest_sync_group()? .ok_or(GroupError::GroupNotFound)? .id; - let sync_group = self.group(sync_group_id.clone())?; + let sync_group = self.group_with_conn(conn, sync_group_id.clone())?; Ok(sync_group) } diff --git a/xmtp_mls/src/groups/device_sync/consent_sync.rs b/xmtp_mls/src/groups/device_sync/consent_sync.rs index cd23e10b0..d18f45226 100644 --- a/xmtp_mls/src/groups/device_sync/consent_sync.rs +++ b/xmtp_mls/src/groups/device_sync/consent_sync.rs @@ -18,7 +18,12 @@ where provider: &XmtpOpenMlsProvider, record: &StoredConsentRecord, ) -> Result<(), DeviceSyncError> { - tracing::info!("Streaming consent update. {:?}", record); + tracing::info!( + inbox_id = self.inbox_id(), + installation_id = hex::encode(self.installation_public_key()), + "Streaming consent update. {:?}", + record + ); let conn = provider.conn_ref(); let consent_update_proto = ConsentUpdateProto { @@ -75,6 +80,7 @@ pub(crate) mod tests { builder::ClientBuilder, groups::scoped_client::LocalScopedGroupClient, storage::consent_record::{ConsentState, ConsentType}, + utils::test::{wait_for_all_intents_published, wait_for_min_intents}, }; use xmtp_cryptography::utils::generate_local_wallet; use xmtp_id::InboxOwner; @@ -106,21 +112,30 @@ pub(crate) mod tests { let amal_b = ClientBuilder::new_test_client_with_history(&wallet, &history_sync_url).await; let amal_b_provider = amal_b.mls_provider().unwrap(); let amal_b_conn = amal_b_provider.conn_ref(); - let consent_records_b = amal_b.syncable_consent_records(amal_b_conn).unwrap(); assert_eq!(consent_records_b.len(), 0); - let old_group_id = amal_a.get_sync_group().unwrap().group_id; + // make sure amal's worker has time to sync + wait_for_min_intents(amal_b_conn, 1).await; + tracing::info!("Waiting for intents published"); + tokio::join!( + wait_for_all_intents_published(amal_b_conn), + wait_for_all_intents_published(amal_a_conn) + ); + + let old_group_id = amal_a.get_sync_group(amal_a_conn).unwrap().group_id; + tracing::info!("Old Group Id: {}", hex::encode(&old_group_id)); // Check for new welcomes to new groups in the first installation (should be welcomed to a new sync group from amal_b). amal_a.sync_welcomes(amal_a_conn).await.unwrap(); - let new_group_id = amal_a.get_sync_group().unwrap().group_id; + let new_group_id = amal_a.get_sync_group(amal_a_conn).unwrap().group_id; + tracing::info!("New Group Id: {}", hex::encode(&new_group_id)); // group id should have changed to the new sync group created by the second installation assert_ne!(old_group_id, new_group_id); let consent_a = amal_a.syncable_consent_records(amal_a_conn).unwrap().len(); // Have amal_a receive the message (and auto-process) - let amal_a_sync_group = amal_a.get_sync_group().unwrap(); + let amal_a_sync_group = amal_a.get_sync_group(amal_a_conn).unwrap(); assert_ok!(amal_a_sync_group.sync_with_conn(&amal_a_provider).await); // Wait for up to 3 seconds for the reply on amal_b (usually is almost instant) @@ -148,7 +163,7 @@ pub(crate) mod tests { } // Test consent streaming - let amal_b_sync_group = amal_b.get_sync_group().unwrap(); + let amal_b_sync_group = amal_b.get_sync_group(amal_b_conn).unwrap(); let bo_wallet = generate_local_wallet(); // Ensure bo is not consented with amal_b diff --git a/xmtp_mls/src/groups/device_sync/message_sync.rs b/xmtp_mls/src/groups/device_sync/message_sync.rs index 3edab260f..5b95006ed 100644 --- a/xmtp_mls/src/groups/device_sync/message_sync.rs +++ b/xmtp_mls/src/groups/device_sync/message_sync.rs @@ -92,13 +92,13 @@ pub(crate) mod tests { let groups_b = amal_b.syncable_groups(amal_b_conn).unwrap(); assert_eq!(groups_b.len(), 0); - let old_group_id = amal_a.get_sync_group().unwrap().group_id; + let old_group_id = amal_a.get_sync_group(amal_a_conn).unwrap().group_id; // Check for new welcomes to new groups in the first installation (should be welcomed to a new sync group from amal_b). amal_a .sync_welcomes(amal_a_conn) .await .expect("sync_welcomes"); - let new_group_id = amal_a.get_sync_group().unwrap().group_id; + let new_group_id = amal_a.get_sync_group(amal_a_conn).unwrap().group_id; // group id should have changed to the new sync group created by the second installation assert_ne!(old_group_id, new_group_id); @@ -109,7 +109,7 @@ pub(crate) mod tests { .unwrap(); // Have amal_a receive the message (and auto-process) - let amal_a_sync_group = amal_a.get_sync_group().unwrap(); + let amal_a_sync_group = amal_a.get_sync_group(amal_a_conn).unwrap(); assert_ok!(amal_a_sync_group.sync_with_conn(&amal_a_provider).await); // Wait for up to 3 seconds for the reply on amal_b (usually is almost instant) diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index bd0b3807b..eb5cae25a 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -546,10 +546,10 @@ impl MlsGroup { pub(crate) fn create_and_insert_sync_group( client: Arc, + provider: &XmtpOpenMlsProvider, ) -> Result, GroupError> { let context = client.context(); let creator_inbox_id = context.inbox_id(); - let provider = client.mls_provider()?; let protected_metadata = build_protected_metadata_extension(creator_inbox_id, ConversationType::Sync)?; diff --git a/xmtp_mls/src/lib.rs b/xmtp_mls/src/lib.rs index 6384cc5de..48099ac97 100644 --- a/xmtp_mls/src/lib.rs +++ b/xmtp_mls/src/lib.rs @@ -119,15 +119,27 @@ pub(crate) mod tests { #[cfg_attr(not(target_arch = "wasm32"), ctor::ctor)] #[cfg(not(target_arch = "wasm32"))] fn _setup() { - use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; + use tracing_subscriber::{ + fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer, + }; - let filter = EnvFilter::builder() - .with_default_directive(tracing::metadata::LevelFilter::INFO.into()) - .from_env_lossy(); + let structured = std::env::var("STRUCTURED"); + let is_structured = matches!(structured, Ok(s) if s == "true" || s == "1"); + let filter = || { + EnvFilter::builder() + .with_default_directive(tracing::metadata::LevelFilter::INFO.into()) + .from_env_lossy() + }; tracing_subscriber::registry() - .with(fmt::layer().pretty()) - .with(filter) + .with(is_structured.then(|| { + tracing_subscriber::fmt::layer() + .json() + .flatten_event(true) + .with_level(true) + .with_filter(filter()) + })) + .with((!is_structured).then(|| fmt::layer().compact().with_filter(filter()))) .init(); } diff --git a/xmtp_mls/src/utils/test/mod.rs b/xmtp_mls/src/utils/test/mod.rs index 47b56c2bb..0b8e038fd 100755 --- a/xmtp_mls/src/utils/test/mod.rs +++ b/xmtp_mls/src/utils/test/mod.rs @@ -1,5 +1,13 @@ #![allow(clippy::unwrap_used)] +use crate::storage::DbConnection; +use crate::{ + builder::ClientBuilder, + identity::IdentityStrategy, + storage::{EncryptedMessageStore, StorageOption}, + types::Address, + Client, InboxOwner, XmtpApi, +}; use rand::{ distributions::{Alphanumeric, DistString}, Rng, RngCore, @@ -16,14 +24,6 @@ use xmtp_id::{ }; use xmtp_proto::api_client::XmtpTestClient; -use crate::{ - builder::ClientBuilder, - identity::IdentityStrategy, - storage::{EncryptedMessageStore, StorageOption}, - types::Address, - Client, InboxOwner, XmtpApi, -}; - #[cfg(not(target_arch = "wasm32"))] pub mod traced_test; #[cfg(not(target_arch = "wasm32"))] @@ -109,6 +109,7 @@ impl ClientBuilder { ) } } + impl ClientBuilder { pub async fn new_test_client(owner: &impl InboxOwner) -> FullXmtpClient { let api_client = ::create_local().await; @@ -235,8 +236,7 @@ where register_client(&client, owner).await; if client.history_sync_url.is_some() { - let provider = client.mls_provider().unwrap(); - client.start_sync_worker(&provider).await.unwrap(); + client.start_sync_worker().await.unwrap(); } client @@ -298,3 +298,51 @@ pub async fn register_client( client.register_identity(signature_request).await.unwrap(); } + +/// waits for all intents to finish +pub async fn wait_for_all_intents_published(conn: &DbConnection) { + use crate::storage::group_intent::IntentState; + use crate::storage::group_intent::StoredGroupIntent; + use crate::storage::schema::group_intents::dsl; + use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl}; + + let mut query = dsl::group_intents.into_boxed(); + let states = vec![IntentState::ToPublish]; + query = query.filter(dsl::state.eq_any(states)); + query = query.order(dsl::id.asc()); + + let intents = conn + .raw_query(|conn| query.load::(conn)) + .unwrap(); + + tracing::info!("{} intents left", intents.len()); + tracing::info!("{:?}", intents); + if intents.len() > 0 { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + Box::pin(wait_for_all_intents_published(conn)).await + } +} + +/// wait for a minimum amount of intent +pub async fn wait_for_min_intents(conn: &DbConnection, n: usize) { + use crate::storage::group_intent::IntentState; + use crate::storage::group_intent::StoredGroupIntent; + use crate::storage::schema::group_intents::dsl; + use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl}; + + let mut query = dsl::group_intents.into_boxed(); + let states = vec![IntentState::Published]; + query = query.filter(dsl::state.eq_any(states)); + query = query.order(dsl::id.asc()); + + let intents = conn + .raw_query(|conn| query.load::(conn)) + .unwrap(); + + tracing::info!("{} intents left", intents.len()); + tracing::info!("{:?}", intents); + if intents.len() != n { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + Box::pin(wait_for_min_intents(conn, n - intents.len())).await + } +}