Skip to content

Commit

Permalink
federation: parallel sending per instance (#4623)
Browse files Browse the repository at this point in the history
* federation: parallel sending

* federation: some comments

* lint and set force_write true when a request fails

* inbox_urls return vec

* split inbox functions into separate file

* cleanup

* extract sending task code to separate file

* move federation concurrent config to config file

* off by one issue

* improve msg

* fix both permanent stopping of federation queues and multiple creation of the same federation queues

* fix after merge

* lint fix

* Update crates/federate/src/send.rs

Co-authored-by: dullbananas <dull.bananas0@gmail.com>

* comment about reverse ordering

* remove crashable, comment

* comment

* move comment

* run federation tests twice

* fix test run

* prettier

* fix config default

* upgrade rust to 1.78 to fix diesel cli

* fix clippy

* delay

* add debug to make localhost urls not valid in ap crate, add some debug logs

* federation tests: ensure server stop after test and random activity id

* ci fix

* add test to federate 100 events

* fix send 100 test

* different data every time so activities are distinguishable

* allow out of order receives in test

* lint

* comment about #4623 (comment)

* move sender for clarity, add comment

* move more things to members

* update test todo comment, use same env var as worker test but default to 1

* remove else below continue

* some more cleanup

* handle todo about smooth exit

* add federate inboxes collector tests

* lint

* actor max length

* don't reset fail count if activity skipped

* fix some comments

* reuse vars

* format

* Update .woodpecker.yml

* fix recheck time

* fix inboxes tests under fast mode

* format

* make i32 and ugly casts

* clippy

---------

Co-authored-by: dullbananas <dull.bananas0@gmail.com>
  • Loading branch information
2 people authored and Nutomic committed Jul 23, 2024
1 parent 5eaaabe commit 9dc4de3
Show file tree
Hide file tree
Showing 17 changed files with 1,677 additions and 321 deletions.
1 change: 1 addition & 0 deletions .woodpecker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ steps:
LEMMY_DATABASE_URL: postgres://lemmy:password@database:5432/lemmy
RUST_BACKTRACE: "1"
CARGO_HOME: .cargo_home
LEMMY_TEST_FAST_FEDERATION: "1"
commands:
- export LEMMY_CONFIG_LOCATION=../../config/config.hjson
- cargo test --workspace --no-fail-fast
Expand Down
120 changes: 120 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

67 changes: 46 additions & 21 deletions api_tests/src/follow.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
betaUrl,
registerUser,
unfollows,
delay,
} from "./shared";

beforeAll(setupLogins);
Expand All @@ -21,39 +22,48 @@ test("Follow local community", async () => {
let user = await registerUser(beta, betaUrl);

let community = (await resolveBetaCommunity(user)).community!;
expect(community.counts.subscribers).toBe(1);
expect(community.counts.subscribers_local).toBe(1);
let follow = await followCommunity(user, true, community.community.id);

// Make sure the follow response went through
expect(follow.community_view.community.local).toBe(true);
expect(follow.community_view.subscribed).toBe("Subscribed");
expect(follow.community_view.counts.subscribers).toBe(2);
expect(follow.community_view.counts.subscribers_local).toBe(2);
expect(follow.community_view.counts.subscribers).toBe(
community.counts.subscribers + 1,
);
expect(follow.community_view.counts.subscribers_local).toBe(
community.counts.subscribers_local + 1,
);

// Test an unfollow
let unfollow = await followCommunity(user, false, community.community.id);
expect(unfollow.community_view.subscribed).toBe("NotSubscribed");
expect(unfollow.community_view.counts.subscribers).toBe(1);
expect(unfollow.community_view.counts.subscribers_local).toBe(1);
expect(unfollow.community_view.counts.subscribers).toBe(
community.counts.subscribers,
);
expect(unfollow.community_view.counts.subscribers_local).toBe(
community.counts.subscribers_local,
);
});

test("Follow federated community", async () => {
// It takes about 1 second for the community aggregates to federate
let betaCommunity = (
await delay(2000); // if this is the second test run, we don't have a way to wait for the correct number of subscribers
const betaCommunityInitial = (
await waitUntil(
() => resolveBetaCommunity(alpha),
c =>
c.community?.counts.subscribers === 1 &&
c.community.counts.subscribers_local === 0,
c => !!c.community && c.community?.counts.subscribers >= 1,
)
).community;
if (!betaCommunity) {
if (!betaCommunityInitial) {
throw "Missing beta community";
}
let follow = await followCommunity(alpha, true, betaCommunity.community.id);
let follow = await followCommunity(
alpha,
true,
betaCommunityInitial.community.id,
);
expect(follow.community_view.subscribed).toBe("Pending");
betaCommunity = (
const betaCommunity = (
await waitUntil(
() => resolveBetaCommunity(alpha),
c => c.community?.subscribed === "Subscribed",
Expand All @@ -64,20 +74,24 @@ test("Follow federated community", async () => {
expect(betaCommunity?.community.local).toBe(false);
expect(betaCommunity?.community.name).toBe("main");
expect(betaCommunity?.subscribed).toBe("Subscribed");
expect(betaCommunity?.counts.subscribers_local).toBe(1);
expect(betaCommunity?.counts.subscribers_local).toBe(
betaCommunityInitial.counts.subscribers_local + 1,
);

// check that unfollow was federated
let communityOnBeta1 = await resolveBetaCommunity(beta);
expect(communityOnBeta1.community?.counts.subscribers).toBe(2);
expect(communityOnBeta1.community?.counts.subscribers_local).toBe(1);
expect(communityOnBeta1.community?.counts.subscribers).toBe(
betaCommunityInitial.counts.subscribers + 1,
);

// Check it from local
let site = await getSite(alpha);
let remoteCommunityId = site.my_user?.follows.find(
c => c.community.local == false,
c =>
c.community.local == false &&
c.community.id === betaCommunityInitial.community.id,
)?.community.id;
expect(remoteCommunityId).toBeDefined();
expect(site.my_user?.follows.length).toBe(2);

if (!remoteCommunityId) {
throw "Missing remote community id";
Expand All @@ -89,10 +103,21 @@ test("Follow federated community", async () => {

// Make sure you are unsubbed locally
let siteUnfollowCheck = await getSite(alpha);
expect(siteUnfollowCheck.my_user?.follows.length).toBe(1);
expect(
siteUnfollowCheck.my_user?.follows.find(
c => c.community.id === betaCommunityInitial.community.id,
),
).toBe(undefined);

// check that unfollow was federated
let communityOnBeta2 = await resolveBetaCommunity(beta);
expect(communityOnBeta2.community?.counts.subscribers).toBe(1);
let communityOnBeta2 = await waitUntil(
() => resolveBetaCommunity(beta),
c =>
c.community?.counts.subscribers ===
betaCommunityInitial.counts.subscribers,
);
expect(communityOnBeta2.community?.counts.subscribers).toBe(
betaCommunityInitial.counts.subscribers,
);
expect(communityOnBeta2.community?.counts.subscribers_local).toBe(1);
});
30 changes: 20 additions & 10 deletions api_tests/src/post.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,23 @@ beforeAll(async () => {

afterAll(unfollows);

async function assertPostFederation(postOne: PostView, postTwo: PostView) {
async function assertPostFederation(
postOne: PostView,
postTwo: PostView,
waitForMeta = true,
) {
// Link metadata is generated in background task and may not be ready yet at this time,
// so wait for it explicitly. For removed posts we cant refetch anything.
postOne = await waitForPost(beta, postOne.post, res => {
return res === null || res?.post.embed_title !== null;
});
postTwo = await waitForPost(
beta,
postTwo.post,
res => res === null || res?.post.embed_title !== null,
);
if (waitForMeta) {
postOne = await waitForPost(beta, postOne.post, res => {
return res === null || !!res?.post.embed_title;
});
postTwo = await waitForPost(
beta,
postTwo.post,
res => res === null || !!res?.post.embed_title,
);
}

expect(postOne?.post.ap_id).toBe(postTwo?.post.ap_id);
expect(postOne?.post.name).toBe(postTwo?.post.name);
Expand Down Expand Up @@ -408,7 +414,11 @@ test("Remove a post from admin and community on same instance", async () => {
p => p?.post_view.post.removed ?? false,
);
expect(alphaPost?.post_view.post.removed).toBe(true);
await assertPostFederation(alphaPost.post_view, removePostRes.post_view);
await assertPostFederation(
alphaPost.post_view,
removePostRes.post_view,
false,
);

// Undelete
let undeletedPost = await removePost(beta, false, betaPost.post);
Expand Down
6 changes: 5 additions & 1 deletion api_tests/src/user.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,11 @@ test("Requests with invalid auth should be treated as unauthenticated", async ()
});

test("Create user with Arabic name", async () => {
let user = await registerUser(alpha, alphaUrl, "تجريب");
let user = await registerUser(
alpha,
alphaUrl,
"تجريب" + Math.random().toString().slice(2, 10), // less than actor_name_max_length
);

let site = await getSite(user);
expect(site.my_user).toBeDefined();
Expand Down
10 changes: 6 additions & 4 deletions config/defaults.hjson
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,12 @@
port: 8536
# Whether the site is available over TLS. Needs to be true for federation to work.
tls_enabled: true
# The number of activitypub federation workers that can be in-flight concurrently
worker_count: 0
# The number of activitypub federation retry workers that can be in-flight concurrently
retry_count: 0
federation: {
# Limit to the number of concurrent outgoing federation requests per target instance.
# Set this to a higher value than 1 (e.g. 6) only if you have a huge instance (>10 activities
# per second) and if a receiving instance is not keeping up.
concurrent_sends_per_instance: 1
}
prometheus: {
bind: "127.0.0.1"
port: 10002
Expand Down
Loading

0 comments on commit 9dc4de3

Please sign in to comment.