Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ResourceLoader: Add thread-aware resource changed mechanism #96593

Merged
merged 2 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions core/io/resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@
#include <stdio.h>

void Resource::emit_changed() {
if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) {
// Let the connection happen on the call queue, later, since signals are not thread-safe.
call_deferred("emit_signal", CoreStringName(changed));
} else {
emit_signal(CoreStringName(changed));
if (ResourceLoader::is_within_load() && !Thread::is_main_thread()) {
ResourceLoader::resource_changed_emit(this);
return;
}

emit_signal(CoreStringName(changed));
}

void Resource::_resource_path_changed() {
Expand Down Expand Up @@ -166,22 +166,22 @@ bool Resource::editor_can_reload_from_file() {
}

void Resource::connect_changed(const Callable &p_callable, uint32_t p_flags) {
if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) {
// Let the check and connection happen on the call queue, later, since signals are not thread-safe.
callable_mp(this, &Resource::connect_changed).call_deferred(p_callable, p_flags);
if (ResourceLoader::is_within_load() && !Thread::is_main_thread()) {
ResourceLoader::resource_changed_connect(this, p_callable, p_flags);
return;
}

if (!is_connected(CoreStringName(changed), p_callable) || p_flags & CONNECT_REFERENCE_COUNTED) {
connect(CoreStringName(changed), p_callable, p_flags);
}
}

void Resource::disconnect_changed(const Callable &p_callable) {
if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) {
// Let the check and disconnection happen on the call queue, later, since signals are not thread-safe.
callable_mp(this, &Resource::disconnect_changed).call_deferred(p_callable);
if (ResourceLoader::is_within_load() && !Thread::is_main_thread()) {
ResourceLoader::resource_changed_disconnect(this, p_callable);
return;
}

if (is_connected(CoreStringName(changed), p_callable)) {
disconnect(CoreStringName(changed), p_callable);
}
Expand Down
150 changes: 113 additions & 37 deletions core/io/resource_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "resource_loader.h"

#include "core/config/project_settings.h"
#include "core/core_bind.h"
#include "core/io/file_access.h"
#include "core/io/resource_importer.h"
#include "core/object/script_language.h"
Expand Down Expand Up @@ -234,17 +235,22 @@ void ResourceLoader::LoadToken::clear() {
// User-facing tokens shouldn't be deleted until completely claimed.
DEV_ASSERT(user_rc == 0 && user_path.is_empty());

if (!local_path.is_empty()) { // Empty is used for the special case where the load task is not registered.
DEV_ASSERT(thread_load_tasks.has(local_path));
ThreadLoadTask &load_task = thread_load_tasks[local_path];
if (load_task.task_id && !load_task.awaited) {
task_to_await = load_task.task_id;
if (!local_path.is_empty()) {
if (task_if_unregistered) {
memdelete(task_if_unregistered);
task_if_unregistered = nullptr;
} else {
DEV_ASSERT(thread_load_tasks.has(local_path));
ThreadLoadTask &load_task = thread_load_tasks[local_path];
if (load_task.task_id && !load_task.awaited) {
task_to_await = load_task.task_id;
}
// Removing a task which is still in progress would be catastrophic.
// Tokens must be alive until the task thread function is done.
DEV_ASSERT(load_task.status == THREAD_LOAD_FAILED || load_task.status == THREAD_LOAD_LOADED);
thread_load_tasks.erase(local_path);
}
// Removing a task which is still in progress would be catastrophic.
// Tokens must be alive until the task thread function is done.
DEV_ASSERT(load_task.status == THREAD_LOAD_FAILED || load_task.status == THREAD_LOAD_LOADED);
thread_load_tasks.erase(local_path);
local_path.clear();
local_path.clear(); // Mark as already cleared.
}
}

Expand Down Expand Up @@ -324,6 +330,9 @@ void ResourceLoader::_run_load_task(void *p_userdata) {
}
}

ThreadLoadTask *curr_load_task_backup = curr_load_task;
curr_load_task = &load_task;

// Thread-safe either if it's the current thread or a brand new one.
CallQueue *own_mq_override = nullptr;
if (load_nesting == 0) {
Expand Down Expand Up @@ -451,6 +460,8 @@ void ResourceLoader::_run_load_task(void *p_userdata) {
}
DEV_ASSERT(load_paths_stack.is_empty());
}

curr_load_task = curr_load_task_backup;
}

static String _validate_local_path(const String &p_path) {
Expand Down Expand Up @@ -521,9 +532,7 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,

Ref<LoadToken> load_token;
bool must_not_register = false;
ThreadLoadTask unregistered_load_task; // Once set, must be valid up to the call to do the load.
ThreadLoadTask *load_task_ptr = nullptr;
bool run_on_current_thread = false;
{
MutexLock thread_load_lock(thread_load_mutex);

Expand Down Expand Up @@ -578,22 +587,19 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,
}
}

// If we want to ignore cache, but there's another task loading it, we can't add this one to the map and we also have to finish within scope.
// If we want to ignore cache, but there's another task loading it, we can't add this one to the map.
must_not_register = ignoring_cache && thread_load_tasks.has(local_path);
if (must_not_register) {
load_token->local_path.clear();
unregistered_load_task = load_task;
load_task_ptr = &unregistered_load_task;
load_token->task_if_unregistered = memnew(ThreadLoadTask(load_task));
load_task_ptr = load_token->task_if_unregistered;
} else {
DEV_ASSERT(!thread_load_tasks.has(local_path));
HashMap<String, ResourceLoader::ThreadLoadTask>::Iterator E = thread_load_tasks.insert(local_path, load_task);
load_task_ptr = &E->value;
}
}

run_on_current_thread = must_not_register || p_thread_mode == LOAD_THREAD_FROM_CURRENT;

if (run_on_current_thread) {
if (p_thread_mode == LOAD_THREAD_FROM_CURRENT) {
// The current thread may happen to be a thread from the pool.
WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->get_caller_task_id();
if (tid != WorkerThreadPool::INVALID_TASK_ID) {
Expand All @@ -606,11 +612,8 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,
}
} // MutexLock(thread_load_mutex).

if (run_on_current_thread) {
if (p_thread_mode == LOAD_THREAD_FROM_CURRENT) {
_run_load_task(load_task_ptr);
if (must_not_register) {
load_token->res_if_unregistered = load_task_ptr->resource;
}
}

return load_token;
Expand Down Expand Up @@ -738,7 +741,10 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro
*r_error = OK;
}

if (!p_load_token.local_path.is_empty()) {
ThreadLoadTask *load_task_ptr = nullptr;
if (p_load_token.task_if_unregistered) {
load_task_ptr = p_load_token.task_if_unregistered;
} else {
if (!thread_load_tasks.has(p_load_token.local_path)) {
if (r_error) {
*r_error = ERR_BUG;
Expand Down Expand Up @@ -809,22 +815,47 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro
load_task.error = FAILED;
}

Ref<Resource> resource = load_task.resource;
if (r_error) {
*r_error = load_task.error;
}
return resource;
} else {
// Special case of an unregistered task.
// The resource should have been loaded by now.
Ref<Resource> resource = p_load_token.res_if_unregistered;
if (!resource.is_valid()) {
if (r_error) {
*r_error = FAILED;
load_task_ptr = &load_task;
}

Ref<Resource> resource = load_task_ptr->resource;
if (r_error) {
*r_error = load_task_ptr->error;
}

if (resource.is_valid()) {
if (curr_load_task) {
// A task awaiting another => Let the awaiter accumulate the resource changed connections.
DEV_ASSERT(curr_load_task != load_task_ptr);
for (const ThreadLoadTask::ResourceChangedConnection &rcc : load_task_ptr->resource_changed_connections) {
curr_load_task->resource_changed_connections.push_back(rcc);
}
} else {
// A leaf task being awaited => Propagate the resource changed connections.
if (Thread::is_main_thread()) {
// On the main thread it's safe to migrate the connections to the standard signal mechanism.
for (const ThreadLoadTask::ResourceChangedConnection &rcc : load_task_ptr->resource_changed_connections) {
if (rcc.callable.is_valid()) {
rcc.source->connect_changed(rcc.callable, rcc.flags);
}
}
} else {
// On non-main threads, we have to queue and call it done when processed.
if (!load_task_ptr->resource_changed_connections.is_empty()) {
for (const ThreadLoadTask::ResourceChangedConnection &rcc : load_task_ptr->resource_changed_connections) {
if (rcc.callable.is_valid()) {
MessageQueue::get_main_singleton()->push_callable(callable_mp(rcc.source, &Resource::connect_changed).bind(rcc.callable, rcc.flags));
}
}
core_bind::Semaphore done;
MessageQueue::get_main_singleton()->push_callable(callable_mp(&done, &core_bind::Semaphore::post));
done.wait();
RandomShaper marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
return resource;
}

return resource;
}

bool ResourceLoader::_ensure_load_progress() {
Expand All @@ -838,6 +869,50 @@ bool ResourceLoader::_ensure_load_progress() {
return true;
}

void ResourceLoader::resource_changed_connect(Resource *p_source, const Callable &p_callable, uint32_t p_flags) {
print_lt(vformat("%d\t%ud:%s\t" FUNCTION_STR "\t%d", Thread::get_caller_id(), p_source->get_instance_id(), p_source->get_class(), p_callable.get_object_id()));

MutexLock lock(thread_load_mutex);

for (const ThreadLoadTask::ResourceChangedConnection &rcc : curr_load_task->resource_changed_connections) {
if (unlikely(rcc.source == p_source && rcc.callable == p_callable)) {
return;
}
}

ThreadLoadTask::ResourceChangedConnection rcc;
rcc.source = p_source;
rcc.callable = p_callable;
rcc.flags = p_flags;
curr_load_task->resource_changed_connections.push_back(rcc);
}

void ResourceLoader::resource_changed_disconnect(Resource *p_source, const Callable &p_callable) {
print_lt(vformat("%d\t%ud:%s\t" FUNCTION_STR "t%d", Thread::get_caller_id(), p_source->get_instance_id(), p_source->get_class(), p_callable.get_object_id()));

MutexLock lock(thread_load_mutex);

for (uint32_t i = 0; i < curr_load_task->resource_changed_connections.size(); ++i) {
const ThreadLoadTask::ResourceChangedConnection &rcc = curr_load_task->resource_changed_connections[i];
if (unlikely(rcc.source == p_source && rcc.callable == p_callable)) {
curr_load_task->resource_changed_connections.remove_at_unordered(i);
return;
}
}
}

void ResourceLoader::resource_changed_emit(Resource *p_source) {
print_lt(vformat("%d\t%ud:%s\t" FUNCTION_STR, Thread::get_caller_id(), p_source->get_instance_id(), p_source->get_class()));

MutexLock lock(thread_load_mutex);

for (const ThreadLoadTask::ResourceChangedConnection &rcc : curr_load_task->resource_changed_connections) {
if (unlikely(rcc.source == p_source)) {
rcc.callable.call();
}
}
}

Ref<Resource> ResourceLoader::ensure_resource_ref_override_for_outer_load(const String &p_path, const String &p_res_type) {
ERR_FAIL_COND_V(load_nesting == 0, Ref<Resource>()); // It makes no sense to use this from nesting level 0.
const String &local_path = _validate_local_path(p_path);
Expand Down Expand Up @@ -1368,6 +1443,7 @@ bool ResourceLoader::timestamp_on_load = false;
thread_local int ResourceLoader::load_nesting = 0;
thread_local Vector<String> ResourceLoader::load_paths_stack;
thread_local HashMap<int, HashMap<String, Ref<Resource>>> ResourceLoader::res_ref_overrides;
thread_local ResourceLoader::ThreadLoadTask *ResourceLoader::curr_load_task = nullptr;

SafeBinaryMutex<ResourceLoader::BINARY_MUTEX_TAG> &_get_res_loader_mutex() {
return ResourceLoader::thread_load_mutex;
Expand Down
16 changes: 15 additions & 1 deletion core/io/resource_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ class ResourceLoader {
MAX_LOADERS = 64
};

struct ThreadLoadTask;

public:
enum ThreadLoadStatus {
THREAD_LOAD_INVALID_RESOURCE,
Expand All @@ -124,7 +126,7 @@ class ResourceLoader {
String local_path;
String user_path;
uint32_t user_rc = 0; // Having user RC implies regular RC incremented in one, until the user RC reaches zero.
Ref<Resource> res_if_unregistered;
ThreadLoadTask *task_if_unregistered = nullptr;

void clear();

Expand Down Expand Up @@ -187,13 +189,21 @@ class ResourceLoader {
Ref<Resource> resource;
bool use_sub_threads = false;
HashSet<String> sub_tasks;

struct ResourceChangedConnection {
Resource *source = nullptr;
Callable callable;
uint32_t flags = 0;
};
LocalVector<ResourceChangedConnection> resource_changed_connections;
};

static void _run_load_task(void *p_userdata);

static thread_local int load_nesting;
static thread_local HashMap<int, HashMap<String, Ref<Resource>>> res_ref_overrides; // Outermost key is nesting level.
static thread_local Vector<String> load_paths_stack;
static thread_local ThreadLoadTask *curr_load_task;

static SafeBinaryMutex<BINARY_MUTEX_TAG> thread_load_mutex;
friend SafeBinaryMutex<BINARY_MUTEX_TAG> &_get_res_loader_mutex();
Expand All @@ -214,6 +224,10 @@ class ResourceLoader {

static bool is_within_load() { return load_nesting > 0; };

static void resource_changed_connect(Resource *p_source, const Callable &p_callable, uint32_t p_flags);
static void resource_changed_disconnect(Resource *p_source, const Callable &p_callable);
static void resource_changed_emit(Resource *p_source);

static Ref<Resource> load(const String &p_path, const String &p_type_hint = "", ResourceFormatLoader::CacheMode p_cache_mode = ResourceFormatLoader::CACHE_MODE_REUSE, Error *r_error = nullptr);
static bool exists(const String &p_path, const String &p_type_hint = "");

Expand Down
Loading