Skip to content

Commit

Permalink
Extra logging to debug duplicate activities (ref LemmyNet#4609) (Lem…
Browse files Browse the repository at this point in the history
…myNet#4726)

* Extra logging to debug duplicate activities (ref LemmyNet#4609)

* Fix logging for api tests

* fmt

This is cherry-picked from 6b46a70
LemmyNet@6b46a70
LemmyNet#4726
  • Loading branch information
Nutomic authored and MrKaplan-lw committed May 26, 2024
1 parent 573daac commit c8bb166
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 10 deletions.
5 changes: 5 additions & 0 deletions crates/federate/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use tokio::{
time::sleep,
};
use tokio_util::sync::CancellationToken;
use tracing::info;

mod util;
mod worker;
Expand Down Expand Up @@ -44,6 +45,10 @@ async fn start_stop_federation_workers(
let pool2 = &mut DbPool::Pool(&pool);
let process_index = opts.process_index - 1;
let local_domain = federation_config.settings().get_hostname_without_port()?;
info!(
"Starting federation workers for process count {} and index {}",
opts.process_count, process_index
);
loop {
let mut total_count = 0;
let mut dead_count = 0;
Expand Down
19 changes: 9 additions & 10 deletions crates/federate/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use std::{
};
use tokio::{sync::mpsc::UnboundedSender, time::sleep};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, trace, warn};

/// Check whether to save state to db every n sends if there's no failures (during failures state is saved after every attempt)
/// This determines the batch size for loop_batch. After a batch ends and SAVE_STATE_EVERY_TIME has passed, the federation_queue_state is updated in the DB.
Expand Down Expand Up @@ -105,6 +106,7 @@ impl InstanceWorker {
&mut self,
pool: &mut DbPool<'_>,
) -> Result<(), anyhow::Error> {
debug!("Starting federation worker for {}", self.instance.domain);
let save_state_every = chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not negative");

self.update_communities(pool).await?;
Expand Down Expand Up @@ -184,15 +186,14 @@ impl InstanceWorker {
.await
.context("failed reading activity from db")?
else {
tracing::debug!("{}: {:?} does not exist", self.instance.domain, id);
debug!("{}: {:?} does not exist", self.instance.domain, id);
self.state.last_successful_id = Some(id);
continue;
};
if let Err(e) = self.send_retry_loop(pool, &ele.0, &ele.1).await {
tracing::warn!(
warn!(
"sending {} errored internally, skipping activity: {:?}",
ele.0.ap_id,
e
ele.0.ap_id, e
);
}
if self.stop.is_cancelled() {
Expand All @@ -219,7 +220,7 @@ impl InstanceWorker {
.await
.context("failed figuring out inbox urls")?;
if inbox_urls.is_empty() {
tracing::debug!("{}: {:?} no inboxes", self.instance.domain, activity.id);
trace!("{}: {:?} no inboxes", self.instance.domain, activity.id);
self.state.last_successful_id = Some(activity.id);
self.state.last_successful_published_time = Some(activity.published);
return Ok(());
Expand All @@ -237,16 +238,14 @@ impl InstanceWorker {
SendActivityTask::prepare(&object, actor.as_ref(), inbox_urls, &self.context).await?;
for task in requests {
// usually only one due to shared inbox
tracing::debug!("sending out {}", task);
trace!("sending out {}", task);
while let Err(e) = task.sign_and_send(&self.context).await {
self.state.fail_count += 1;
self.state.last_retry = Some(Utc::now());
let retry_delay: Duration = federate_retry_sleep_duration(self.state.fail_count);
tracing::info!(
info!(
"{}: retrying {:?} attempt {} with delay {retry_delay:.2?}. ({e})",
self.instance.domain,
activity.id,
self.state.fail_count
self.instance.domain, activity.id, self.state.fail_count
);
self.save_and_send_state(pool).await?;
tokio::select! {
Expand Down

0 comments on commit c8bb166

Please sign in to comment.