From dccc1bbd56d2d5b55c44d99f8743b2b8a57458d3 Mon Sep 17 00:00:00 2001 From: Peter Hoddie Date: Thu, 16 Mar 2023 10:11:57 -0700 Subject: [PATCH] workers - throw on more post failures, configure queue length & timeout, decouple debugger messages #1046 --- documentation/base/worker.md | 51 +++++++++++---- modules/base/worker/modWorker.c | 43 +++++++++++-- tests/modules/base/worker/messages.js | 3 + xs/platforms/esp/xsHost.c | 92 +++++++++++++++++++-------- xs/platforms/esp/xsPlatform.h | 2 + 5 files changed, 144 insertions(+), 47 deletions(-) diff --git a/documentation/base/worker.md b/documentation/base/worker.md index c7d99850b..9d6f76057 100644 --- a/documentation/base/worker.md +++ b/documentation/base/worker.md @@ -1,8 +1,8 @@ # Worker -Copyright 2018-2020 Moddable Tech, Inc.
-Revised: December 30, 2020 +Copyright 2018-2023 Moddable Tech, Inc.
+Revised: March 16, 2023 -The Moddable runtime integrates with XS to allow a multiple virtual machines to co-exist on a single microcontroller. The majority of projects use only a single virtual machine. However, there are situations where the several independent runtime contexts provided by having several virtual machines is advantageous. This isolation is useful to fully separate a particular set of scripts, for example user installed modules, from the core project functionality for security, privacy, and reliability reasons. Another useful situation is to allow scripts to perform blocking operations in one virtual machine while scripts in another virtual machine remain fully responsive. +The Moddable runtime integrates with XS to allow a multiple virtual machines to co-exist on a single microcontroller. The majority of projects use only a single virtual machine. However, there are situations where the several independent runtime contexts provided by having several virtual machines is advantageous. This isolation is useful to fully separate a particular set of scripts, for example user installed modules, from the core project functionality for security, privacy, and reliability reasons. Another useful situation is to allow scripts to perform blocking operations in one virtual machine while scripts in another virtual machine remain fully responsive. On microcontrollers with multiple CPU cores, workers can execute in parallel to take full advantage of the available CPU power. Undertake the use of multiple virtual machines in a project with care. Each virtual machine requires additional RAM, and RAM is the most limited resource on most microcontroller deployments. In addition, the asynchronous nature of communication between virtual machines adds complexity to the overall system. Still, having multiple virtual machines is useful, even essential, in some circumstances. The remainder of this document describes how to use multiple virtual machines with the Moddable SDK together with some implementation details. @@ -12,10 +12,11 @@ The `Worker` class is an API for working with virtual machines. The implementati - The implementation is a small subset of the Web Workers API. - Workers are always launched from a module, never from a script file. - One addition has been made to specify the memory configuration of a new worker. +- Posting a message to a worker throws in additional situations. -Those familiar with Web Workers are strongly advised to read this document to understand whether the implementation differences are relevant to their use of workers. +Those familiar with Web Workers are strongly advised to read this document to understand the implementation differences that may be relevant to their use of workers. -This document contains a standalone description of the `Worker` class implemented in the Moddable SDK, without reference to the Web Worker specification. The [`worker` example](../../examples/base/worker/) is a simple example of using the `Worker` class. +This document contains a standalone description of the `Worker` class implemented in the Moddable SDK. The [`worker` example](../../examples/base/worker/) is a simple example of using the `Worker` class. ## class Worker Scripts import the `Worker` class to be able to create a new worker. @@ -27,20 +28,24 @@ To launch a worker, create an instance of the `Worker` class, passing the name o let aWorker = new Worker("simpleworker"); -The call to the `Worker` constructor returns only after execution of the specified module completes. If the worker module generates an exception during this step, an exception is propagated so that the call to `new Worker` throws an exception. This behavior means that the invoking virtual machine blocks until the new worker virtual machine has fully completely initialization. Consequently, an operations performed in a newly instantiated virtual machine should be relatively brief. +The call to the `Worker` constructor returns only after execution of the specified module completes. If the worker module generates an exception during this step, an exception is propagated so that the call to `new Worker` throws an exception. This behavior means that the invoking virtual machine blocks until the new worker virtual machine has fully completely initialization. Consequently, any operations performed in a newly instantiated virtual machine should be relatively brief. ### Launching a worker with memory configuration -The previous example launches the worker with the default memory configuration. This may not be large enough for the worker, or may allocate more RAM than needed by the worker. An optional configuration dictionary allows the script instantiating a new virtual machine to set the memory use. +The previous example launches the worker with the default memory configuration. This may not be large enough for the worker, or may allocate more RAM than needed by the worker. An optional configuration object allows the script instantiating a new virtual machine to set the memory use. let aWorker = new Worker("simpleworker", {allocation: 8192, stackCount: 64, slotCount: 64}); ### Sending a message to a worker -Messages to workers are JavaScript objects and binary data. The JavaScript objects can be considered equivalent to JSON. The binary data is an `ArrayBuffer`. All messages are passed by copy, so the size of the message should be as small as practical. +Messages to workers are JavaScript objects and binary data. The JavaScript objects can be considered equivalent to JSON. The binary data is an `ArrayBuffer`. aWorker.postMessage({hello: "world", count: 12}); aWorker.postMessage(new ArrayBuffer(12)); +Internally, workers use the XS marshalling feature which supports sending some types of data that fail in browser implementations of workers. If an object cannot be serialized, `postMessage` throws an exception. + +Messages are passed by copy, so the size of the message should be as small as practical. If the memory allocation fails, `postMessage` throws an exception. + ### Receiving a message from a worker The worker instance has an `onmessage` function which receives all messages from the worker. It is typically assigned immediately after the worker is constructed: @@ -48,7 +53,7 @@ The worker instance has an `onmessage` function which receives all messages from trace(message, "\n"); } -An alternative approach is to create a subclass of `Worker` which contains the `onmessage` function. This uses less memory and runs somewhat faster. +An alternative approach is to create a subclass of `Worker` which contains the `onmessage` function. This uses less memory and runs somewhat faster. class MyWorker extends Worker { onmessage(message) { @@ -63,7 +68,7 @@ The script that instantiates a worker may terminate the worker. aWorker.terminate(); -Once a worker is terminated, no further calls should be made to the worker instance. +Once a worker is terminated, no further calls should be made to the worker instance. Attempting to post a message to a terminated work throws an exception. ### Worker script start-up When the Worker constructor is called, the module at the path specified (`simpleworker` in the preceding examples) is loaded and run. The worker itself typically performs two tasks. The first is initialization and the second is installing a function to receive messages. The receiving function is installed on the global object `self` and is named `onmessage`. @@ -85,7 +90,8 @@ A worker script terminates itself by calling `close` on the global object `self` self.close() -### constructor(modulePath[, dictionary]) +### API Reference +#### constructor(modulePath[, dictionary]) The `Worker` constructor takes a path to the module used to initialize the new worker instance. let aWorker = new Worker("simpleworker"); @@ -99,14 +105,14 @@ The `allocation` property is the total memory shared by the new virtual machine If an error occurs or an exception is thrown during execution of the module, the `Worker` constructor also throws an error. -### terminate() +#### terminate() The `terminate` function immediately ends execution of the worker instance, freeing all resources owned by the worker. aWorker.terminate(); Once a worker has been terminated, no further calls should be made to it. -### postMessage(msg) +#### postMessage(msg) The `postMessage` function queues a message for delivery to the worker. Messages are either a JavaScript object, roughly equivalent to JSON objects, or an `ArrayBuffer`. aWorker.postMessage("hello"); @@ -115,7 +121,7 @@ The `postMessage` function queues a message for delivery to the worker. Messages Messages are passed by copy, so they should be in small in size as practical. Messages are delivered in the same order they were sent. -### onmessage property +#### onmessage property The worker `onmessage` property contains a function which receives messages from the worker. aWorker.onmessage = function(msg) { @@ -150,3 +156,20 @@ The debugger for the XS virtual machine, `xsbug`, supports working with multiple ## Shared Memory and Atomics The ECMAScript 2016 standard includes support for Shared Memory and Atomics. These are powerful tools for efficient communication between virtual machines. The XS virtual machine fully implements these features. They are supported on some microcontrollers (ESP32) but not all (ESP8266). + +## Configuration Options +The Web Workers implementation uses `modMessagePostToMachine()`, the native IPC facility of the Moddable SDK, to pass messages between threads. On ESP32 this is implemented using FreeRTOS queues. + +By default the message queue has 10 elements. A message is posted while the queue is full blocks until space become available in the queue. This behavior generally works well as the number of messages being posted is relatively infrequent. If many messages are being sent between the sender and receiver, a deadlock is possible. Two build options are available in the manifest to help if necessary. + +```json +"defines": { + "task": { + "queueLength": 20, + "queueWait": 100 + } +} +``` +The `queueLength` property changes the size of the message queues to the value specified. The `queueWait` property allows posting messages to fail after the specified timeout (given in milliseconds). If a message cannot be enqueued after this timeout period, `postMessage` throws an exception. + +By default, a debug build sets `queueWait` to 1000 milliseconds. In a well-balanced system, messages should enqueue instantaneously and certainly shouldn't block for more than a few millisecond. This default allows debugging of potential queue related issues by throwing instead of deadlocking when message sends take unexpectedly long. By default, release and instrumented builds have an infinite wait for `queueWait` and so never time out. diff --git a/modules/base/worker/modWorker.c b/modules/base/worker/modWorker.c index 8dd5ce883..295ad89e3 100644 --- a/modules/base/worker/modWorker.c +++ b/modules/base/worker/modWorker.c @@ -78,6 +78,8 @@ static void workerDeliverConnect(xsMachine *the, modWorker worker, uint8_t *mess static int workerStart(modWorker worker); static void workerTerminate(xsMachine *the, modWorker worker, uint8_t *message, uint16_t messageLength); +static void doModMessagePostToMachine(xsMachine *the, xsMachine *targetThe, uint8_t *message, modMessageDeliver callback, void *refcon); + static modWorker gWorkers; static void xs_emptyworker_destructor(void *data) { @@ -182,7 +184,7 @@ static void workerConstructor(xsMachine *the, xsBooleanValue shared) #ifdef INC_FREERTOS_H worker->task = target->task; #endif - modMessagePostToMachine(worker->the, NULL, 0, (modMessageDeliver)workerDeliverConnect, worker); + doModMessagePostToMachine(the, worker->the, NULL, (modMessageDeliver)workerDeliverConnect, worker); goto done; } } @@ -287,8 +289,7 @@ void xs_worker_postfrominstantiator(xsMachine *the) xsUnknownError("worker closing"); message = xsMarshall(xsArg(0)); - if (modMessagePostToMachine(worker->the, (uint8_t *)&message, sizeof(message), (modMessageDeliver)workerDeliverMarshall, worker)) - xsUnknownError("post from instantiator failed"); + doModMessagePostToMachine(the, worker->the, message, (modMessageDeliver)workerDeliverMarshall, worker); } void xs_worker_postfromworker(xsMachine *the) @@ -300,20 +301,21 @@ void xs_worker_postfromworker(xsMachine *the) xsUnknownError("worker closing"); message = xsMarshall(xsArg(0)); - if (modMessagePostToMachine(worker->parent, (uint8_t *)&message, sizeof(message), (modMessageDeliver)workerDeliverMarshall, worker)) - xsUnknownError("post from worker failed"); + doModMessagePostToMachine(the, worker->parent, message, (modMessageDeliver)workerDeliverMarshall, worker); } void xs_worker_close(xsMachine *the) { modWorker worker = xsmcGetHostDataValidate(xsThis, (void *)xs_emptyworker_destructor); - modMessagePostToMachine(worker->parent, NULL, 0, (modMessageDeliver)workerTerminate, worker); + doModMessagePostToMachine(the, worker->parent, NULL, (modMessageDeliver)workerTerminate, worker); } void workerDeliverMarshall(xsMachine *the, modWorker worker, uint8_t *message, uint16_t messageLength) { - if (worker->closing) + if (worker->closing) { + c_free(*(char **)message); return; + } xsBeginHost(the); @@ -419,6 +421,33 @@ void workerTerminate(xsMachine *the, modWorker worker, uint8_t *message, uint16_ xs_worker_destructor(worker); } +void doModMessagePostToMachine(xsMachine *the, xsMachine *targetThe, uint8_t *message, modMessageDeliver callback, void *refcon) +{ + char *error; + int result; + + if (message) { + result = modMessagePostToMachine(targetThe, (uint8_t *)&message, sizeof(message), callback, refcon); + if (!result) + return; + c_free(message); + } + else { + result = modMessagePostToMachine(targetThe, NULL, 0, callback, refcon); + if (!result) + return; + } + + if (-1 == result) + error = "no memory"; + else if (-2 == result) + error = "timeout"; + else + error = "unknown"; + + xsUnknownError(error); +} + #ifdef INC_FREERTOS_H void workerLoop(void *pvParameter) diff --git a/tests/modules/base/worker/messages.js b/tests/modules/base/worker/messages.js index 48e01a656..b4922be0c 100644 --- a/tests/modules/base/worker/messages.js +++ b/tests/modules/base/worker/messages.js @@ -28,6 +28,7 @@ const worker = new Worker("testworker", minimumOptions); assert.throws(SyntaxError, () => worker.postMessage(), "postMessage requires 1 argument"); assert.throws(SyntaxError, () => worker.postMessage.call(new $TESTMC.HostObject, 0, 64), "postMessage with non-worker this"); +assert.throws(TypeError, () => worker.postMessage({host: new $TESTMC.HostObject}), "postMessage rejects host objects"); let index = 0; worker.postMessage(messages[index]); @@ -52,10 +53,12 @@ worker.onmessage = function(reply) { } else if (7 === index) { assert(actual instanceof ArrayBuffer, "expected ArrayBuffer instance"); + assert.sameValue(actual.byteLength, 1, "expected ArrayBuffer.byteLength 1"); assert.sameValue((new Uint8Array(actual))[0], 1, "expected buffer[0] to be 1"); } else if (8 === index) { assert(actual instanceof Uint32Array, "expected Uint32Array instance"); + assert.sameValue(actual.length, 1, "expected Uint32Array.length 1"); assert.sameValue(actual[0], 1, "expected buffer[0] to be 1"); } else diff --git a/xs/platforms/esp/xsHost.c b/xs/platforms/esp/xsHost.c index bab574833..775550983 100644 --- a/xs/platforms/esp/xsHost.c +++ b/xs/platforms/esp/xsHost.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2022 Moddable Tech, Inc. + * Copyright (c) 2016-2023 Moddable Tech, Inc. * * This file is part of the Moddable SDK Runtime. * @@ -1058,6 +1058,14 @@ txU8 __atomic_fetch_xor_8(txU8 *ptr, txU8 val, int memorder) #if ESP32 +#ifndef MODDEF_TASK_QUEUEWAIT + #ifdef mxDebug + #define MODDEF_TASK_QUEUEWAIT (1000) + #else + #define MODDEF_TASK_QUEUEWAIT (portMAX_DELAY) + #endif +#endif + typedef struct modMessageRecord modMessageRecord; typedef modMessageRecord *modMessage; @@ -1078,7 +1086,7 @@ int modMessagePostToMachine(xsMachine *the, uint8_t *message, uint16_t messageLe msg.callback = callback; msg.refcon = refcon; msg.length = 0; - xQueueSendToFront(the->msgQueue, &msg, portMAX_DELAY); + xQueueSendToBack(the->dbgQueue, &msg, portMAX_DELAY); return 0; } #endif @@ -1094,19 +1102,14 @@ int modMessagePostToMachine(xsMachine *the, uint8_t *message, uint16_t messageLe msg.length = messageLength; msg.callback = callback; msg.refcon = refcon; -#ifdef mxDebug - do { - if (uxQueueSpacesAvailable(the->msgQueue) > 1) { // keep one entry free for debugger - xQueueSendToBack(the->msgQueue, &msg, portMAX_DELAY); - break; - } - vTaskDelay(5); - } while (1); -#else - xQueueSendToBack(the->msgQueue, &msg, portMAX_DELAY); -#endif - return 0; + if (pdTRUE == xQueueSendToBack(the->msgQueue, &msg, MODDEF_TASK_QUEUEWAIT)) + return 0; + + if (msg.message) + c_free(msg.message); + + return -2; } int modMessagePostToMachineFromISR(xsMachine *the, modMessageDeliver callback, void *refcon) @@ -1126,7 +1129,7 @@ int modMessagePostToMachineFromISR(xsMachine *the, modMessageDeliver callback, v void modMessageService(xsMachine *the, int maxDelayMS) { - unsigned portBASE_TYPE count = uxQueueMessagesWaiting(the->msgQueue); + modMessageRecord msg; #if CONFIG_ESP_TASK_WDT modWatchDogReset(); @@ -1139,47 +1142,84 @@ void modMessageService(xsMachine *the, int maxDelayMS) } #endif +#ifdef mxDebug while (true) { - modMessageRecord msg; + QueueSetMemberHandle_t queue = xQueueSelectFromSet(the->queues, maxDelayMS); + if (!queue) + break; - if (!xQueueReceive(the->msgQueue, &msg, maxDelayMS)) { - modWatchDogReset(); - return; - } + // xQueueSelectFromSet seems to require emptying the queue before it will generate another notification + while (true) { + if (!xQueueReceive(queue, &msg, 0)) + break; + + (msg.callback)(the, msg.refcon, msg.message, msg.length); + if (msg.message) + c_free(msg.message); + maxDelayMS = 0; + } + } +#else + while (xQueueReceive(the->msgQueue, &msg, maxDelayMS)) { (msg.callback)(the, msg.refcon, msg.message, msg.length); if (msg.message) c_free(msg.message); maxDelayMS = 0; - if (count <= 1) - break; - count -= 1; } +#endif + + modWatchDogReset(); } #ifndef modTaskGetCurrent #error make sure MOD_TASKS and modTaskGetCurrent are defined #endif +#ifndef MODDEF_TASK_QUEUELENGTH + #define MODDEF_TASK_QUEUELENGTH (10) +#endif + void modMachineTaskInit(xsMachine *the) { the->task = (void *)modTaskGetCurrent(); - the->msgQueue = xQueueCreate(10, sizeof(modMessageRecord)); + the->msgQueue = xQueueCreate(MODDEF_TASK_QUEUELENGTH, sizeof(modMessageRecord)); +#ifdef mxDebug + the->dbgQueue = xQueueCreate(4, sizeof(modMessageRecord)); + + the->queues = xQueueCreateSet(2); + xQueueAddToSet(the->msgQueue, the->queues); + xQueueAddToSet(the->dbgQueue, the->queues); +#endif } void modMachineTaskUninit(xsMachine *the) { - if (the->msgQueue) { - modMessageRecord msg; + modMessageRecord msg; + if (the->msgQueue) { while (xQueueReceive(the->msgQueue, &msg, 0)) { if (msg.message) c_free(msg.message); } +#ifdef mxDebug + xQueueRemoveFromSet(the->msgQueue, the->queues); +#endif vQueueDelete(the->msgQueue); } + +#ifdef mxDebug + if (the->dbgQueue) { + while (xQueueReceive(the->dbgQueue, &msg, 0)) + ; + xQueueRemoveFromSet(the->dbgQueue, the->queues); + vQueueDelete(the->dbgQueue); + } + if (the->queues) + vQueueDelete(the->queues); +#endif } void modMachineTaskWait(xsMachine *the) diff --git a/xs/platforms/esp/xsPlatform.h b/xs/platforms/esp/xsPlatform.h index 90eeb2f94..4bd9d0b01 100644 --- a/xs/platforms/esp/xsPlatform.h +++ b/xs/platforms/esp/xsPlatform.h @@ -183,6 +183,8 @@ typedef struct DebugFragmentRecord *DebugFragment; uint8_t *heap_ptr; \ uint8_t *heap_pend; \ void *msgQueue; \ + void *dbgQueue; \ + void *queues; \ void *task; \ void* waiterCondition; \ void* waiterData; \