Skip to content

Commit

Permalink
Properly wait for contacts sync
Browse files Browse the repository at this point in the history
  • Loading branch information
gferon committed Jan 12, 2023
1 parent 07a0c4a commit a2d8d8a
Showing 1 changed file with 27 additions and 5 deletions.
32 changes: 27 additions & 5 deletions src/manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::time::{UNIX_EPOCH, Duration};
use std::time::{Duration, UNIX_EPOCH};

use futures::{channel::mpsc, channel::oneshot, future, pin_mut, AsyncReadExt, Stream, StreamExt};
use log::{debug, error, info, trace};
Expand Down Expand Up @@ -572,12 +572,13 @@ impl<C: Store> Manager<C, Registered> {
.await?;

// wait for it to arrive
info!("waiting for contacts sync for up to 5 minutes");
info!("waiting for contacts sync for up to 3 minutes");
tokio::time::timeout(
Duration::from_secs(5 * 60),
Duration::from_secs(3 * 60),
self.wait_for_contacts_sync(messages),
)
.await.map_err(Error::from)??;
.await
.map_err(Error::from)??;

Ok(())
}
Expand Down Expand Up @@ -640,25 +641,46 @@ impl<C: Store> Manager<C, Registered> {
///
/// Returns a [Stream] of messages to consume. Messages will also be stored by the implementation of the [MessageStore].
pub async fn receive_messages(&mut self) -> Result<impl Stream<Item = Content>, Error> {
self.receive_messages_stream(false).await
}

async fn receive_messages_stream(
&mut self,
include_internal_events: bool,
) -> Result<impl Stream<Item = Content>, Error> {
struct StreamState<S, C> {
encrypted_messages: S,
service_cipher: ServiceCipher<C>,
config_store: C,
include_internal_events: bool,
}

let init = StreamState {
encrypted_messages: Box::pin(self.receive_messages_encrypted().await?),
service_cipher: self.new_service_cipher()?,
config_store: self.config_store.clone(),
include_internal_events,
};

Ok(futures::stream::unfold(init, |mut state| async move {
loop {
match state.encrypted_messages.next().await {
Some(Ok(envelope)) => {
match state.service_cipher.open_envelope(envelope).await {
// contacts synchronization sent from the primary device (happens after linking, or on demand)
Ok(Some(content)) => {
// contacts synchronization sent from the primary device (happens after linking, or on demand)
if let ContentBody::SynchronizeMessage(SyncMessage {
contacts: Some(_),
..
}) = &content.body
{
if state.include_internal_events {
return Some((content, state));
} else {
return None;
}
}

if let Ok(thread) = Thread::try_from(&content) {
// TODO: handle reactions here, we should update the original message?
if let Err(e) =
Expand Down

0 comments on commit a2d8d8a

Please sign in to comment.