Skip to content

Commit

Permalink
Simplify task queues locking mechanism (#16477)
Browse files Browse the repository at this point in the history
We now have one mutex guarding all accesses to
the underlying task heaps. This simplifies the more granular
but bug prone mechanism of having striped locks.

This also re-enables GPUThreadMerger tests that are currently
disabled due to their flaky nature. The scenario that gets fixed by this
change is as follows:

1. Thread-1: We lock `queue_meta_mutex_` and grab locks on `queue_1` and release the meta mutex.
2. Thread-1: We add an Observer on `queues` object.
3. Thread-2: We lock `queue_meta_mutex_` and grab locks on `queue_2`.
4. Thread-2: We try to dispose all the pending tasks on `queue_2` which calls `erase` on `queues`.

The above situation is not thread safe without having 1 lock.

Note: This increases the contention on one lock and could potentially be bad for perf. We are
explicitly making this trade-off towards reducing the complexity.

Fixes: flutter/flutter#49007
  • Loading branch information
iskakaushik authored Feb 7, 2020
1 parent c932214 commit 5c70356
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 64 deletions.
9 changes: 3 additions & 6 deletions fml/gpu_thread_merger_unittests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
#include "flutter/fml/task_runner.h"
#include "gtest/gtest.h"

// TODO(49007): Flaky. Investigate, fix and re-enable.
TEST(GpuThreadMerger, DISABLED_RemainMergedTillLeaseExpires) {
TEST(GpuThreadMerger, RemainMergedTillLeaseExpires) {
fml::MessageLoop* loop1 = nullptr;
fml::AutoResetWaitableEvent latch1;
fml::AutoResetWaitableEvent term1;
Expand Down Expand Up @@ -62,8 +61,7 @@ TEST(GpuThreadMerger, DISABLED_RemainMergedTillLeaseExpires) {
thread2.join();
}

// TODO(49007): Flaky. Investigate, fix and re-enable.
TEST(GpuThreadMerger, DISABLED_IsNotOnRasterizingThread) {
TEST(GpuThreadMerger, IsNotOnRasterizingThread) {
fml::MessageLoop* loop1 = nullptr;
fml::AutoResetWaitableEvent latch1;
std::thread thread1([&loop1, &latch1]() {
Expand Down Expand Up @@ -148,8 +146,7 @@ TEST(GpuThreadMerger, DISABLED_IsNotOnRasterizingThread) {
thread2.join();
}

// TODO(49007): Flaky. Investigate, fix and re-enable.
TEST(GpuThreadMerger, DISABLED_LeaseExtension) {
TEST(GpuThreadMerger, LeaseExtension) {
fml::MessageLoop* loop1 = nullptr;
fml::AutoResetWaitableEvent latch1;
fml::AutoResetWaitableEvent term1;
Expand Down
76 changes: 24 additions & 52 deletions fml/message_loop_task_queues.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include "flutter/fml/make_copyable.h"
#include "flutter/fml/message_loop_impl.h"

#include <iostream>

namespace fml {

std::mutex MessageLoopTaskQueues::creation_mutex_;
Expand All @@ -32,55 +34,46 @@ fml::RefPtr<MessageLoopTaskQueues> MessageLoopTaskQueues::GetInstance() {
}

TaskQueueId MessageLoopTaskQueues::CreateTaskQueue() {
fml::UniqueLock lock(*queue_meta_mutex_);
std::lock_guard guard(queue_mutex_);
TaskQueueId loop_id = TaskQueueId(task_queue_id_counter_);
++task_queue_id_counter_;

queue_entries_[loop_id] = std::make_unique<TaskQueueEntry>();
queue_locks_[loop_id] = std::make_unique<std::mutex>();

return loop_id;
}

MessageLoopTaskQueues::MessageLoopTaskQueues()
: queue_meta_mutex_(fml::SharedMutex::Create()),
task_queue_id_counter_(0),
order_(0) {}
: task_queue_id_counter_(0), order_(0) {}

MessageLoopTaskQueues::~MessageLoopTaskQueues() = default;

void MessageLoopTaskQueues::Dispose(TaskQueueId queue_id) {
std::scoped_lock queue_lock(GetMutex(queue_id));

std::lock_guard guard(queue_mutex_);
const auto& queue_entry = queue_entries_.at(queue_id);
FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
TaskQueueId subsumed = queue_entry->owner_of;
queue_entries_.erase(queue_id);
if (subsumed != _kUnmerged) {
std::scoped_lock subsumed_lock(*queue_locks_.at(subsumed));
queue_entries_.erase(subsumed);
}
}

void MessageLoopTaskQueues::DisposeTasks(TaskQueueId queue_id) {
std::scoped_lock queue_lock(GetMutex(queue_id));
std::lock_guard guard(queue_mutex_);
const auto& queue_entry = queue_entries_.at(queue_id);
FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
TaskQueueId subsumed = queue_entry->owner_of;
queue_entry->delayed_tasks = {};
if (subsumed != _kUnmerged) {
std::scoped_lock subsumed_lock(*queue_locks_.at(subsumed));
queue_entries_.at(subsumed)->delayed_tasks = {};
}
}

void MessageLoopTaskQueues::RegisterTask(TaskQueueId queue_id,
const fml::closure& task,
fml::TimePoint target_time) {
std::scoped_lock queue_lock(GetMutex(queue_id));

std::lock_guard guard(queue_mutex_);
size_t order = order_++;
const auto& queue_entry = queue_entries_[queue_id];
const auto& queue_entry = queue_entries_.at(queue_id);
queue_entry->delayed_tasks.push({order, task, target_time});
TaskQueueId loop_to_wake = queue_id;
if (queue_entry->subsumed_by != _kUnmerged) {
Expand All @@ -91,17 +84,15 @@ void MessageLoopTaskQueues::RegisterTask(TaskQueueId queue_id,
}

bool MessageLoopTaskQueues::HasPendingTasks(TaskQueueId queue_id) const {
std::scoped_lock queue_lock(GetMutex(queue_id));

std::lock_guard guard(queue_mutex_);
return HasPendingTasksUnlocked(queue_id);
}

void MessageLoopTaskQueues::GetTasksToRunNow(
TaskQueueId queue_id,
FlushType type,
std::vector<fml::closure>& invocations) {
std::scoped_lock queue_lock(GetMutex(queue_id));

std::lock_guard guard(queue_mutex_);
if (!HasPendingTasksUnlocked(queue_id)) {
return;
}
Expand All @@ -115,7 +106,7 @@ void MessageLoopTaskQueues::GetTasksToRunNow(
break;
}
invocations.emplace_back(std::move(top.GetTask()));
queue_entries_[top_queue]->delayed_tasks.pop();
queue_entries_.at(top_queue)->delayed_tasks.pop();
if (type == FlushType::kSingle) {
break;
}
Expand All @@ -136,8 +127,7 @@ void MessageLoopTaskQueues::WakeUpUnlocked(TaskQueueId queue_id,
}

size_t MessageLoopTaskQueues::GetNumPendingTasks(TaskQueueId queue_id) const {
std::scoped_lock queue_lock(GetMutex(queue_id));

std::lock_guard guard(queue_mutex_);
const auto& queue_entry = queue_entries_.at(queue_id);
if (queue_entry->subsumed_by != _kUnmerged) {
return 0;
Expand All @@ -148,7 +138,6 @@ size_t MessageLoopTaskQueues::GetNumPendingTasks(TaskQueueId queue_id) const {

TaskQueueId subsumed = queue_entry->owner_of;
if (subsumed != _kUnmerged) {
std::scoped_lock subsumed_lock(*queue_locks_.at(subsumed));
const auto& subsumed_entry = queue_entries_.at(subsumed);
total_tasks += subsumed_entry->delayed_tasks.size();
}
Expand All @@ -158,22 +147,20 @@ size_t MessageLoopTaskQueues::GetNumPendingTasks(TaskQueueId queue_id) const {
void MessageLoopTaskQueues::AddTaskObserver(TaskQueueId queue_id,
intptr_t key,
const fml::closure& callback) {
std::scoped_lock queue_lock(GetMutex(queue_id));

std::lock_guard guard(queue_mutex_);
FML_DCHECK(callback != nullptr) << "Observer callback must be non-null.";
queue_entries_[queue_id]->task_observers[key] = std::move(callback);
queue_entries_.at(queue_id)->task_observers[key] = std::move(callback);
}

void MessageLoopTaskQueues::RemoveTaskObserver(TaskQueueId queue_id,
intptr_t key) {
std::scoped_lock queue_lock(GetMutex(queue_id));

queue_entries_[queue_id]->task_observers.erase(key);
std::lock_guard guard(queue_mutex_);
queue_entries_.at(queue_id)->task_observers.erase(key);
}

std::vector<fml::closure> MessageLoopTaskQueues::GetObserversToNotify(
TaskQueueId queue_id) const {
std::scoped_lock queue_lock(GetMutex(queue_id));
std::lock_guard guard(queue_mutex_);
std::vector<fml::closure> observers;

if (queue_entries_.at(queue_id)->subsumed_by != _kUnmerged) {
Expand All @@ -186,7 +173,6 @@ std::vector<fml::closure> MessageLoopTaskQueues::GetObserversToNotify(

TaskQueueId subsumed = queue_entries_.at(queue_id)->owner_of;
if (subsumed != _kUnmerged) {
std::scoped_lock subsumed_lock(*queue_locks_.at(subsumed));
for (const auto& observer : queue_entries_.at(subsumed)->task_observers) {
observers.push_back(observer.second);
}
Expand All @@ -197,9 +183,8 @@ std::vector<fml::closure> MessageLoopTaskQueues::GetObserversToNotify(

void MessageLoopTaskQueues::SetWakeable(TaskQueueId queue_id,
fml::Wakeable* wakeable) {
std::scoped_lock queue_lock(GetMutex(queue_id));

FML_CHECK(!queue_entries_[queue_id]->wakeable)
std::lock_guard guard(queue_mutex_);
FML_CHECK(!queue_entries_.at(queue_id)->wakeable)
<< "Wakeable can only be set once.";
queue_entries_.at(queue_id)->wakeable = wakeable;
}
Expand All @@ -208,12 +193,7 @@ bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) {
if (owner == subsumed) {
return true;
}

std::mutex& owner_mutex = GetMutex(owner);
std::mutex& subsumed_mutex = GetMutex(subsumed);

std::scoped_lock lock(owner_mutex, subsumed_mutex);

std::lock_guard guard(queue_mutex_);
auto& owner_entry = queue_entries_.at(owner);
auto& subsumed_entry = queue_entries_.at(subsumed);

Expand Down Expand Up @@ -242,15 +222,14 @@ bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) {
}

bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner) {
std::scoped_lock owner_lock(GetMutex(owner));

auto& owner_entry = queue_entries_[owner];
std::lock_guard guard(queue_mutex_);
const auto& owner_entry = queue_entries_.at(owner);
const TaskQueueId subsumed = owner_entry->owner_of;
if (subsumed == _kUnmerged) {
return false;
}

queue_entries_[subsumed]->subsumed_by = _kUnmerged;
queue_entries_.at(subsumed)->subsumed_by = _kUnmerged;
owner_entry->owner_of = _kUnmerged;

if (HasPendingTasksUnlocked(owner)) {
Expand All @@ -266,17 +245,10 @@ bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner) {

bool MessageLoopTaskQueues::Owns(TaskQueueId owner,
TaskQueueId subsumed) const {
std::scoped_lock owner_lock(GetMutex(owner));
std::lock_guard guard(queue_mutex_);
return subsumed == queue_entries_.at(owner)->owner_of || owner == subsumed;
}

std::mutex& MessageLoopTaskQueues::GetMutex(TaskQueueId queue_id) const {
fml::SharedLock queue_reader(*queue_meta_mutex_);
FML_DCHECK(queue_locks_.count(queue_id) && queue_entries_.count(queue_id))
<< "Trying to acquire a lock on an invalid queue_id: " << queue_id;
return *queue_locks_.at(queue_id);
}

// Subsumed queues will never have pending tasks.
// Owning queues will consider both their and their subsumed tasks.
bool MessageLoopTaskQueues::HasPendingTasksUnlocked(
Expand Down
7 changes: 1 addition & 6 deletions fml/message_loop_task_queues.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,16 +127,12 @@ class MessageLoopTaskQueues
private:
class MergedQueuesRunner;

using Mutexes = std::vector<std::unique_ptr<std::mutex>>;

MessageLoopTaskQueues();

~MessageLoopTaskQueues();

void WakeUpUnlocked(TaskQueueId queue_id, fml::TimePoint time) const;

std::mutex& GetMutex(TaskQueueId queue_id) const;

bool HasPendingTasksUnlocked(TaskQueueId queue_id) const;

const DelayedTask& PeekNextTaskUnlocked(TaskQueueId queue_id,
Expand All @@ -147,9 +143,8 @@ class MessageLoopTaskQueues
static std::mutex creation_mutex_;
static fml::RefPtr<MessageLoopTaskQueues> instance_;

std::unique_ptr<fml::SharedMutex> queue_meta_mutex_;
mutable std::mutex queue_mutex_;
std::map<TaskQueueId, std::unique_ptr<TaskQueueEntry>> queue_entries_;
std::map<TaskQueueId, std::unique_ptr<std::mutex>> queue_locks_;

size_t task_queue_id_counter_;

Expand Down

0 comments on commit 5c70356

Please sign in to comment.