forked from mastodon/mastodon
Improved remote thread fetching (mastodon#10106)
* Fetch up to 5 replies when discovering a new remote status

  This is used for resolving threads downwards. The originating server must add a “replies” attribute with such replies for it to be useful.
* Add some tests for ActivityPub::FetchRepliesWorker
* Add specs for ActivityPub::FetchRepliesService
* Serialize up to 5 public self-replies for ActivityPub notes
* Add specs for ActivityPub::NoteSerializer
* Move exponential backoff logic to a worker concern
* Fetch first page of paginated collections when fetching thread replies
* Add specs for paginated collections in replies
* Move Note replies serialization to a first CollectionPage

  The collection isn't actually paginable yet as it has neither an id nor a `next` field. This may come in another PR.
* Use pluck(:uri) instead of map(&:uri) to improve performance
* Fix fetching replies when they are in a CollectionPage
1 parent 6e8743d, commit 9d3c6f1
Showing 13 changed files with 333 additions and 7 deletions.
@@ -1,5 +1,5 @@
 # frozen_string_literal: true

 class ActivityPub::CollectionPresenter < ActiveModelSerializers::Model
-  attributes :id, :type, :size, :items, :part_of, :first, :last, :next, :prev
+  attributes :id, :type, :size, :items, :page, :part_of, :first, :last, :next, :prev
 end
@@ -0,0 +1,60 @@
# frozen_string_literal: true

class ActivityPub::FetchRepliesService < BaseService
  include JsonLdHelper

  def call(parent_status, collection_or_uri, allow_synchronous_requests = true)
    @account = parent_status.account
    @allow_synchronous_requests = allow_synchronous_requests

    @items = collection_items(collection_or_uri)
    return if @items.nil?

    FetchReplyWorker.push_bulk(filtered_replies)

    @items
  end

  private

  def collection_items(collection_or_uri)
    collection = fetch_collection(collection_or_uri)
    return unless collection.is_a?(Hash)

    collection = fetch_collection(collection['first']) if collection['first'].present?
    return unless collection.is_a?(Hash)

    case collection['type']
    when 'Collection', 'CollectionPage'
      collection['items']
    when 'OrderedCollection', 'OrderedCollectionPage'
      collection['orderedItems']
    end
  end

  def fetch_collection(collection_or_uri)
    return collection_or_uri if collection_or_uri.is_a?(Hash)
    return unless @allow_synchronous_requests
    return if invalid_origin?(collection_or_uri)
    collection = fetch_resource_without_id_validation(collection_or_uri)
    raise Mastodon::UnexpectedResponseError if collection.nil?
    collection
  end

  def filtered_replies
    # Only fetch replies to the same server as the original status to avoid
    # amplification attacks.

    # Also limit to 5 fetched replies to limit potential for DoS.
    @items.map { |item| value_or_id(item) }.reject { |uri| invalid_origin?(uri) }.take(5)
  end

  def invalid_origin?(url)
    return true if unsupported_uri_scheme?(url)

    needle = Addressable::URI.parse(url).host
    haystack = Addressable::URI.parse(@account.uri).host

    !haystack.casecmp(needle).zero?
  end
end
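The same-origin filter and the cap of 5 in `filtered_replies` can be illustrated outside Rails. The following is a minimal sketch, not the service's actual code: it swaps `Addressable::URI` for Ruby's standard `URI`, and the names `foreign_origin?` and `reply_uris` are hypothetical stand-ins for `invalid_origin?`, `value_or_id` and `filtered_replies`. It also assumes that "unsupported scheme" means anything other than http or https.

```ruby
require 'uri'

# Hypothetical stand-in for the origin check: true when `url` is not
# http(s), cannot be parsed, or its host differs from the account's host.
def foreign_origin?(url, account_uri)
  uri = URI.parse(url)
  return true unless %w(http https).include?(uri.scheme)

  # Host comparison is case-insensitive, as in the service above.
  !uri.host.to_s.casecmp(URI.parse(account_uri).host.to_s).zero?
rescue URI::InvalidURIError
  true
end

# Items may be bare URI strings or embedded objects carrying an "id".
# Keep only same-host URIs and stop after `limit` of them.
def reply_uris(items, account_uri, limit: 5)
  items
    .map { |item| item.is_a?(Hash) ? item['id'] : item }
    .reject { |uri| uri.nil? || foreign_origin?(uri, account_uri) }
    .take(limit)
end

items = [
  'http://example.com/self-reply-1',
  { 'id' => 'http://example.com/self-reply-2' },
  'http://other.com/other-reply-1',
  'ftp://example.com/not-http',
]
puts reply_uris(items, 'http://example.com/account')
```

Filtering before `take` matters here: replies hosted elsewhere do not consume any of the five slots, so a collection padded with foreign URIs cannot crowd out the originating server's own replies.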
@@ -0,0 +1,12 @@
# frozen_string_literal: true

class ActivityPub::FetchRepliesWorker
  include Sidekiq::Worker
  include ExponentialBackoff

  sidekiq_options queue: 'pull', retry: 3

  def perform(parent_status_id, replies_uri)
    ActivityPub::FetchRepliesService.new.call(Status.find(parent_status_id), replies_uri)
  end
end
@@ -0,0 +1,11 @@
# frozen_string_literal: true

module ExponentialBackoff
  extend ActiveSupport::Concern

  included do
    sidekiq_retry_in do |count|
      15 + 10 * (count**4) + rand(10 * (count**4))
    end
  end
end
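To get a feel for the delays this formula produces, the sketch below evaluates it in plain Ruby. The helper names `backoff_delay` and `backoff_bounds` are hypothetical, and the sketch assumes `count` is the number of retries already attempted, starting at 0. Note that Ruby's `rand(0)` returns a float in `[0, 1)` rather than an integer, so the first delay is slightly above 15 seconds.

```ruby
# Same arithmetic as the block passed to sidekiq_retry_in above.
def backoff_delay(count)
  15 + 10 * (count**4) + rand(10 * (count**4))
end

# Half-open range [lower, upper) that backoff_delay(count) falls into.
def backoff_bounds(count)
  base = 15 + 10 * (count**4)
  jitter = 10 * (count**4)
  # rand(n) yields 0...n for a positive integer n; rand(0) yields 0.0...1.0.
  upper = jitter.zero? ? base + 1 : base + jitter
  [base, upper]
end

(0..3).each do |count|
  lower, upper = backoff_bounds(count)
  puts "retry #{count}: #{lower}s up to (but excluding) #{upper}s"
end
```

The fourth-power term makes the wait grow quickly, while the random component spreads out retries so that jobs which failed together do not all hit the remote server again at the same moment.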
@@ -0,0 +1,12 @@
# frozen_string_literal: true

class FetchReplyWorker
  include Sidekiq::Worker
  include ExponentialBackoff

  sidekiq_options queue: 'pull', retry: 3

  def perform(child_url)
    FetchRemoteStatusService.new.call(child_url)
  end
end
@@ -0,0 +1,44 @@
# frozen_string_literal: true

require 'rails_helper'

describe ActivityPub::NoteSerializer do
  let!(:account) { Fabricate(:account) }
  let!(:other) { Fabricate(:account) }
  let!(:parent) { Fabricate(:status, account: account, visibility: :public) }
  let!(:reply1) { Fabricate(:status, account: account, thread: parent, visibility: :public) }
  let!(:reply2) { Fabricate(:status, account: account, thread: parent, visibility: :public) }
  let!(:reply3) { Fabricate(:status, account: other, thread: parent, visibility: :public) }
  let!(:reply4) { Fabricate(:status, account: account, thread: parent, visibility: :public) }
  let!(:reply5) { Fabricate(:status, account: account, thread: parent, visibility: :direct) }

  before(:each) do
    @serialization = ActiveModelSerializers::SerializableResource.new(parent, serializer: ActivityPub::NoteSerializer, adapter: ActivityPub::Adapter)
  end

  subject { JSON.parse(@serialization.to_json) }

  it 'has a Note type' do
    expect(subject['type']).to eql('Note')
  end

  it 'has a replies collection' do
    expect(subject['replies']['type']).to eql('Collection')
  end

  it 'has a replies collection with a first Page' do
    expect(subject['replies']['first']['type']).to eql('CollectionPage')
  end

  it 'includes public self-replies in its replies collection' do
    expect(subject['replies']['first']['items']).to include(reply1.uri, reply2.uri, reply4.uri)
  end

  it 'does not include replies from others in its replies collection' do
    expect(subject['replies']['first']['items']).to_not include(reply3.uri)
  end

  it 'does not include replies with direct visibility in its replies collection' do
    expect(subject['replies']['first']['items']).to_not include(reply5.uri)
  end
end
spec/services/activitypub/fetch_replies_service_spec.rb (122 additions, 0 deletions)
@@ -0,0 +1,122 @@
require 'rails_helper'

RSpec.describe ActivityPub::FetchRepliesService, type: :service do
  let(:actor) { Fabricate(:account, domain: 'example.com', uri: 'http://example.com/account') }
  let(:status) { Fabricate(:status, account: actor) }
  let(:collection_uri) { 'http://example.com/replies/1' }

  let(:items) do
    [
      'http://example.com/self-reply-1',
      'http://example.com/self-reply-2',
      'http://example.com/self-reply-3',
      'http://other.com/other-reply-1',
      'http://other.com/other-reply-2',
      'http://other.com/other-reply-3',
      'http://example.com/self-reply-4',
      'http://example.com/self-reply-5',
      'http://example.com/self-reply-6',
    ]
  end

  let(:payload) do
    {
      '@context': 'https://www.w3.org/ns/activitystreams',
      type: 'Collection',
      id: collection_uri,
      items: items,
    }.with_indifferent_access
  end

  subject { described_class.new }

  describe '#call' do
    context 'when the payload is a Collection with inlined replies' do
      context 'when passing the collection itself' do
        it 'spawns workers for up to 5 replies on the same server' do
          allow(FetchReplyWorker).to receive(:push_bulk)
          subject.call(status, payload)
          expect(FetchReplyWorker).to have_received(:push_bulk).with(['http://example.com/self-reply-1', 'http://example.com/self-reply-2', 'http://example.com/self-reply-3', 'http://example.com/self-reply-4', 'http://example.com/self-reply-5'])
        end
      end

      context 'when passing the URL to the collection' do
        before do
          stub_request(:get, collection_uri).to_return(status: 200, body: Oj.dump(payload))
        end

        it 'spawns workers for up to 5 replies on the same server' do
          allow(FetchReplyWorker).to receive(:push_bulk)
          subject.call(status, collection_uri)
          expect(FetchReplyWorker).to have_received(:push_bulk).with(['http://example.com/self-reply-1', 'http://example.com/self-reply-2', 'http://example.com/self-reply-3', 'http://example.com/self-reply-4', 'http://example.com/self-reply-5'])
        end
      end
    end

    context 'when the payload is an OrderedCollection with inlined replies' do
      let(:payload) do
        {
          '@context': 'https://www.w3.org/ns/activitystreams',
          type: 'OrderedCollection',
          id: collection_uri,
          orderedItems: items,
        }.with_indifferent_access
      end

      context 'when passing the collection itself' do
        it 'spawns workers for up to 5 replies on the same server' do
          allow(FetchReplyWorker).to receive(:push_bulk)
          subject.call(status, payload)
          expect(FetchReplyWorker).to have_received(:push_bulk).with(['http://example.com/self-reply-1', 'http://example.com/self-reply-2', 'http://example.com/self-reply-3', 'http://example.com/self-reply-4', 'http://example.com/self-reply-5'])
        end
      end

      context 'when passing the URL to the collection' do
        before do
          stub_request(:get, collection_uri).to_return(status: 200, body: Oj.dump(payload))
        end

        it 'spawns workers for up to 5 replies on the same server' do
          allow(FetchReplyWorker).to receive(:push_bulk)
          subject.call(status, collection_uri)
          expect(FetchReplyWorker).to have_received(:push_bulk).with(['http://example.com/self-reply-1', 'http://example.com/self-reply-2', 'http://example.com/self-reply-3', 'http://example.com/self-reply-4', 'http://example.com/self-reply-5'])
        end
      end
    end

    context 'when the payload is a paginated Collection with inlined replies' do
      let(:payload) do
        {
          '@context': 'https://www.w3.org/ns/activitystreams',
          type: 'Collection',
          id: collection_uri,
          first: {
            type: 'CollectionPage',
            partOf: collection_uri,
            items: items,
          }
        }.with_indifferent_access
      end

      context 'when passing the collection itself' do
        it 'spawns workers for up to 5 replies on the same server' do
          allow(FetchReplyWorker).to receive(:push_bulk)
          subject.call(status, payload)
          expect(FetchReplyWorker).to have_received(:push_bulk).with(['http://example.com/self-reply-1', 'http://example.com/self-reply-2', 'http://example.com/self-reply-3', 'http://example.com/self-reply-4', 'http://example.com/self-reply-5'])
        end
      end

      context 'when passing the URL to the collection' do
        before do
          stub_request(:get, collection_uri).to_return(status: 200, body: Oj.dump(payload))
        end

        it 'spawns workers for up to 5 replies on the same server' do
          allow(FetchReplyWorker).to receive(:push_bulk)
          subject.call(status, collection_uri)
          expect(FetchReplyWorker).to have_received(:push_bulk).with(['http://example.com/self-reply-1', 'http://example.com/self-reply-2', 'http://example.com/self-reply-3', 'http://example.com/self-reply-4', 'http://example.com/self-reply-5'])
        end
      end
    end
  end
end
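The three payload shapes exercised by these specs (a plain `Collection`, an `OrderedCollection`, and a `Collection` whose `first` page is inlined) all resolve to the same list of items. The sketch below is a simplified, network-free illustration of that resolution. The helper name `inline_items` is hypothetical, and unlike the service it only follows `first` when the page is embedded as a hash; a `first` given as a URI is ignored here rather than fetched.

```ruby
# Hypothetical, network-free variant of the item extraction: takes an
# already parsed collection and follows an inlined "first" page if present.
def inline_items(collection)
  return unless collection.is_a?(Hash)

  first = collection['first']
  collection = first if first.is_a?(Hash)

  # Unordered collections use "items", ordered ones use "orderedItems".
  case collection['type']
  when 'Collection', 'CollectionPage'
    collection['items']
  when 'OrderedCollection', 'OrderedCollectionPage'
    collection['orderedItems']
  end
end

uris = ['http://example.com/self-reply-1', 'http://example.com/self-reply-2']

plain     = { 'type' => 'Collection', 'items' => uris }
ordered   = { 'type' => 'OrderedCollection', 'orderedItems' => uris }
paginated = { 'type' => 'Collection',
              'first' => { 'type' => 'CollectionPage', 'items' => uris } }

[plain, ordered, paginated].each { |payload| p inline_items(payload) }
```

An unrecognised `type` falls through the `case` and yields `nil`, which mirrors how the service's `call` returns early when no items could be determined.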