diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index a4dc495360..fc5bd2387b 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -13,6 +13,7 @@ use tokio::{ time::sleep, }; use tokio_util::sync::CancellationToken; +use tracing::info; mod util; mod worker; @@ -44,6 +45,10 @@ async fn start_stop_federation_workers( let pool2 = &mut DbPool::Pool(&pool); let process_index = opts.process_index - 1; let local_domain = federation_config.settings().get_hostname_without_port()?; + info!( + "Starting federation workers for process count {} and index {}", + opts.process_count, process_index + ); loop { let mut total_count = 0; let mut dead_count = 0; diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index b57c5e8aeb..d1e29cbe0e 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -34,6 +34,7 @@ use std::{ }; use tokio::{sync::mpsc::UnboundedSender, time::sleep}; use tokio_util::sync::CancellationToken; +use tracing::{debug, info, trace, warn}; /// Check whether to save state to db every n sends if there's no failures (during failures state is saved after every attempt) /// This determines the batch size for loop_batch. After a batch ends and SAVE_STATE_EVERY_TIME has passed, the federation_queue_state is updated in the DB. @@ -105,6 +106,7 @@ impl InstanceWorker { &mut self, pool: &mut DbPool<'_>, ) -> Result<(), anyhow::Error> { + debug!("Starting federation worker for {}", self.instance.domain); let save_state_every = chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not negative"); self.update_communities(pool).await?; @@ -184,15 +186,14 @@ impl InstanceWorker { .await .context("failed reading activity from db")? else { - tracing::debug!("{}: {:?} does not exist", self.instance.domain, id); + debug!("{}: {:?} does not exist", self.instance.domain, id); self.state.last_successful_id = Some(id); continue; }; if let Err(e) = self.send_retry_loop(pool, &ele.0, &ele.1).await { - tracing::warn!( + warn!( "sending {} errored internally, skipping activity: {:?}", - ele.0.ap_id, - e + ele.0.ap_id, e ); } if self.stop.is_cancelled() { @@ -219,7 +220,7 @@ impl InstanceWorker { .await .context("failed figuring out inbox urls")?; if inbox_urls.is_empty() { - tracing::debug!("{}: {:?} no inboxes", self.instance.domain, activity.id); + trace!("{}: {:?} no inboxes", self.instance.domain, activity.id); self.state.last_successful_id = Some(activity.id); self.state.last_successful_published_time = Some(activity.published); return Ok(()); @@ -237,16 +238,14 @@ impl InstanceWorker { SendActivityTask::prepare(&object, actor.as_ref(), inbox_urls, &self.context).await?; for task in requests { // usually only one due to shared inbox - tracing::debug!("sending out {}", task); + trace!("sending out {}", task); while let Err(e) = task.sign_and_send(&self.context).await { self.state.fail_count += 1; self.state.last_retry = Some(Utc::now()); let retry_delay: Duration = federate_retry_sleep_duration(self.state.fail_count); - tracing::info!( + info!( "{}: retrying {:?} attempt {} with delay {retry_delay:.2?}. ({e})", - self.instance.domain, - activity.id, - self.state.fail_count + self.instance.domain, activity.id, self.state.fail_count ); self.save_and_send_state(pool).await?; tokio::select! {