Simplify task queues locking mechanism #16477

Merged 1 commit on Feb 7, 2020
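This change collapses MessageLoopTaskQueues' two-level locking, where a per-queue std::mutex map (queue_locks_) was itself guarded by a fml::SharedMutex meta-lock (queue_meta_mutex_) and looked up through GetMutex(), into a single coarse queue_mutex_ that each public method holds for its full duration. The same simplification lets the three flaky GpuThreadMerger tests below be re-enabled. As a minimal sketch of the trade-off, assuming a toy registry whose names (Entry, FineGrainedRegistry, CoarseRegistry) are illustrative rather than the engine's API:

#include <map>
#include <memory>
#include <mutex>

// Toy stand-in for TaskQueueEntry; illustrative only.
struct Entry {
  int pending = 0;
};

// Before: one mutex per entry, plus a meta-lock guarding the maps themselves.
class FineGrainedRegistry {
 public:
  int Create() {
    std::lock_guard<std::mutex> meta(meta_mutex_);
    int id = next_id_++;
    entries_[id] = std::make_unique<Entry>();
    locks_[id] = std::make_unique<std::mutex>();  // per-entry lock
    return id;
  }

  void Add(int id) {
    std::mutex* lock = nullptr;
    {
      std::lock_guard<std::mutex> meta(meta_mutex_);  // look up the entry's lock
      lock = locks_.at(id).get();
    }
    // Hazard: between releasing meta_mutex_ and acquiring *lock, another
    // thread could dispose the entry -- the kind of subtlety the PR removes.
    std::lock_guard<std::mutex> guard(*lock);
    entries_.at(id)->pending++;
  }

 private:
  std::mutex meta_mutex_;
  std::map<int, std::unique_ptr<Entry>> entries_;
  std::map<int, std::unique_ptr<std::mutex>> locks_;
  int next_id_ = 0;
};

// After: a single coarse mutex guards all state; every operation, including
// cross-entry ones such as merge/dispose, takes exactly one lock.
class CoarseRegistry {
 public:
  int Create() {
    std::lock_guard<std::mutex> guard(mutex_);
    int id = next_id_++;
    entries_[id] = std::make_unique<Entry>();
    return id;
  }

  void Add(int id) {
    std::lock_guard<std::mutex> guard(mutex_);
    entries_.at(id)->pending++;
  }

 private:
  std::mutex mutex_;
  std::map<int, std::unique_ptr<Entry>> entries_;
  int next_id_ = 0;
};

int main() {
  CoarseRegistry registry;
  registry.Add(registry.Create());
  return 0;
}

The coarse lock gives up some read parallelism, but each operation is a short map lookup, and the correctness reasoning becomes trivial.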
9 changes: 3 additions & 6 deletions fml/gpu_thread_merger_unittests.cc
@@ -14,8 +14,7 @@
#include "flutter/fml/task_runner.h"
#include "gtest/gtest.h"

-// TODO(49007): Flaky. Investigate, fix and re-enable.
-TEST(GpuThreadMerger, DISABLED_RemainMergedTillLeaseExpires) {
+TEST(GpuThreadMerger, RemainMergedTillLeaseExpires) {
fml::MessageLoop* loop1 = nullptr;
fml::AutoResetWaitableEvent latch1;
fml::AutoResetWaitableEvent term1;
@@ -62,8 +61,7 @@ TEST(GpuThreadMerger, DISABLED_RemainMergedTillLeaseExpires) {
thread2.join();
}

-// TODO(49007): Flaky. Investigate, fix and re-enable.
-TEST(GpuThreadMerger, DISABLED_IsNotOnRasterizingThread) {
+TEST(GpuThreadMerger, IsNotOnRasterizingThread) {
fml::MessageLoop* loop1 = nullptr;
fml::AutoResetWaitableEvent latch1;
std::thread thread1([&loop1, &latch1]() {
@@ -148,8 +146,7 @@ TEST(GpuThreadMerger, DISABLED_IsNotOnRasterizingThread) {
thread2.join();
}

-// TODO(49007): Flaky. Investigate, fix and re-enable.
-TEST(GpuThreadMerger, DISABLED_LeaseExtension) {
+TEST(GpuThreadMerger, LeaseExtension) {
fml::MessageLoop* loop1 = nullptr;
fml::AutoResetWaitableEvent latch1;
fml::AutoResetWaitableEvent term1;
76 changes: 24 additions & 52 deletions fml/message_loop_task_queues.cc
@@ -8,6 +8,8 @@
#include "flutter/fml/make_copyable.h"
#include "flutter/fml/message_loop_impl.h"

+#include <iostream>
+
namespace fml {

std::mutex MessageLoopTaskQueues::creation_mutex_;
@@ -32,55 +34,46 @@ fml::RefPtr<MessageLoopTaskQueues> MessageLoopTaskQueues::GetInstance() {
}

TaskQueueId MessageLoopTaskQueues::CreateTaskQueue() {
-fml::UniqueLock lock(*queue_meta_mutex_);
+std::lock_guard guard(queue_mutex_);
TaskQueueId loop_id = TaskQueueId(task_queue_id_counter_);
++task_queue_id_counter_;

queue_entries_[loop_id] = std::make_unique<TaskQueueEntry>();
-queue_locks_[loop_id] = std::make_unique<std::mutex>();

return loop_id;
}

MessageLoopTaskQueues::MessageLoopTaskQueues()
-: queue_meta_mutex_(fml::SharedMutex::Create()),
-task_queue_id_counter_(0),
-order_(0) {}
+: task_queue_id_counter_(0), order_(0) {}

MessageLoopTaskQueues::~MessageLoopTaskQueues() = default;

void MessageLoopTaskQueues::Dispose(TaskQueueId queue_id) {
-std::scoped_lock queue_lock(GetMutex(queue_id));
-
+std::lock_guard guard(queue_mutex_);
const auto& queue_entry = queue_entries_.at(queue_id);
FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
TaskQueueId subsumed = queue_entry->owner_of;
queue_entries_.erase(queue_id);
if (subsumed != _kUnmerged) {
-std::scoped_lock subsumed_lock(*queue_locks_.at(subsumed));
queue_entries_.erase(subsumed);
}
}

void MessageLoopTaskQueues::DisposeTasks(TaskQueueId queue_id) {
-std::scoped_lock queue_lock(GetMutex(queue_id));
+std::lock_guard guard(queue_mutex_);
const auto& queue_entry = queue_entries_.at(queue_id);
FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
TaskQueueId subsumed = queue_entry->owner_of;
queue_entry->delayed_tasks = {};
if (subsumed != _kUnmerged) {
-std::scoped_lock subsumed_lock(*queue_locks_.at(subsumed));
queue_entries_.at(subsumed)->delayed_tasks = {};
}
}

void MessageLoopTaskQueues::RegisterTask(TaskQueueId queue_id,
const fml::closure& task,
fml::TimePoint target_time) {
-std::scoped_lock queue_lock(GetMutex(queue_id));
-
+std::lock_guard guard(queue_mutex_);
size_t order = order_++;
-const auto& queue_entry = queue_entries_[queue_id];
+const auto& queue_entry = queue_entries_.at(queue_id);
queue_entry->delayed_tasks.push({order, task, target_time});
TaskQueueId loop_to_wake = queue_id;
if (queue_entry->subsumed_by != _kUnmerged) {
@@ -91,17 +84,15 @@ void MessageLoopTaskQueues::RegisterTask(TaskQueueId queue_id,
}

bool MessageLoopTaskQueues::HasPendingTasks(TaskQueueId queue_id) const {
-std::scoped_lock queue_lock(GetMutex(queue_id));
-
+std::lock_guard guard(queue_mutex_);
return HasPendingTasksUnlocked(queue_id);
}

void MessageLoopTaskQueues::GetTasksToRunNow(
TaskQueueId queue_id,
FlushType type,
std::vector<fml::closure>& invocations) {
-std::scoped_lock queue_lock(GetMutex(queue_id));
-
+std::lock_guard guard(queue_mutex_);
if (!HasPendingTasksUnlocked(queue_id)) {
return;
}
@@ -115,7 +106,7 @@ void MessageLoopTaskQueues::GetTasksToRunNow(
break;
}
invocations.emplace_back(std::move(top.GetTask()));
-queue_entries_[top_queue]->delayed_tasks.pop();
+queue_entries_.at(top_queue)->delayed_tasks.pop();
if (type == FlushType::kSingle) {
break;
}
@@ -136,8 +127,7 @@ void MessageLoopTaskQueues::WakeUpUnlocked(TaskQueueId queue_id,
}

size_t MessageLoopTaskQueues::GetNumPendingTasks(TaskQueueId queue_id) const {
-std::scoped_lock queue_lock(GetMutex(queue_id));
-
+std::lock_guard guard(queue_mutex_);
const auto& queue_entry = queue_entries_.at(queue_id);
if (queue_entry->subsumed_by != _kUnmerged) {
return 0;
@@ -148,7 +138,6 @@ size_t MessageLoopTaskQueues::GetNumPendingTasks(TaskQueueId queue_id) const {

TaskQueueId subsumed = queue_entry->owner_of;
if (subsumed != _kUnmerged) {
-std::scoped_lock subsumed_lock(*queue_locks_.at(subsumed));
const auto& subsumed_entry = queue_entries_.at(subsumed);
total_tasks += subsumed_entry->delayed_tasks.size();
}
@@ -158,22 +147,20 @@ size_t MessageLoopTaskQueues::GetNumPendingTasks(TaskQueueId queue_id) const {
void MessageLoopTaskQueues::AddTaskObserver(TaskQueueId queue_id,
intptr_t key,
const fml::closure& callback) {
-std::scoped_lock queue_lock(GetMutex(queue_id));
-
+std::lock_guard guard(queue_mutex_);
FML_DCHECK(callback != nullptr) << "Observer callback must be non-null.";
-queue_entries_[queue_id]->task_observers[key] = std::move(callback);
+queue_entries_.at(queue_id)->task_observers[key] = std::move(callback);
}

void MessageLoopTaskQueues::RemoveTaskObserver(TaskQueueId queue_id,
intptr_t key) {
-std::scoped_lock queue_lock(GetMutex(queue_id));
-
-queue_entries_[queue_id]->task_observers.erase(key);
+std::lock_guard guard(queue_mutex_);
+queue_entries_.at(queue_id)->task_observers.erase(key);
}

std::vector<fml::closure> MessageLoopTaskQueues::GetObserversToNotify(
TaskQueueId queue_id) const {
-std::scoped_lock queue_lock(GetMutex(queue_id));
+std::lock_guard guard(queue_mutex_);
std::vector<fml::closure> observers;

if (queue_entries_.at(queue_id)->subsumed_by != _kUnmerged) {
@@ -186,7 +173,6 @@ std::vector<fml::closure> MessageLoopTaskQueues::GetObserversToNotify(

TaskQueueId subsumed = queue_entries_.at(queue_id)->owner_of;
if (subsumed != _kUnmerged) {
-std::scoped_lock subsumed_lock(*queue_locks_.at(subsumed));
for (const auto& observer : queue_entries_.at(subsumed)->task_observers) {
observers.push_back(observer.second);
}
@@ -197,9 +183,8 @@ std::vector<fml::closure> MessageLoopTaskQueues::GetObserversToNotify(

void MessageLoopTaskQueues::SetWakeable(TaskQueueId queue_id,
fml::Wakeable* wakeable) {
-std::scoped_lock queue_lock(GetMutex(queue_id));
-
-FML_CHECK(!queue_entries_[queue_id]->wakeable)
+std::lock_guard guard(queue_mutex_);
+FML_CHECK(!queue_entries_.at(queue_id)->wakeable)
<< "Wakeable can only be set once.";
queue_entries_.at(queue_id)->wakeable = wakeable;
}
@@ -208,12 +193,7 @@ bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) {
if (owner == subsumed) {
return true;
}

-std::mutex& owner_mutex = GetMutex(owner);
-std::mutex& subsumed_mutex = GetMutex(subsumed);
-
-std::scoped_lock lock(owner_mutex, subsumed_mutex);
-
+std::lock_guard guard(queue_mutex_);
auto& owner_entry = queue_entries_.at(owner);
auto& subsumed_entry = queue_entries_.at(subsumed);

@@ -242,15 +222,14 @@ bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) {
}

bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner) {
-std::scoped_lock owner_lock(GetMutex(owner));
-
-auto& owner_entry = queue_entries_[owner];
+std::lock_guard guard(queue_mutex_);
+const auto& owner_entry = queue_entries_.at(owner);
const TaskQueueId subsumed = owner_entry->owner_of;
if (subsumed == _kUnmerged) {
return false;
}

-queue_entries_[subsumed]->subsumed_by = _kUnmerged;
+queue_entries_.at(subsumed)->subsumed_by = _kUnmerged;
owner_entry->owner_of = _kUnmerged;

if (HasPendingTasksUnlocked(owner)) {
@@ -266,17 +245,10 @@ bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner) {

bool MessageLoopTaskQueues::Owns(TaskQueueId owner,
TaskQueueId subsumed) const {
-std::scoped_lock owner_lock(GetMutex(owner));
+std::lock_guard guard(queue_mutex_);
return subsumed == queue_entries_.at(owner)->owner_of || owner == subsumed;
}

-std::mutex& MessageLoopTaskQueues::GetMutex(TaskQueueId queue_id) const {
-fml::SharedLock queue_reader(*queue_meta_mutex_);
-FML_DCHECK(queue_locks_.count(queue_id) && queue_entries_.count(queue_id))
-<< "Trying to acquire a lock on an invalid queue_id: " << queue_id;
-return *queue_locks_.at(queue_id);
-}

// Subsumed queues will never have pending tasks.
// Owning queues will consider both their and their subsumed tasks.
bool MessageLoopTaskQueues::HasPendingTasksUnlocked(
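Merge is where the old scheme was most delicate: it had to hold the owner's and the subsumed queue's per-queue mutexes at the same time, relying on std::scoped_lock's deadlock-avoiding multi-lock acquisition so that concurrent Merge(a, b) and Merge(b, a) calls could not deadlock. With a single queue_mutex_ there is no lock ordering left to reason about. A sketch of the contrast, with made-up mutex and function names:

#include <mutex>
#include <thread>

std::mutex mutex_a;  // stand-ins for two per-queue mutexes; illustrative only
std::mutex mutex_b;

// Old scheme: both locks must be held at once. std::scoped_lock uses the
// std::lock deadlock-avoidance algorithm, so these two functions are safe
// even though they name the mutexes in opposite orders.
void MergeAIntoB() {
  std::scoped_lock lock(mutex_a, mutex_b);
  // ... re-point the two entries at each other ...
}

void MergeBIntoA() {
  std::scoped_lock lock(mutex_b, mutex_a);
  // ... re-point the two entries at each other ...
}

// New scheme: one mutex guards all queue state; a plain lock_guard suffices.
std::mutex queue_mutex;

void MergeSimplified() {
  std::lock_guard<std::mutex> guard(queue_mutex);
  // ... re-point the two entries at each other ...
}

int main() {
  std::thread t1(MergeAIntoB);
  std::thread t2(MergeBIntoA);
  t1.join();
  t2.join();
  MergeSimplified();
  return 0;
}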
7 changes: 1 addition & 6 deletions fml/message_loop_task_queues.h
@@ -127,16 +127,12 @@ class MessageLoopTaskQueues
private:
class MergedQueuesRunner;

-using Mutexes = std::vector<std::unique_ptr<std::mutex>>;

MessageLoopTaskQueues();

~MessageLoopTaskQueues();

void WakeUpUnlocked(TaskQueueId queue_id, fml::TimePoint time) const;

-std::mutex& GetMutex(TaskQueueId queue_id) const;

bool HasPendingTasksUnlocked(TaskQueueId queue_id) const;

const DelayedTask& PeekNextTaskUnlocked(TaskQueueId queue_id,
@@ -147,9 +143,8 @@ class MessageLoopTaskQueues
static std::mutex creation_mutex_;
static fml::RefPtr<MessageLoopTaskQueues> instance_;

-std::unique_ptr<fml::SharedMutex> queue_meta_mutex_;
+mutable std::mutex queue_mutex_;
std::map<TaskQueueId, std::unique_ptr<TaskQueueEntry>> queue_entries_;
-std::map<TaskQueueId, std::unique_ptr<std::mutex>> queue_locks_;

size_t task_queue_id_counter_;

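A small but load-bearing detail in the header: queue_mutex_ is declared mutable because const methods such as HasPendingTasks(), GetNumPendingTasks(), GetObserversToNotify(), and Owns() still need to lock it, and locking mutates the mutex. A minimal illustration of the idiom, using a made-up Counter class:

#include <mutex>

class Counter {
 public:
  void Increment() {
    std::lock_guard<std::mutex> guard(mutex_);
    ++value_;
  }

  // Logically read-only, so it is const -- but it may still lock mutex_
  // because that member is declared mutable.
  int Get() const {
    std::lock_guard<std::mutex> guard(mutex_);
    return value_;
  }

 private:
  mutable std::mutex mutex_;  // lockable even from const members
  int value_ = 0;
};

int main() {
  Counter counter;
  counter.Increment();
  const Counter& view = counter;  // Get() still compiles through const access
  return view.Get();
}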