Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

speed up sync init #1363

Merged
merged 1 commit into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ impl FfiXmtpClient {
}

pub fn installation_id(&self) -> Vec<u8> {
self.inner_client.installation_public_key()
self.inner_client.installation_public_key().to_vec()
}

pub fn release_db_connection(&self) -> Result<(), GenericError> {
Expand Down Expand Up @@ -380,7 +380,7 @@ impl FfiXmtpClient {
signature_bytes: Vec<u8>,
) -> Result<(), GenericError> {
let inner = self.inner_client.as_ref();
let public_key = inner.installation_public_key();
let public_key = inner.installation_public_key().to_vec();

self.verify_signed_with_public_key(signature_text, signature_bytes, public_key)
}
Expand Down Expand Up @@ -454,12 +454,8 @@ impl FfiXmtpClient {
return Ok(());
}

let provider = self
.inner_client
.mls_provider()
.map_err(GenericError::from_error)?;
self.inner_client
.start_sync_worker(&provider)
.start_sync_worker()
.await
.map_err(GenericError::from_error)?;

Expand Down Expand Up @@ -537,7 +533,7 @@ impl FfiXmtpClient {
let other_installation_ids = inbox_state
.installation_ids()
.into_iter()
.filter(|id| id != &installation_id)
.filter(|id| id != installation_id)
.collect();

let signature_request = self
Expand Down Expand Up @@ -911,7 +907,8 @@ impl FfiConversations {

pub fn get_sync_group(&self) -> Result<FfiConversation, GenericError> {
let inner = self.inner_client.as_ref();
let sync_group = inner.get_sync_group()?;
let conn = inner.store().conn()?;
let sync_group = inner.get_sync_group(&conn)?;
Ok(sync_group.into())
}

Expand Down Expand Up @@ -2096,7 +2093,7 @@ mod tests {
.unwrap();
register_client(&ffi_inbox_owner, &client_a).await;

let installation_pub_key = client_a.inner_client.installation_public_key();
let installation_pub_key = client_a.inner_client.installation_public_key().to_vec();
drop(client_a);

let client_b = create_client(
Expand All @@ -2114,7 +2111,7 @@ mod tests {
.await
.unwrap();

let other_installation_pub_key = client_b.inner_client.installation_public_key();
let other_installation_pub_key = client_b.inner_client.installation_public_key().to_vec();
drop(client_b);

assert!(
Expand Down
2 changes: 1 addition & 1 deletion bindings_node/src/signatures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl Client {
let other_installation_ids = inbox_state
.installation_ids()
.into_iter()
.filter(|id| id != &installation_id)
.filter(|id| id != installation_id)
.collect();
let signature_request = self
.inner_client()
Expand Down
2 changes: 1 addition & 1 deletion bindings_wasm/src/signatures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl Client {
let other_installation_ids = inbox_state
.installation_ids()
.into_iter()
.filter(|id| id != &installation_id)
.filter(|id| id != installation_id)
.collect();
let signature_request = self
.inner_client()
Expand Down
4 changes: 2 additions & 2 deletions examples/cli/cli-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ async fn main() -> color_eyre::eyre::Result<()> {
let conn = client.store().conn().unwrap();
let provider = client.mls_provider().unwrap();
client.sync_welcomes(&conn).await.unwrap();
client.start_sync_worker(&provider).await.unwrap();
client.start_sync_worker().await.unwrap();
client
.send_sync_request(&provider, DeviceSyncKind::MessageHistory)
.await
Expand All @@ -453,7 +453,7 @@ async fn main() -> color_eyre::eyre::Result<()> {
Commands::ListHistorySyncMessages {} => {
let conn = client.store().conn()?;
client.sync_welcomes(&conn).await?;
let group = client.get_sync_group()?;
let group = client.get_sync_group(&conn)?;
let group_id_str = hex::encode(group.group_id.clone());
group.sync().await?;
let messages = group
Expand Down
2 changes: 1 addition & 1 deletion examples/cli/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ pub async fn debug_welcome_messages(
) -> Result<(), String> {
let api_client = client.api();
let envelopes = api_client
.query_welcome_messages(installation_id, None)
.query_welcome_messages(&installation_id, None)
.await
.unwrap();
for envelope in envelopes {
Expand Down
5 changes: 4 additions & 1 deletion xmtp_mls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ tracing-subscriber = { workspace = true, features = [
"env-filter",
"fmt",
"ansi",
"json",
"registry"
], optional = true }


Expand Down Expand Up @@ -151,6 +153,7 @@ tracing-subscriber = { workspace = true, features = [
"env-filter",
"fmt",
"ansi",
"json",
] }
xmtp_api_grpc = { path = "../xmtp_api_grpc", features = ["test-utils"] }
xmtp_api_http = { path = "../xmtp_api_http", features = ["test-utils"] }
Expand All @@ -163,7 +166,7 @@ diesel-wasm-sqlite = { workspace = true, features = [
] }
ethers = { workspace = true, features = ["rustls"] }
openmls = { workspace = true, features = ["js"] }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
tracing-subscriber = { workspace = true, features = ["env-filter", "json"] }
tracing-wasm = { version = "0.2" }
wasm-bindgen-test.workspace = true
xmtp_api_http = { path = "../xmtp_api_http", features = ["test-utils"] }
Expand Down
7 changes: 3 additions & 4 deletions xmtp_mls/benches/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,13 @@ fn start_sync_worker(c: &mut Criterion) {
|| {
bench_async_setup(|| async {
let client = clients::new_client(true).await;
let provider = client.mls_provider().unwrap();
// set history sync URL
(client, provider, span.clone())
(client, span.clone())
})
},
|(client, provider, span)| async move {
|(client, span)| async move {
client
.start_sync_worker(&provider)
.start_sync_worker()
.instrument(span)
.await
.unwrap()
Expand Down
6 changes: 3 additions & 3 deletions xmtp_mls/src/api/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,11 @@ where
#[tracing::instrument(level = "trace", skip_all)]
pub async fn query_welcome_messages(
&self,
installation_id: Vec<u8>,
installation_id: &[u8],
id_cursor: Option<u64>,
) -> Result<Vec<WelcomeMessage>, ApiError> {
tracing::debug!(
installation_id = hex::encode(&installation_id),
installation_id = hex::encode(installation_id),
cursor = id_cursor,
inbox_id = self.inbox_id,
"query welcomes"
Expand All @@ -135,7 +135,7 @@ where
(async {
self.api_client
.query_welcome_messages(QueryWelcomeMessagesRequest {
installation_key: installation_id.clone(),
installation_key: installation_id.to_vec(),
paging_info: Some(PagingInfo {
id_cursor: id_cursor.unwrap_or(0),
limit: page_size,
Expand Down
6 changes: 3 additions & 3 deletions xmtp_mls/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ pub(crate) mod tests {
register_client(&client_a, wallet).await;
assert!(client_a.identity().is_ready());

let keybytes_a = client_a.installation_public_key();
let keybytes_a = client_a.installation_public_key().to_vec();
drop(client_a);

// Reload the existing store and wallet
Expand All @@ -729,7 +729,7 @@ pub(crate) mod tests {
.build_with_verifier()
.await
.unwrap();
let keybytes_b = client_b.installation_public_key();
let keybytes_b = client_b.installation_public_key().to_vec();
drop(client_b);

// Ensure the persistence was used to store the generated keys
Expand Down Expand Up @@ -762,7 +762,7 @@ pub(crate) mod tests {
.build_with_verifier()
.await
.unwrap();
assert_eq!(client_d.installation_public_key(), keybytes_a);
assert_eq!(client_d.installation_public_key().to_vec(), keybytes_a);
}

/// anvil cannot be used in WebAssembly
Expand Down
56 changes: 35 additions & 21 deletions xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ pub struct XmtpMlsLocalContext {

impl XmtpMlsLocalContext {
/// The installation public key is the primary identifier for an installation
pub fn installation_public_key(&self) -> Vec<u8> {
self.identity.installation_keys.public_slice().to_vec()
pub fn installation_public_key(&self) -> &[u8; 32] {
self.identity.installation_keys.public_bytes()
}

/// Get the account address of the blockchain account associated with this client
Expand Down Expand Up @@ -259,7 +259,7 @@ where
V: SmartContractSignatureVerifier,
{
/// Retrieves the client's installation public key, sometimes also called `installation_id`
pub fn installation_public_key(&self) -> Vec<u8> {
pub fn installation_public_key(&self) -> &[u8; 32] {
self.context.installation_public_key()
}
/// Retrieves the client's inbox ID
Expand Down Expand Up @@ -560,20 +560,25 @@ where
Ok(group)
}

pub(crate) fn create_sync_group(&self) -> Result<MlsGroup<Self>, ClientError> {
pub(crate) fn create_sync_group(
&self,
provider: &XmtpOpenMlsProvider,
) -> Result<MlsGroup<Self>, ClientError> {
tracing::info!("creating sync group");
let sync_group = MlsGroup::create_and_insert_sync_group(Arc::new(self.clone()))?;
let sync_group = MlsGroup::create_and_insert_sync_group(Arc::new(self.clone()), provider)?;

Ok(sync_group)
}

/**
* Look up a group by its ID
*
* Returns a [`MlsGroup`] if the group exists, or an error if it does not
*/
pub fn group(&self, group_id: Vec<u8>) -> Result<MlsGroup<Self>, ClientError> {
let conn = &mut self.store().conn()?;
/// Look up a group by its ID
///
/// Returns a [`MlsGroup`] if the group exists, or an error if it does not
///
pub fn group_with_conn(
&self,
conn: &DbConnection,
group_id: Vec<u8>,
) -> Result<MlsGroup<Self>, ClientError> {
let stored_group: Option<StoredGroup> = conn.fetch(&group_id)?;
match stored_group {
Some(group) => Ok(MlsGroup::new(self.clone(), group.id, group.created_at_ns)),
Expand All @@ -584,6 +589,15 @@ where
}
}

/// Look up a group by its ID
///
/// Returns a [`MlsGroup`] if the group exists, or an error if it does not
///
pub fn group(&self, group_id: Vec<u8>) -> Result<MlsGroup<Self>, ClientError> {
let conn = &mut self.store().conn()?;
self.group_with_conn(conn, group_id)
}

/**
* Look up a DM group by the target's inbox_id.
*
Expand Down Expand Up @@ -697,7 +711,7 @@ where
conn: &DbConnection,
) -> Result<Vec<WelcomeMessage>, ClientError> {
let installation_id = self.installation_public_key();
let id_cursor = conn.get_last_cursor_for_id(&installation_id, EntityKind::Welcome)?;
let id_cursor = conn.get_last_cursor_for_id(installation_id, EntityKind::Welcome)?;

let welcomes = self
.api_client
Expand Down Expand Up @@ -747,7 +761,7 @@ where
(async {
let welcome_v1 = &welcome_v1;
self.intents.process_for_id(
&id,
id,
EntityKind::Welcome,
welcome_v1.id,
|provider| async move {
Expand Down Expand Up @@ -1021,7 +1035,7 @@ pub(crate) mod tests {

// Get original KeyPackage.
let kp1 = client
.get_key_packages_for_installation_ids(vec![client.installation_public_key()])
.get_key_packages_for_installation_ids(vec![client.installation_public_key().to_vec()])
.await
.unwrap();
assert_eq!(kp1.len(), 1);
Expand All @@ -1031,7 +1045,7 @@ pub(crate) mod tests {
client.rotate_key_package().await.unwrap();

let kp2 = client
.get_key_packages_for_installation_ids(vec![client.installation_public_key()])
.get_key_packages_for_installation_ids(vec![client.installation_public_key().to_vec()])
.await
.unwrap();
assert_eq!(kp2.len(), 1);
Expand Down Expand Up @@ -1299,9 +1313,9 @@ pub(crate) mod tests {
let bo_store = bo.store();

let alix_original_init_key =
get_key_package_init_key(&alix, &alix.installation_public_key()).await;
get_key_package_init_key(&alix, alix.installation_public_key()).await;
let bo_original_init_key =
get_key_package_init_key(&bo, &bo.installation_public_key()).await;
get_key_package_init_key(&bo, bo.installation_public_key()).await;

// Bo's original key should be deleted
let bo_original_from_db = bo_store
Expand All @@ -1320,19 +1334,19 @@ pub(crate) mod tests {

bo.sync_welcomes(&bo.store().conn().unwrap()).await.unwrap();

let bo_new_key = get_key_package_init_key(&bo, &bo.installation_public_key()).await;
let bo_new_key = get_key_package_init_key(&bo, bo.installation_public_key()).await;
// Bo's key should have changed
assert_ne!(bo_original_init_key, bo_new_key);

bo.sync_welcomes(&bo.store().conn().unwrap()).await.unwrap();
let bo_new_key_2 = get_key_package_init_key(&bo, &bo.installation_public_key()).await;
let bo_new_key_2 = get_key_package_init_key(&bo, bo.installation_public_key()).await;
// Bo's key should not have changed syncing the second time.
assert_eq!(bo_new_key, bo_new_key_2);

alix.sync_welcomes(&alix.store().conn().unwrap())
.await
.unwrap();
let alix_key_2 = get_key_package_init_key(&alix, &alix.installation_public_key()).await;
let alix_key_2 = get_key_package_init_key(&alix, alix.installation_public_key()).await;
// Alix's key should not have changed at all
assert_eq!(alix_original_init_key, alix_key_2);

Expand Down
Loading
Loading