Skip to content

Commit

Permalink
Share side modules with worker threads via postMessage
Browse files Browse the repository at this point in the history
Any side modules that are loaded at the time of worker creation are
shared with the worker via the initial postMessage.

As a followup we should extend this to modules that are loaded after
the worker is created but before the pthread runs (for example when
a module is loaded while a worker is unused).

Fixes: #18552
  • Loading branch information
sbc100 committed May 25, 2023
1 parent 3204382 commit f5f159d
Show file tree
Hide file tree
Showing 13 changed files with 104 additions and 68 deletions.
3 changes: 3 additions & 0 deletions emcc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2076,7 +2076,10 @@ def phase_linker_setup(options, state, newargs):
assert not settings.SIDE_MODULE
if settings.MAIN_MODULE == 1:
settings.INCLUDE_FULL_LIBRARY = 1
# Called from preamble.js once the main module is instantiated.
settings.DEFAULT_LIBRARY_FUNCS_TO_INCLUDE += ['$loadDylibs']
if settings.STACK_OVERFLOW_CHECK == 2:
settings.DEFAULT_LIBRARY_FUNCS_TO_INCLUDE += ['$setDylinkStackLimits']
settings.REQUIRED_EXPORTS += ['malloc']

if settings.MAIN_MODULE == 1 or settings.SIDE_MODULE == 1:
Expand Down
51 changes: 37 additions & 14 deletions src/library_dylink.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var LibraryDylink = {
// than just running the promises in parallel, this makes a chain of
// promises to run in series.
wasmPlugin['promiseChainEnd'] = wasmPlugin['promiseChainEnd'].then(
() => loadWebAssemblyModule(byteArray, {loadAsync: true, nodelete: true})).then(
() => loadWebAssemblyModule(byteArray, {loadAsync: true, nodelete: true}, name)).then(
(exports) => {
#if DYLINK_DEBUG
dbg(`registering preloadedWasm: ${name}`);
Expand Down Expand Up @@ -618,6 +618,7 @@ var LibraryDylink = {
// promise that resolves to its exports if the loadAsync flag is set.
$loadWebAssemblyModule__docs: `
/**
* @param {string=} libName
* @param {Object=} localScope
* @param {number=} handle
*/`,
Expand All @@ -629,7 +630,10 @@ var LibraryDylink = {
'$currentModuleWeakSymbols', '$alignMemory', '$zeroMemory',
'$updateTableMap',
],
$loadWebAssemblyModule: function(binary, flags, localScope, handle) {
$loadWebAssemblyModule: function(binary, flags, libName, localScope, handle) {
#if DYLINK_DEBUG
dbg(`loadWebAssemblyModule: ${libName}`);
#endif
var metadata = getDylinkMetadata(binary);
currentModuleWeakSymbols = metadata.weakImports;
#if ASSERTIONS
Expand Down Expand Up @@ -752,10 +756,20 @@ var LibraryDylink = {
'{{{ WASI_MODULE_NAME }}}': proxy,
};

function postInstantiation(instance) {
function postInstantiation(module, instance) {
#if ASSERTIONS
// the table should be unchanged
assert(wasmTable === originalTable);
#endif
#if PTHREADS
if (!ENVIRONMENT_IS_PTHREAD && libName) {
#if DYLINK_DEBUG
dbg(`registering sharedModules: ${libName}`)
#endif
// cache all loaded modules in `sharedModules`, which gets passed
// to new workers when they are created.
sharedModules[libName] = module;
}
#endif
// add new entries to functionsInTableMap
updateTableMap(tableBase, metadata.tableSize);
Expand All @@ -768,12 +782,11 @@ var LibraryDylink = {
}
#if STACK_OVERFLOW_CHECK >= 2
if (moduleExports['__set_stack_limits']) {
#if PTHREADS
// When we are on an uninitialized pthread we delay calling
// __set_stack_limits until $setDylinkStackLimits.
if (!ENVIRONMENT_IS_PTHREAD || runtimeInitialized)
#endif
moduleExports['__set_stack_limits']({{{ to64('_emscripten_stack_get_base()') }}}, {{{ to64('_emscripten_stack_get_end()') }}});
if (runtimeInitialized) {
moduleExports['__set_stack_limits']({{{ to64('_emscripten_stack_get_base()') }}}, {{{ to64('_emscripten_stack_get_end()') }}});
}
}
#endif

Expand Down Expand Up @@ -846,16 +859,16 @@ var LibraryDylink = {
if (flags.loadAsync) {
if (binary instanceof WebAssembly.Module) {
var instance = new WebAssembly.Instance(binary, info);
return Promise.resolve(postInstantiation(instance));
return Promise.resolve(postInstantiation(binary, instance));
}
return WebAssembly.instantiate(binary, info).then(
(result) => postInstantiation(result.instance)
(result) => postInstantiation(result.module, result.instance)
);
}

var module = binary instanceof WebAssembly.Module ? binary : new WebAssembly.Module(binary);
var instance = new WebAssembly.Instance(module, info);
return postInstantiation(instance);
return postInstantiation(module, instance);
}

// now load needed libraries and the module itself.
Expand All @@ -871,7 +884,7 @@ var LibraryDylink = {
return loadModule();
},

#if STACK_OVERFLOW_CHECK >= 2 && PTHREADS
#if STACK_OVERFLOW_CHECK >= 2
// With PTHREADS we load libraries before we are running a pthread and
// therefore before we have a stack. Instead we delay calling
// `__set_stack_limits` until we start running a thread. We also need to call
Expand All @@ -884,7 +897,7 @@ var LibraryDylink = {
#endif
var lib = LDSO.loadedLibsByName[name];
if (lib.exports['__set_stack_limits']) {
lib.exports['__set_stack_limits'](stackTop, stackMax);
lib.exports['__set_stack_limits']({{{ to64("stackTop") }}}, {{{ to64("stackMax") }}});
}
}
},
Expand Down Expand Up @@ -972,6 +985,16 @@ var LibraryDylink = {

// libName -> libData
function loadLibData() {
#if PTHREADS
var sharedMod = sharedModules[libName];
#if DYLINK_DEBUG
dbg(`checking sharedModules: ${libName}: ${sharedMod ? 'found' : 'not found'}`);
#endif
if (sharedMod) {
return flags.loadAsync ? Promise.resolve(sharedMod) : sharedMod;
}
#endif

// for wasm, we can use fetch for async, but for fs mode we can only imitate it
if (handle) {
var data = {{{ makeGetValue('handle', C_STRUCTS.dso.file_data, '*') }}};
Expand Down Expand Up @@ -1011,10 +1034,10 @@ var LibraryDylink = {

// module not preloaded - load lib data and create new module from it
if (flags.loadAsync) {
return loadLibData().then((libData) => loadWebAssemblyModule(libData, flags, localScope, handle));
return loadLibData().then((libData) => loadWebAssemblyModule(libData, flags, libName, localScope, handle));
}

return loadWebAssemblyModule(loadLibData(), flags, localScope, handle);
return loadWebAssemblyModule(loadLibData(), flags, libName, localScope, handle);
}

// module for lib is loaded - update the dso & global namespace
Expand Down
13 changes: 12 additions & 1 deletion src/library_pthread.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,14 @@ var LibraryPThread = {
PThread.allocateUnusedWorker();
}
#endif
#if !MINIMAL_RUNTIME
// MINIMAL_RUNTIME takes care of calling loadWasmModuleToAllWorkers
// in postamble_minimal.js
addOnPreRun(() => {
addRunDependency('loading-workers')
PThread.loadWasmModuleToAllWorkers(() => removeRunDependency('loading-workers'));
});
#endif
#if MAIN_MODULE
PThread.outstandingPromises = {};
// Finished threads are threads that have finished running but we not yet
Expand Down Expand Up @@ -402,7 +410,9 @@ var LibraryPThread = {
'wasmOffsetConverter': wasmOffsetConverter,
#endif
#if MAIN_MODULE
'dynamicLibraries': Module['dynamicLibraries'],
// Shared all modules that have been loaded so far. New workers
// won't start running threads until these are all loaded.
'sharedModules': sharedModules,
#endif
#if ASSERTIONS
'workerID': worker.workerID,
Expand All @@ -423,6 +433,7 @@ var LibraryPThread = {
) {
return onMaybeReady();
}

let pthreadPoolReady = Promise.all(PThread.unusedWorkers.map(PThread.loadWasmModuleToWorker));
#if PTHREAD_POOL_DELAY_LOAD
// PTHREAD_POOL_DELAY_LOAD means we want to proceed synchronously without
Expand Down
30 changes: 4 additions & 26 deletions src/postamble.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,10 @@ function stackCheckInit() {
}
#endif

#if RELOCATABLE
var dylibsLoaded = false;
#if '$LDSO' in addedLibraryItems
LDSO.init();
#endif
#if MAIN_MODULE && PTHREADS
// Map of modules to be shared with new threads. This gets populated by the
// main thread and shared with all new workers.
var sharedModules = Module['sharedModules'] || [];
#endif

#if MAIN_READS_PARAMS
Expand All @@ -158,27 +157,6 @@ function run() {
stackCheckInit();
#endif

#if RELOCATABLE
if (!dylibsLoaded) {
// Loading of dynamic libraries needs to happen on each thread, so we can't
// use the normal __ATPRERUN__ mechanism.
#if MAIN_MODULE
loadDylibs();
#else
reportUndefinedSymbols();
#endif
dylibsLoaded = true;

// Loading dylibs can add run dependencies.
if (runDependencies > 0) {
#if RUNTIME_DEBUG
dbg('loadDylibs added run() dependencies, not running yet');
#endif
return;
}
}
#endif

#if WASM_WORKERS
if (ENVIRONMENT_IS_WASM_WORKER) {
#if MODULARIZE
Expand Down
18 changes: 11 additions & 7 deletions src/preamble.js
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,14 @@ function initRuntime() {

#if STACK_OVERFLOW_CHECK >= 2
#if RUNTIME_DEBUG
dbg('__set_stack_limits: ' + _emscripten_stack_get_base() + ', ' + _emscripten_stack_get_end());
dbg(`__set_stack_limits: ${ptrToString(_emscripten_stack_get_base())}, ${ptrToString(_emscripten_stack_get_end())}`);
#endif
#if MAIN_MODULE
setDylinkStackLimits(_emscripten_stack_get_base(), _emscripten_stack_get_end());
#else
___set_stack_limits(_emscripten_stack_get_base(), _emscripten_stack_get_end());
#endif
#endif
#if RELOCATABLE
callRuntimeCallbacks(__RELOC_FUNCS__);
#endif
Expand Down Expand Up @@ -980,6 +984,12 @@ function createWasm() {
}
#endif
mergeLibSymbols(exports, 'main')
#if '$LDSO' in addedLibraryItems
LDSO.init();
#endif
loadDylibs();
#elif RELOCATABLE
reportUndefinedSymbols();
#endif

#if MEMORY64
Expand Down Expand Up @@ -1047,13 +1057,7 @@ function createWasm() {
// We now have the Wasm module loaded up, keep a reference to the compiled module so we can post it to the workers.
wasmModule = module;
#endif

#if PTHREADS
PThread.loadWasmModuleToAllWorkers(() => removeRunDependency('wasm-instantiate'));
#else // singlethreaded build:
removeRunDependency('wasm-instantiate');
#endif // ~PTHREADS

return exports;
}
// wait for the pthread pool (if any)
Expand Down
5 changes: 4 additions & 1 deletion src/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,10 @@ function handleMessage(e) {
#endif // MINIMAL_RUNTIME

#if MAIN_MODULE
Module['dynamicLibraries'] = e.data.dynamicLibraries;
Module['sharedModules'] = e.data.sharedModules;
#if RUNTIME_DEBUG
dbg(`received ${Object.keys(e.data.sharedModules).length} shared modules: ${Object.keys(e.data.sharedModules)}`);
#endif
#endif

// Use `const` here to ensure that the variable is scoped only to
Expand Down
2 changes: 1 addition & 1 deletion test/other/metadce/test_metadce_hello_dylink.jssize
Original file line number Diff line number Diff line change
@@ -1 +1 @@
15060
15032
2 changes: 1 addition & 1 deletion test/other/metadce/test_metadce_minimal_pthreads.jssize
Original file line number Diff line number Diff line change
@@ -1 +1 @@
15415
15456
2 changes: 1 addition & 1 deletion test/other/test_unoptimized_code_size.js.size
Original file line number Diff line number Diff line change
@@ -1 +1 @@
59630
59629
2 changes: 1 addition & 1 deletion test/other/test_unoptimized_code_size_no_asserts.js.size
Original file line number Diff line number Diff line change
@@ -1 +1 @@
33256
33255
2 changes: 1 addition & 1 deletion test/other/test_unoptimized_code_size_strict.js.size
Original file line number Diff line number Diff line change
@@ -1 +1 @@
58572
58571
32 changes: 19 additions & 13 deletions test/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -9474,34 +9474,40 @@ def test_pthread_dylink_main_module_1(self):
self.do_runf(test_file('hello_world.c'))

@needs_dylink
@node_pthreads
def test_Module_dynamicLibraries_pthreads(self):
@parameterized({
'': ([],),
'pthreads': (['-sPROXY_TO_PTHREAD', '-sEXIT_RUNTIME', '-pthread', '-Wno-experimental'],)
})
def test_Module_dynamicLibraries(self, args):
# test that Module.dynamicLibraries works with pthreads
self.emcc_args += ['-pthread', '-Wno-experimental']
self.emcc_args += args
self.emcc_args += ['--pre-js', 'pre.js']
self.set_setting('PROXY_TO_PTHREAD')
self.set_setting('EXIT_RUNTIME')
# This test is for setting dynamicLibraries at runtime so we don't
# want emscripten loading `liblib.so` automatically (which it would
# do without this setting.
self.set_setting('NO_AUTOLOAD_DYLIBS')

create_file('pre.js', '''
if (typeof importScripts == 'undefined') { // !ENVIRONMENT_IS_WORKER
// Load liblib.so by default on non-workers
Module['dynamicLibraries'] = ['liblib.so'];
} else {
// Verify whether the main thread passes Module.dynamicLibraries to the worker
assert(Module['dynamicLibraries'].includes('liblib.so'));
}
Module['dynamicLibraries'] = ['liblib.so'];
''')

if args:
self.setup_node_pthreads()
create_file('post.js', '''
if (ENVIRONMENT_IS_PTHREAD) {
err('sharedModules: ' + Object.keys(sharedModules));
assert('liblib.so' in sharedModules);
assert(sharedModules['liblib.so'] instanceof WebAssembly.Module);
}
''')
self.emcc_args += ['--post-js', 'post.js']

self.dylink_test(
r'''
#include <stdio.h>
int side();
int main() {
printf("result is %d", side());
printf("result is %d\n", side());
return 0;
}
''',
Expand Down
10 changes: 9 additions & 1 deletion test/test_other.py
Original file line number Diff line number Diff line change
Expand Up @@ -13473,12 +13473,20 @@ def test_preload_module(self, args):
struct stat statbuf;
assert(stat("/library.so", &statbuf) == 0);

// Check that it was preloaded
// Check that it was preloaded.
// The preloading actually only happens on the main thread where the filesystem
// lives. On worker threads the module object is shared via preloadedModules.
if (emscripten_is_main_runtime_thread()) {
int found = EM_ASM_INT(
return preloadedWasm['/library.so'] !== undefined;
);
assert(found);
} else {
int found = EM_ASM_INT(
err(sharedModules);
return sharedModules['/library.so'] !== undefined;
);
assert(found);
}
void *lib_handle = dlopen("/library.so", RTLD_NOW);
assert(lib_handle);
Expand Down

0 comments on commit f5f159d

Please sign in to comment.