Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add federation queue state to get_federated_instances api #4104

Merged
merged 6 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions crates/api_common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ doctest = false
full = [
"tracing",
"rosetta-i18n",
"chrono",
"lemmy_utils",
"lemmy_db_views/full",
"lemmy_db_views_actor/full",
Expand Down Expand Up @@ -47,7 +46,7 @@ activitypub_federation = { workspace = true, optional = true }
serde = { workspace = true }
serde_with = { workspace = true }
url = { workspace = true }
chrono = { workspace = true, optional = true }
chrono = { workspace = true }
tracing = { workspace = true, optional = true }
reqwest-middleware = { workspace = true, optional = true }
regex = { workspace = true }
Expand Down
6 changes: 6 additions & 0 deletions crates/api_common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub extern crate lemmy_db_views_actor;
pub extern crate lemmy_db_views_moderator;

use serde::{Deserialize, Serialize};
use std::time::Duration;

#[derive(Debug, Serialize, Deserialize, Clone)]
#[cfg_attr(feature = "full", derive(ts_rs::TS))]
Expand All @@ -39,3 +40,8 @@ impl Default for SuccessResponse {
SuccessResponse { success: true }
}
}

/// how long to sleep based on how many retries have already happened
pub fn federate_retry_sleep_duration(retry_count: i32) -> Duration {
Duration::from_secs_f64(10.0 * 2.0_f64.powf(f64::from(retry_count)))
}
2 changes: 1 addition & 1 deletion crates/api_common/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ pub fn client_builder(settings: &Settings) -> ClientBuilder {
);

Client::builder()
.user_agent(user_agent.clone())
.user_agent(user_agent)
.timeout(REQWEST_TIMEOUT)
.connect_timeout(REQWEST_TIMEOUT)
}
Expand Down
47 changes: 43 additions & 4 deletions crates/api_common/src/site.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
use crate::federate_retry_sleep_duration;
use chrono::{DateTime, Utc};
use lemmy_db_schema::{
newtypes::{CommentId, CommunityId, InstanceId, LanguageId, PersonId, PostId},
source::{instance::Instance, language::Language, tagline::Tagline},
source::{
federation_queue_state::FederationQueueState,
instance::Instance,
language::Language,
tagline::Tagline,
},
ListingType,
ModlogActionType,
RegistrationMode,
Expand Down Expand Up @@ -316,9 +323,41 @@ pub struct MyUserInfo {
#[cfg_attr(feature = "full", ts(export))]
/// A list of federated instances.
pub struct FederatedInstances {
pub linked: Vec<Instance>,
pub allowed: Vec<Instance>,
pub blocked: Vec<Instance>,
pub linked: Vec<InstanceWithFederationState>,
pub allowed: Vec<InstanceWithFederationState>,
pub blocked: Vec<InstanceWithFederationState>,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
#[cfg_attr(feature = "full", derive(TS))]
#[cfg_attr(feature = "full", ts(export))]
pub struct ReadableFederationState {
#[serde(flatten)]
internal_state: FederationQueueState,
/// timestamp of the next retry attempt (null if fail count is 0)
next_retry: Option<DateTime<Utc>>,
}

impl From<FederationQueueState> for ReadableFederationState {
fn from(internal_state: FederationQueueState) -> Self {
ReadableFederationState {
next_retry: internal_state.last_retry.map(|r| {
r + chrono::Duration::from_std(federate_retry_sleep_duration(internal_state.fail_count))
.expect("sleep duration longer than 2**63 ms (262 million years)")
}),
internal_state,
}
}
}

#[derive(Debug, Serialize, Deserialize, Clone)]
#[cfg_attr(feature = "full", derive(TS))]
#[cfg_attr(feature = "full", ts(export))]
pub struct InstanceWithFederationState {
#[serde(flatten)]
pub instance: Instance,
/// if federation to this instance is or was active, show state of outgoing federation to this instance
pub federation_state: Option<ReadableFederationState>,
}

#[skip_serializing_none]
Expand Down
29 changes: 22 additions & 7 deletions crates/api_common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
context::LemmyContext,
request::purge_image_from_pictrs,
sensitive::Sensitive,
site::FederatedInstances,
site::{FederatedInstances, InstanceWithFederationState},
};
use actix_web::cookie::{Cookie, SameSite};
use anyhow::Context;
Expand Down Expand Up @@ -275,12 +275,27 @@ pub async fn build_federated_instances(
pool: &mut DbPool<'_>,
) -> Result<Option<FederatedInstances>, LemmyError> {
if local_site.federation_enabled {
// TODO I hate that this requires 3 queries
let (linked, allowed, blocked) = lemmy_db_schema::try_join_with_pool!(pool => (
Instance::linked,
Instance::allowlist,
Instance::blocklist
))?;
let mut linked = Vec::new();
let mut allowed = Vec::new();
let mut blocked = Vec::new();

let all = Instance::read_all_with_fed_state(pool).await?;
for (instance, federation_state, is_blocked, is_allowed) in all {
let i = InstanceWithFederationState {
instance,
federation_state: federation_state.map(std::convert::Into::into),
};
if is_blocked {
// blocked instances will only have an entry here if they had been federated with in the past.
blocked.push(i);
} else if is_allowed {
allowed.push(i.clone());
linked.push(i);
} else {
// not explicitly allowed but implicitly linked
linked.push(i);
}
}

Ok(Some(FederatedInstances {
linked,
Expand Down
4 changes: 2 additions & 2 deletions crates/db_schema/src/impls/activity.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
diesel::OptionalExtension,
newtypes::DbUrl,
newtypes::{ActivityId, DbUrl},
source::activity::{ReceivedActivity, SentActivity, SentActivityForm},
utils::{get_conn, DbPool},
};
Expand Down Expand Up @@ -30,7 +30,7 @@ impl SentActivity {
.first::<Self>(conn)
.await
}
pub async fn read(pool: &mut DbPool<'_>, object_id: i64) -> Result<Self, Error> {
pub async fn read(pool: &mut DbPool<'_>, object_id: ActivityId) -> Result<Self, Error> {
use crate::schema::sent_activity::dsl::sent_activity;
let conn = &mut get_conn(pool).await?;
sent_activity.find(object_id).first::<Self>(conn).await
Expand Down
46 changes: 46 additions & 0 deletions crates/db_schema/src/impls/federation_queue_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use crate::{
newtypes::InstanceId,
source::federation_queue_state::FederationQueueState,
utils::{get_conn, DbPool},
};
use diesel::{prelude::*, result::Error};
use diesel_async::RunQueryDsl;

impl FederationQueueState {
/// load state or return a default empty value
pub async fn load(
pool: &mut DbPool<'_>,
instance_id_: InstanceId,
) -> Result<FederationQueueState, Error> {
use crate::schema::federation_queue_state::dsl::{federation_queue_state, instance_id};
let conn = &mut get_conn(pool).await?;
Ok(
federation_queue_state
.filter(instance_id.eq(&instance_id_))
.select(FederationQueueState::as_select())
.get_result(conn)
.await
.optional()?
.unwrap_or(FederationQueueState {
instance_id: instance_id_,
fail_count: 0,
last_retry: None,
last_successful_id: None, // this value is set to the most current id for new instances
last_successful_published_time: None,
}),
)
}
pub async fn upsert(pool: &mut DbPool<'_>, state: &FederationQueueState) -> Result<(), Error> {
use crate::schema::federation_queue_state::dsl::{federation_queue_state, instance_id};
let conn = &mut get_conn(pool).await?;

state
.insert_into(federation_queue_state)
.on_conflict(instance_id)
.do_update()
.set(state)
.execute(conn)
.await?;
Ok(())
}
}
38 changes: 28 additions & 10 deletions crates/db_schema/src/impls/instance.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
use crate::{
diesel::dsl::IntervalDsl,
newtypes::InstanceId,
schema::{federation_allowlist, federation_blocklist, instance, local_site, site},
source::instance::{Instance, InstanceForm},
schema::{
federation_allowlist,
federation_blocklist,
federation_queue_state,
instance,
local_site,
site,
},
source::{
federation_queue_state::FederationQueueState,
instance::{Instance, InstanceForm},
},
utils::{functions::lower, get_conn, naive_now, now, DbPool},
};
use diesel::{
Expand Down Expand Up @@ -59,7 +69,7 @@ impl Instance {
pub async fn read_all(pool: &mut DbPool<'_>) -> Result<Vec<Instance>, Error> {
let conn = &mut get_conn(pool).await?;
instance::table
.select(instance::all_columns)
.select(Self::as_select())
.get_results(conn)
.await
}
Expand All @@ -73,7 +83,7 @@ impl Instance {
let conn = &mut get_conn(pool).await?;
instance::table
.inner_join(federation_allowlist::table)
.select(instance::all_columns)
.select(Self::as_select())
.get_results(conn)
.await
}
Expand All @@ -82,14 +92,14 @@ impl Instance {
let conn = &mut get_conn(pool).await?;
instance::table
.inner_join(federation_blocklist::table)
.select(instance::all_columns)
.select(Self::as_select())
.get_results(conn)
.await
}

/// returns a list of all instances, each with a flag of whether the instance is allowed or not and dead or not
/// ordered by id
pub async fn read_all_with_blocked_and_dead(
pub async fn read_federated_with_blocked_and_dead(
pool: &mut DbPool<'_>,
) -> Result<Vec<(Self, bool, bool)>, Error> {
let conn = &mut get_conn(pool).await?;
Expand Down Expand Up @@ -125,16 +135,24 @@ impl Instance {
}
}

pub async fn linked(pool: &mut DbPool<'_>) -> Result<Vec<Self>, Error> {
/// returns (instance, blocked, allowed, fed queue state) tuples
pub async fn read_all_with_fed_state(
pool: &mut DbPool<'_>,
) -> Result<Vec<(Self, Option<FederationQueueState>, bool, bool)>, Error> {
let conn = &mut get_conn(pool).await?;
instance::table
// omit instance representing the local site
.left_join(site::table.inner_join(local_site::table))
.filter(local_site::id.is_null())
// omit instances in the blocklist
.left_join(federation_blocklist::table)
.filter(federation_blocklist::id.is_null())
.select(instance::all_columns)
.left_join(federation_allowlist::table)
.left_join(federation_queue_state::table)
.select((
Self::as_select(),
Option::<FederationQueueState>::as_select(),
federation_blocklist::id.nullable().is_not_null(),
federation_allowlist::id.nullable().is_not_null(),
))
.get_results(conn)
.await
}
Expand Down
1 change: 1 addition & 0 deletions crates/db_schema/src/impls/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub mod custom_emoji;
pub mod email_verification;
pub mod federation_allowlist;
pub mod federation_blocklist;
pub mod federation_queue_state;
pub mod image_upload;
pub mod instance;
pub mod instance_block;
Expand Down
7 changes: 7 additions & 0 deletions crates/db_schema/src/newtypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,13 @@ pub struct ImageUploadId(i32);
/// The instance id.
pub struct InstanceId(i32);

#[derive(
Debug, Copy, Clone, Hash, Eq, PartialEq, Serialize, Deserialize, Default, PartialOrd, Ord,
)]
#[cfg_attr(feature = "full", derive(DieselNewType, TS))]
#[cfg_attr(feature = "full", ts(export))]
pub struct ActivityId(pub i64);

#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq, Serialize, Deserialize, Default)]
#[cfg_attr(feature = "full", derive(DieselNewType, TS))]
#[cfg_attr(feature = "full", ts(export))]
Expand Down
5 changes: 3 additions & 2 deletions crates/db_schema/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,10 @@ diesel::table! {
federation_queue_state (id) {
id -> Int4,
instance_id -> Int4,
last_successful_id -> Int8,
last_successful_id -> Nullable<Int8>,
fail_count -> Int4,
last_retry -> Timestamptz,
last_retry -> Nullable<Timestamptz>,
last_successful_published_time -> Nullable<Timestamptz>,
}
}

Expand Down
4 changes: 2 additions & 2 deletions crates/db_schema/src/source/activity.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
newtypes::{CommunityId, DbUrl},
newtypes::{ActivityId, CommunityId, DbUrl},
schema::sent_activity,
};
use chrono::{DateTime, Utc};
Expand Down Expand Up @@ -54,7 +54,7 @@ impl ActivitySendTargets {
#[derive(PartialEq, Eq, Debug, Queryable)]
#[diesel(table_name = sent_activity)]
pub struct SentActivity {
pub id: i64,
pub id: ActivityId,
pub ap_id: DbUrl,
pub data: Value,
pub sensitive: bool,
Expand Down
26 changes: 26 additions & 0 deletions crates/db_schema/src/source/federation_queue_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use crate::newtypes::{ActivityId, InstanceId};
use chrono::{DateTime, Utc};
#[cfg(feature = "full")]
use diesel::prelude::*;
use serde::{Deserialize, Serialize};
#[cfg(feature = "full")]
use ts_rs::TS;

#[derive(Clone, Debug, Serialize, Deserialize)]
#[cfg_attr(
feature = "full",
derive(Queryable, Selectable, Insertable, AsChangeset)
)]
#[cfg_attr(feature = "full", derive(TS))]
#[cfg_attr(feature = "full", diesel(table_name = crate::schema::federation_queue_state))]
#[cfg_attr(feature = "full", diesel(check_for_backend(diesel::pg::Pg)))]
pub struct FederationQueueState {
pub instance_id: InstanceId,
/// the last successfully sent activity id
pub last_successful_id: Option<ActivityId>,
pub last_successful_published_time: Option<DateTime<Utc>>,
/// how many failed attempts have been made to send the next activity
pub fail_count: i32,
/// timestamp of the last retry attempt (when the last failing activity was resent)
pub last_retry: Option<DateTime<Utc>>,
}
1 change: 1 addition & 0 deletions crates/db_schema/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub mod custom_emoji_keyword;
pub mod email_verification;
pub mod federation_allowlist;
pub mod federation_blocklist;
pub mod federation_queue_state;
pub mod image_upload;
pub mod instance;
pub mod instance_block;
Expand Down
Loading