Skip to content

Commit

Permalink
Avoid race condition when streaming deleted statuses (mastodon#10280)
Browse files Browse the repository at this point in the history
* Avoid race condition when streaming deleted statuses

* Move redis lock to DistributionWorker to avoid extra Redis value
  • Loading branch information
ClearlyClaire authored and Gargron committed Mar 16, 2019
1 parent ec85e83 commit 72fb854
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 11 deletions.
30 changes: 20 additions & 10 deletions app/services/remove_status_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,22 @@ def call(status, **options)
@stream_entry = status.stream_entry
@options = options

remove_from_self if status.account.local?
remove_from_followers
remove_from_lists
remove_from_affected
remove_reblogs
remove_from_hashtags
remove_from_public
remove_from_media if status.media_attachments.any?

@status.destroy!
RedisLock.acquire(lock_options) do |lock|
if lock.acquired?
remove_from_self if status.account.local?
remove_from_followers
remove_from_lists
remove_from_affected
remove_reblogs
remove_from_hashtags
remove_from_public
remove_from_media if status.media_attachments.any?

@status.destroy!
else
raise Mastodon::RaceConditionError
end
end

# There is no reason to send out Undo activities when the
# cause is that the original object has been removed, since
Expand Down Expand Up @@ -156,4 +162,8 @@ def remove_from_media
redis.publish('timeline:public:media', @payload)
redis.publish('timeline:public:local:media', @payload) if @status.local?
end

def lock_options
{ redis: Redis.current, key: "distribute:#{@status.id}" }
end
end
8 changes: 7 additions & 1 deletion app/workers/distribution_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@ class DistributionWorker
include Sidekiq::Worker

def perform(status_id)
FanOutOnWriteService.new.call(Status.find(status_id))
RedisLock.acquire(redis: Redis.current, key: "distribute:#{status_id}") do |lock|
if lock.acquired?
FanOutOnWriteService.new.call(Status.find(status_id))
else
raise Mastodon::RaceConditionError
end
end
rescue ActiveRecord::RecordNotFound
true
end
Expand Down

0 comments on commit 72fb854

Please sign in to comment.