Skip to content

Commit

Permalink
speed up sync init
Browse files Browse the repository at this point in the history
  • Loading branch information
insipx committed Dec 3, 2024
1 parent 85dd6d3 commit a9c0848
Show file tree
Hide file tree
Showing 11 changed files with 211 additions and 73 deletions.
9 changes: 3 additions & 6 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,12 +454,8 @@ impl FfiXmtpClient {
return Ok(());
}

let provider = self
.inner_client
.mls_provider()
.map_err(GenericError::from_error)?;
self.inner_client
.start_sync_worker(&provider)
.start_sync_worker()
.await
.map_err(GenericError::from_error)?;

Expand Down Expand Up @@ -911,7 +907,8 @@ impl FfiConversations {

pub fn get_sync_group(&self) -> Result<FfiConversation, GenericError> {
let inner = self.inner_client.as_ref();
let sync_group = inner.get_sync_group()?;
let conn = inner.store().conn()?;
let sync_group = inner.get_sync_group(&conn)?;
Ok(sync_group.into())
}

Expand Down
4 changes: 2 additions & 2 deletions examples/cli/cli-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ async fn main() -> color_eyre::eyre::Result<()> {
let conn = client.store().conn().unwrap();
let provider = client.mls_provider().unwrap();
client.sync_welcomes(&conn).await.unwrap();
client.start_sync_worker(&provider).await.unwrap();
client.start_sync_worker().await.unwrap();
client
.send_sync_request(&provider, DeviceSyncKind::MessageHistory)
.await
Expand All @@ -453,7 +453,7 @@ async fn main() -> color_eyre::eyre::Result<()> {
Commands::ListHistorySyncMessages {} => {
let conn = client.store().conn()?;
client.sync_welcomes(&conn).await?;
let group = client.get_sync_group()?;
let group = client.get_sync_group(&conn)?;
let group_id_str = hex::encode(group.group_id.clone());
group.sync().await?;
let messages = group
Expand Down
5 changes: 4 additions & 1 deletion xmtp_mls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ tracing-subscriber = { workspace = true, features = [
"env-filter",
"fmt",
"ansi",
"json",
"registry"
], optional = true }


Expand Down Expand Up @@ -151,6 +153,7 @@ tracing-subscriber = { workspace = true, features = [
"env-filter",
"fmt",
"ansi",
"json",
] }
xmtp_api_grpc = { path = "../xmtp_api_grpc", features = ["test-utils"] }
xmtp_api_http = { path = "../xmtp_api_http", features = ["test-utils"] }
Expand All @@ -163,7 +166,7 @@ diesel-wasm-sqlite = { workspace = true, features = [
] }
ethers = { workspace = true, features = ["rustls"] }
openmls = { workspace = true, features = ["js"] }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
tracing-subscriber = { workspace = true, features = ["env-filter", "json"] }
tracing-wasm = { version = "0.2" }
wasm-bindgen-test.workspace = true
xmtp_api_http = { path = "../xmtp_api_http", features = ["test-utils"] }
Expand Down
7 changes: 3 additions & 4 deletions xmtp_mls/benches/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,13 @@ fn start_sync_worker(c: &mut Criterion) {
|| {
bench_async_setup(|| async {
let client = clients::new_client(true).await;
let provider = client.mls_provider().unwrap();
// set history sync URL
(client, provider, span.clone())
(client, span.clone())
})
},
|(client, provider, span)| async move {
|(client, span)| async move {
client
.start_sync_worker(&provider)
.start_sync_worker()
.instrument(span)
.await
.unwrap()
Expand Down
34 changes: 24 additions & 10 deletions xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ pub struct XmtpMlsLocalContext {
impl XmtpMlsLocalContext {
/// The installation public key is the primary identifier for an installation
pub fn installation_public_key(&self) -> Vec<u8> {
self.identity.installation_keys.public_slice().to_vec()
self.identity.installation_keys.public_bytes().to_vec()
}

/// Get the account address of the blockchain account associated with this client
Expand Down Expand Up @@ -560,20 +560,25 @@ where
Ok(group)
}

pub(crate) fn create_sync_group(&self) -> Result<MlsGroup<Self>, ClientError> {
pub(crate) fn create_sync_group(
&self,
provider: &XmtpOpenMlsProvider,
) -> Result<MlsGroup<Self>, ClientError> {
tracing::info!("creating sync group");
let sync_group = MlsGroup::create_and_insert_sync_group(Arc::new(self.clone()))?;
let sync_group = MlsGroup::create_and_insert_sync_group(Arc::new(self.clone()), provider)?;

Ok(sync_group)
}

/**
* Look up a group by its ID
*
* Returns a [`MlsGroup`] if the group exists, or an error if it does not
*/
pub fn group(&self, group_id: Vec<u8>) -> Result<MlsGroup<Self>, ClientError> {
let conn = &mut self.store().conn()?;
/// Look up a group by its ID
///
/// Returns a [`MlsGroup`] if the group exists, or an error if it does not
///
pub fn group_with_conn(
&self,
conn: &DbConnection,
group_id: Vec<u8>,
) -> Result<MlsGroup<Self>, ClientError> {
let stored_group: Option<StoredGroup> = conn.fetch(&group_id)?;
match stored_group {
Some(group) => Ok(MlsGroup::new(self.clone(), group.id, group.created_at_ns)),
Expand All @@ -584,6 +589,15 @@ where
}
}

/// Look up a group by its ID
///
/// Returns a [`MlsGroup`] if the group exists, or an error if it does not
///
pub fn group(&self, group_id: Vec<u8>) -> Result<MlsGroup<Self>, ClientError> {
let conn = &mut self.store().conn()?;
self.group_with_conn(conn, group_id)
}

/**
* Look up a DM group by the target's inbox_id.
*
Expand Down
98 changes: 74 additions & 24 deletions xmtp_mls/src/groups/device_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,25 +118,47 @@ where
ApiClient: XmtpApi + Send + Sync + 'static,
V: SmartContractSignatureVerifier + Send + Sync + 'static,
{
// TODO: Should we ensure that only one sync worker is running at a time?
#[instrument(level = "trace", skip_all)]
pub async fn start_sync_worker(
&self,
provider: &XmtpOpenMlsProvider,
) -> Result<(), DeviceSyncError> {
self.sync_init(provider).await?;

pub async fn start_sync_worker(&self) -> Result<(), DeviceSyncError> {
crate::spawn(None, {
let client = self.clone();

let installation_id = client.installation_public_key();
tracing::debug!(
inbox_id = client.inbox_id(),
installation_id = hex::encode(&installation_id),
"starting sync worker"
);
let receiver = client.local_events.subscribe();
let sync_stream = receiver.stream_sync_messages();

async move {
pin_mut!(sync_stream);
let inbox_id = client.inbox_id();
// scope ensures the provider is dropped once init is finished, and not
// held for entirety of sync.
let res = async {
let provider = client.mls_provider()?;
client.sync_init(&provider).await?;
Ok::<_, DeviceSyncError>(())
};

if let Err(e) = res.await {
tracing::error!(
inbox_id,
installation_id = hex::encode(&installation_id),
"sync worker failed to init error = {e}"
);
}

while let Err(err) = client.sync_worker(&mut sync_stream).await {
tracing::error!("Sync worker error: {err}");
tracing::error!(
inbox_id,
installation_id = hex::encode(&installation_id),
"Sync worker error: {err}"
);
}
Ok::<_, DeviceSyncError>(())
}
});

Expand Down Expand Up @@ -184,7 +206,11 @@ where
};

if let Err(err) = self.process_sync_reply(&provider, reply).await {
tracing::warn!("Sync worker error: {err}");
tracing::warn!(
inbox_id = self.inbox_id(),
installation_id = hex::encode(self.installation_public_key()),
"Sync worker error: {err}"
);
}
}
SyncMessage::Request { message_id } => {
Expand All @@ -207,7 +233,11 @@ where
};

if let Err(err) = self.reply_to_sync_request(&provider, request).await {
tracing::warn!("Sync worker error: {err}");
tracing::warn!(
inbox_id = self.inbox_id(),
installation_id = hex::encode(self.installation_public_key()),
"Sync worker error: {err}"
);
}
}
},
Expand Down Expand Up @@ -235,18 +265,24 @@ where
#[instrument(level = "trace", skip_all)]
pub async fn sync_init(&self, provider: &XmtpOpenMlsProvider) -> Result<(), DeviceSyncError> {
tracing::info!(
inbox_id = self.inbox_id(),
installation_id = hex::encode(self.installation_public_key()),
"Initializing device sync... url: {:?}",
self.history_sync_url
);
if self.get_sync_group().is_err() {
if self.get_sync_group(provider.conn_ref()).is_err() {
self.ensure_sync_group(provider).await?;

self.send_sync_request(provider, DeviceSyncKind::Consent)
.await?;
self.send_sync_request(provider, DeviceSyncKind::MessageHistory)
.await?;
}
tracing::info!("Device sync initialized.");
tracing::info!(
inbox_id = self.inbox_id(),
installation_id = hex::encode(self.installation_public_key()),
"Device sync initialized."
);

Ok(())
}
Expand All @@ -256,9 +292,9 @@ where
&self,
provider: &XmtpOpenMlsProvider,
) -> Result<MlsGroup<Self>, GroupError> {
let sync_group = match self.get_sync_group() {
let sync_group = match self.get_sync_group(provider.conn_ref()) {
Ok(group) => group,
Err(_) => self.create_sync_group()?,
Err(_) => self.create_sync_group(provider)?,
};
sync_group
.maybe_update_installations(provider, None)
Expand All @@ -274,11 +310,15 @@ where
provider: &XmtpOpenMlsProvider,
kind: DeviceSyncKind,
) -> Result<DeviceSyncRequestProto, DeviceSyncError> {
tracing::info!("Sending a sync request for {kind:?}");
tracing::info!(
inbox_id = self.inbox_id(),
installation_id = hex::encode(self.installation_public_key()),
"Sending a sync request for {kind:?}"
);
let request = DeviceSyncRequest::new(kind);

// find the sync group
let sync_group = self.get_sync_group()?;
let sync_group = self.get_sync_group(provider.conn_ref())?;

// sync the group
sync_group.sync_with_conn(provider).await?;
Expand Down Expand Up @@ -306,7 +346,12 @@ where

// publish the intent
if let Err(err) = sync_group.publish_intents(provider).await {
tracing::error!("error publishing sync group intents: {:?}", err);
tracing::error!(
inbox_id = self.inbox_id(),
installation_id = hex::encode(self.installation_public_key()),
"error publishing sync group intents: {:?}",
err
);
}

Ok(request)
Expand Down Expand Up @@ -342,7 +387,7 @@ where
) -> Result<(), DeviceSyncError> {
let conn = provider.conn_ref();
// find the sync group
let sync_group = self.get_sync_group()?;
let sync_group = self.get_sync_group(provider.conn_ref())?;

// sync the group
sync_group.sync_with_conn(provider).await?;
Expand Down Expand Up @@ -383,7 +428,7 @@ where
provider: &XmtpOpenMlsProvider,
kind: DeviceSyncKind,
) -> Result<(StoredGroupMessage, DeviceSyncRequestProto), DeviceSyncError> {
let sync_group = self.get_sync_group()?;
let sync_group = self.get_sync_group(provider.conn_ref())?;
sync_group.sync_with_conn(provider).await?;

let messages = sync_group
Expand Down Expand Up @@ -416,7 +461,7 @@ where
provider: &XmtpOpenMlsProvider,
kind: DeviceSyncKind,
) -> Result<Option<(StoredGroupMessage, DeviceSyncReplyProto)>, DeviceSyncError> {
let sync_group = self.get_sync_group()?;
let sync_group = self.get_sync_group(provider.conn_ref())?;
sync_group.sync_with_conn(provider).await?;

let messages = sync_group
Expand Down Expand Up @@ -500,7 +545,11 @@ where
return Err(DeviceSyncError::MissingHistorySyncUrl);
};
let upload_url = format!("{url}/upload");
tracing::info!("Using upload url {upload_url}");
tracing::info!(
inbox_id = self.inbox_id(),
installation_id = hex::encode(self.installation_public_key()),
"Using upload url {upload_url}"
);

let response = reqwest::Client::new()
.post(upload_url)
Expand All @@ -510,6 +559,8 @@ where

if !response.status().is_success() {
tracing::error!(
inbox_id = self.inbox_id(),
installation_id = hex::encode(self.installation_public_key()),
"Failed to upload file. Status code: {} Response: {response:?}",
response.status()
);
Expand Down Expand Up @@ -590,13 +641,12 @@ where
}

#[instrument(level = "trace", skip_all)]
pub fn get_sync_group(&self) -> Result<MlsGroup<Self>, GroupError> {
let conn = self.store().conn()?;
pub fn get_sync_group(&self, conn: &DbConnection) -> Result<MlsGroup<Self>, GroupError> {
let sync_group_id = conn
.latest_sync_group()?
.ok_or(GroupError::GroupNotFound)?
.id;
let sync_group = self.group(sync_group_id.clone())?;
let sync_group = self.group_with_conn(conn, sync_group_id.clone())?;

Ok(sync_group)
}
Expand Down
Loading

0 comments on commit a9c0848

Please sign in to comment.