Skip to content

Commit

Permalink
move federation concurrent config to config file
Browse files Browse the repository at this point in the history
  • Loading branch information
phiresky committed Apr 15, 2024
1 parent e719baf commit 5e986ef
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 55 deletions.
9 changes: 5 additions & 4 deletions config/defaults.hjson
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,11 @@
port: 8536
# Whether the site is available over TLS. Needs to be true for federation to work.
tls_enabled: true
# The number of activitypub federation workers that can be in-flight concurrently
worker_count: 0
# The number of activitypub federation retry workers that can be in-flight concurrently
retry_count: 0
federation: {
# Limit to the number of concurrent outgoing federation requests per target instance.
# Set this to a higher value than 1 (e.g. 6) only if you have a huge instance (>10 activities per second) and if a receiving instance is not keeping up.
concurrent_sends_per_instance: 1
}
prometheus: {
bind: "127.0.0.1"
port: 10002
Expand Down
39 changes: 27 additions & 12 deletions crates/federate/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use crate::{util::CancellableTask, worker::InstanceWorker};
use activitypub_federation::config::FederationConfig;
use chrono::{Local, Timelike};
use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration};
use lemmy_api_common::{
context::LemmyContext,
federate_retry_sleep_duration,
lemmy_utils::settings::structs::FederationWorkerConfig,
};
use lemmy_db_schema::{
newtypes::InstanceId,
source::{federation_queue_state::FederationQueueState, instance::Instance},
Expand Down Expand Up @@ -36,7 +40,8 @@ pub struct Opts {
async fn start_stop_federation_workers(
opts: Opts,
pool: ActualDbPool,
federation_config: FederationConfig<LemmyContext>,
federation_lib_config: FederationConfig<LemmyContext>,
federation_worker_config: FederationWorkerConfig,
cancel: CancellationToken,
) -> anyhow::Result<()> {
let mut workers = HashMap::<InstanceId, CancellableTask>::new();
Expand All @@ -45,7 +50,9 @@ async fn start_stop_federation_workers(
let exit_print = tokio::spawn(receive_print_stats(pool.clone(), stats_receiver));
let pool2 = &mut DbPool::Pool(&pool);
let process_index = opts.process_index - 1;
let local_domain = federation_config.settings().get_hostname_without_port()?;
let local_domain = federation_lib_config
.settings()
.get_hostname_without_port()?;
loop {
let mut total_count = 0;
let mut dead_count = 0;
Expand Down Expand Up @@ -73,15 +80,19 @@ async fn start_stop_federation_workers(
continue;
}
// create new worker
let config = federation_config.clone();
let config = federation_lib_config.clone();
let stats_sender = stats_sender.clone();
let federation_worker_config = federation_worker_config.clone();
workers.insert(
instance.id,
CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| {
let instance = instance.clone();
let config = config.clone();
let stats_sender = stats_sender.clone();
async move { InstanceWorker::init_and_loop(instance, config, stop, stats_sender).await }
InstanceWorker::init_and_loop(
instance.clone(),
config.clone(),
federation_worker_config.clone(),
stop,
stats_sender.clone(),
)
}),
);
} else if !should_federate {
Expand Down Expand Up @@ -117,12 +128,16 @@ pub fn start_stop_federation_workers_cancellable(
opts: Opts,
pool: ActualDbPool,
config: FederationConfig<LemmyContext>,
federation_worker_config: FederationWorkerConfig,
) -> CancellableTask {
CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| {
let opts = opts.clone();
let pool = pool.clone();
let config = config.clone();
async move { start_stop_federation_workers(opts, pool, config, stop).await }
start_stop_federation_workers(
opts.clone(),
pool.clone(),
config.clone(),
federation_worker_config.clone(),
stop,
)
})
}

Expand Down
4 changes: 2 additions & 2 deletions crates/federate/src/send.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::util::{get_activity_cached, get_actor_cached};
use crate::util::get_actor_cached;
use activitypub_federation::{
activity_sending::SendActivityTask,
config::Data,
Expand All @@ -10,7 +10,7 @@ use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration};
use lemmy_apub::{activity_lists::SharedInboxActivities, FEDERATION_CONTEXT};
use lemmy_db_schema::{newtypes::ActivityId, source::activity::SentActivity};
use reqwest::Url;
use std::{ops::Deref, sync::Arc, time::Duration};
use std::ops::Deref;
use tokio::{sync::mpsc::UnboundedSender, time::sleep};
use tokio_util::sync::CancellationToken;

Expand Down
46 changes: 15 additions & 31 deletions crates/federate/src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,25 @@
use crate::{
inboxes::CommunityInboxCollector,
send::{SendActivityResult, SendRetryTask, SendSuccessInfo},
util::{
get_activity_cached,
get_actor_cached,
get_latest_activity_id,
WORK_FINISHED_RECHECK_DELAY,
},
};
use activitypub_federation::{
activity_sending::SendActivityTask,
config::{Data, FederationConfig},
protocol::context::WithContext,
util::{get_activity_cached, get_latest_activity_id, WORK_FINISHED_RECHECK_DELAY},
};
use activitypub_federation::config::FederationConfig;
use anyhow::{Context, Result};
use chrono::{DateTime, Days, TimeZone, Utc};
use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration};
use lemmy_apub::{activity_lists::SharedInboxActivities, FEDERATION_CONTEXT};
use lemmy_api_common::{
context::LemmyContext,
federate_retry_sleep_duration,
lemmy_utils::settings::structs::FederationWorkerConfig,
};
use lemmy_db_schema::{
newtypes::ActivityId,
source::{
activity::SentActivity,
federation_queue_state::FederationQueueState,
instance::{Instance, InstanceForm},
},
utils::{naive_now, ActualDbPool, DbPool},
};
use once_cell::sync::Lazy;
use reqwest::Url;
use std::{
collections::BinaryHeap,
ops::{Add, Deref},
time::Duration,
};
use std::{collections::BinaryHeap, ops::Add, time::Duration};
use tokio::{
sync::mpsc::{self, UnboundedSender},
time::sleep,
Expand All @@ -42,19 +29,14 @@ use tokio_util::sync::CancellationToken;
/// Save state to db after this time has passed since the last state (so if the server crashes or is SIGKILLed, less than X seconds of activities are resent)
static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60);

static CONCURRENT_SENDS: Lazy<i64> = Lazy::new(|| {
std::env::var("LEMMY_FEDERATION_CONCURRENT_SENDS_PER_INSTANCE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(8)
});
/// Maximum number of successful sends to allow out of order
const MAX_SUCCESSFULS: usize = 1000;

pub(crate) struct InstanceWorker {
instance: Instance,
stop: CancellationToken,
config: FederationConfig<LemmyContext>,
federation_lib_config: FederationConfig<LemmyContext>,
federation_worker_config: FederationWorkerConfig,
stats_sender: UnboundedSender<(String, FederationQueueState)>,
state: FederationQueueState,
last_state_insert: DateTime<Utc>,
Expand All @@ -66,6 +48,7 @@ impl InstanceWorker {
pub(crate) async fn init_and_loop(
instance: Instance,
config: FederationConfig<LemmyContext>,
federation_worker_config: FederationWorkerConfig,
stop: CancellationToken,
stats_sender: UnboundedSender<(String, FederationQueueState)>,
) -> Result<(), anyhow::Error> {
Expand All @@ -77,9 +60,10 @@ impl InstanceWorker {
instance.id,
instance.domain.clone(),
),
federation_worker_config,
instance,
stop,
config,
federation_lib_config: config,
stats_sender,
state,
last_state_insert: Utc.timestamp_nanos(0),
Expand Down Expand Up @@ -108,7 +92,7 @@ impl InstanceWorker {
// or (b) if we have too many successfuls in memory or (c) if we have too many in flight
let need_wait_for_event = (in_flight != 0 && self.state.fail_count > 0)
|| successfuls.len() >= MAX_SUCCESSFULS
|| in_flight >= *CONCURRENT_SENDS;
|| in_flight >= self.federation_worker_config.concurrent_sends_per_instance;
if need_wait_for_event || receive_send_result.len() > 4 {
// if len() > 0 then this does not block and allows us to write to db more often
// if len is 0 then this means we wait for something to change our above conditions,
Expand Down Expand Up @@ -312,7 +296,7 @@ impl InstanceWorker {
return Ok(());
}
let initial_fail_count = self.state.fail_count;
let data = self.config.to_request_data();
let data = self.federation_lib_config.to_request_data();
let stop = self.stop.clone();
let domain = self.instance.domain.clone();
tokio::spawn(async move {
Expand Down
18 changes: 12 additions & 6 deletions crates/utils/src/settings/structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,8 @@ pub struct Settings {
#[default(None)]
#[doku(skip)]
pub opentelemetry_url: Option<Url>,
/// The number of activitypub federation workers that can be in-flight concurrently
#[default(0)]
pub worker_count: usize,
/// The number of activitypub federation retry workers that can be in-flight concurrently
#[default(0)]
pub retry_count: usize,
#[default(Default::default())]
pub federation: FederationWorkerConfig,
// Prometheus configuration.
#[default(None)]
#[doku(example = "Some(Default::default())")]
Expand Down Expand Up @@ -234,3 +230,13 @@ pub struct PrometheusConfig {
#[doku(example = "10002")]
pub port: i32,
}

#[derive(Debug, Deserialize, Serialize, Clone, SmartDefault, Document)]
#[serde(default)]
// named federation"worker"config to disambiguate from the activitypub library configuration
pub struct FederationWorkerConfig {
/// Limit to the number of concurrent outgoing federation requests per target instance.
/// Set this to a higher value than 1 (e.g. 6) only if you have a huge instance (>10 activities per second) and if a receiving instance is not keeping up.
#[default(1)]
pub concurrent_sends_per_instance: i64,
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ pub async fn start_lemmy_server(args: CmdArgs) -> LemmyResult<()> {
},
pool.clone(),
federation_config.clone(),
SETTINGS.federation.clone(),
)
});
let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?;
Expand Down

0 comments on commit 5e986ef

Please sign in to comment.