Skip to content

Commit

Permalink
fix (sync): resolve listener deadlock and ensure client is cloned
Browse files Browse the repository at this point in the history
We ensure the client is cloned as it is a lightweight clone and enables
concurrency. See hyperium/tonic#33
  • Loading branch information
hydra-yse committed Oct 17, 2024
1 parent 3fef48b commit aeeec49
Showing 1 changed file with 8 additions and 7 deletions.
15 changes: 8 additions & 7 deletions lib/core/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,18 +108,19 @@ impl SyncService for BreezSyncService {
}

async fn listen(self: Arc<Self>) -> Result<()> {
let Some(ref mut client) = *self.client.lock().await else {
return Err(anyhow!(
"Cannot run `get_changes_since`: client not connected"
));
let Some(mut client) = self.client.lock().await.clone() else {
return Err(anyhow!("Cannot listen to changes: client not connected"));
};

let request = ListenChangesRequest::new(utils::now(), self.signer.clone())
.map_err(|err| anyhow!("Could not sign ListenChangesRequest: {err:?}"))?;
let mut stream = client.listen_changes(request).await?.into_inner();

let cloned = self.clone();
tokio::spawn(async move {
let mut stream = match client.listen_changes(request).await {
Ok(res) => res.into_inner(),
Err(err) => return warn!("Could not listen to changes: {err:?}"),
};
debug!("Sync service: Started listening to changes");
while let Some(message) = stream.next().await {
match message {
Expand Down Expand Up @@ -155,7 +156,7 @@ impl SyncService for BreezSyncService {
}

async fn get_changes_since(&self, from_id: i64) -> Result<Vec<Record>> {
let Some(ref mut client) = *self.client.lock().await else {
let Some(mut client) = self.client.lock().await.clone() else {
return Err(anyhow!(
"Cannot run `get_changes_since`: client not connected"
));
Expand All @@ -169,7 +170,7 @@ impl SyncService for BreezSyncService {
}

async fn set_record(&self, data: SyncData) -> Result<()> {
let Some(ref mut client) = *self.client.lock().await else {
let Some(mut client) = self.client.lock().await.clone() else {
return Err(anyhow!("Cannot run `set_record`: client not connected"));
};

Expand Down

0 comments on commit aeeec49

Please sign in to comment.