diff --git a/.changeset/rude-chicken-raise.md b/.changeset/rude-chicken-raise.md new file mode 100644 index 000000000000..d1aa09c10387 --- /dev/null +++ b/.changeset/rude-chicken-raise.md @@ -0,0 +1,13 @@ +--- +"miniflare": patch +--- + +fix: ensure `Mutex` doesn't report itself as drained if locked + +Previously, Miniflare's `Mutex` implementation would report itself as drained +if there were no waiters, regardless of the locked state. This bug meant that +if you called but didn't `await` `Miniflare#setOptions()`, future calls to +`Miniflare#dispatchFetch()` (or any other asynchronous `Miniflare` method) +wouldn't wait for the options update to apply and the runtime to restart before +sending requests. This change ensures we wait until the mutex is unlocked before +reporting it as drained. diff --git a/packages/miniflare/src/index.ts b/packages/miniflare/src/index.ts index 574bac1b865d..d0712b225f4d 100644 --- a/packages/miniflare/src/index.ts +++ b/packages/miniflare/src/index.ts @@ -1160,10 +1160,9 @@ export class Miniflare { this.dispatchFetch ); } else { - // The `ProxyServer` "heap" will have been destroyed when `workerd` was - // restarted, invalidating all existing native target references. Mark - // all proxies as invalid, noting the new runtime URL to send requests to. - this.#proxyClient.poisonProxies(this.#runtimeEntryURL); + // Update the proxy client with the new runtime URL to send requests to. + // Existing proxies will already have been poisoned in `setOptions()`. + this.#proxyClient.setRuntimeEntryURL(this.#runtimeEntryURL); } if (!this.#runtimeMutex.hasWaiting) { @@ -1293,6 +1292,9 @@ export class Miniflare { setOptions(opts: MiniflareOptions): Promise { this.#checkDisposed(); + // The `ProxyServer` "heap" will be destroyed when `workerd` restarts, + // invalidating all existing native references. Mark all proxies as invalid. + this.#proxyClient?.poisonProxies(); // Wait for initial initialisation and other setOptions to complete before // changing options return this.#runtimeMutex.runWith(() => this.#setOptions(opts)); @@ -1496,6 +1498,11 @@ export class Miniflare { async dispose(): Promise { this.#disposeController.abort(); + // The `ProxyServer` "heap" will be destroyed when `workerd` shuts down, + // invalidating all existing native references. Mark all proxies as invalid. + // Note `dispose()`ing the `#proxyClient` implicitly poison's proxies, but + // we'd like them to be poisoned synchronously here. + this.#proxyClient?.poisonProxies(); try { await this.#waitForReady(/* disposing */ true); } finally { diff --git a/packages/miniflare/src/plugins/core/proxy/client.ts b/packages/miniflare/src/plugins/core/proxy/client.ts index ff50844697e8..6a5a2dfb072a 100644 --- a/packages/miniflare/src/plugins/core/proxy/client.ts +++ b/packages/miniflare/src/plugins/core/proxy/client.ts @@ -81,13 +81,19 @@ export class ProxyClient { return (this.#envProxy ??= this.#bridge.getProxy(TARGET_ENV)); } - poisonProxies(runtimeEntryURL?: URL): void { - this.#bridge.poisonProxies(runtimeEntryURL); + poisonProxies(): void { + this.#bridge.poisonProxies(); // Reset `#{global,env}Proxy` so they aren't poisoned on next access this.#globalProxy = undefined; this.#envProxy = undefined; } + setRuntimeEntryURL(runtimeEntryURL: URL) { + // This function will be called whenever the runtime restarts. The URL may + // be different if the port has changed. + this.#bridge.url = runtimeEntryURL; + } + dispose(): Promise { // Intentionally not resetting `#{global,env}Proxy` to keep them poisoned. // `workerd` won't be started again by this `Miniflare` instance after @@ -188,13 +194,12 @@ class ProxyClientBridge { return proxy; } - poisonProxies(url?: URL): void { + poisonProxies(): void { this.#version++; - // This function will be called whenever the runtime restarts. The URL may - // be different if the port has changed. We must also unregister all - // finalizers as the heap will be reset, and we don't want a new object - // added with the same address to be freed when it's still accessible. - if (url !== undefined) this.url = url; + // This function will be called whenever `setOptions()` or `dispose()` is + // called. We must also unregister all finalizers as the heap will be reset, + // and we don't want a new object added with the same address to be freed + // when it's still accessible. this.#finalizationRegistry.unregister(this); } @@ -399,6 +404,8 @@ class ProxyStubHandler implements ProxyHandler { } getOwnPropertyDescriptor(target: T, key: string | symbol) { + this.#assertSafe(); + if (typeof key === "symbol") return undefined; // Optimisation: assume constant prototypes of proxied objects, descriptors @@ -424,6 +431,8 @@ class ProxyStubHandler implements ProxyHandler { } ownKeys(_target: T) { + this.#assertSafe(); + // Optimisation: assume constant prototypes of proxied objects, own keys // should never change after we've fetched them if (this.#knownOwnKeys !== undefined) return this.#knownOwnKeys; @@ -442,6 +451,8 @@ class ProxyStubHandler implements ProxyHandler { } getPrototypeOf(_target: T) { + this.#assertSafe(); + // Return a `null` prototype, so users know this isn't a plain object return null; } diff --git a/packages/miniflare/src/workers/shared/sync.ts b/packages/miniflare/src/workers/shared/sync.ts index c5836331ce30..4dfc026ea892 100644 --- a/packages/miniflare/src/workers/shared/sync.ts +++ b/packages/miniflare/src/workers/shared/sync.ts @@ -71,7 +71,7 @@ export class Mutex { } async drained(): Promise { - if (this.resolveQueue.length === 0) return; + if (this.resolveQueue.length === 0 && !this.locked) return; return new Promise((resolve) => this.drainQueue.push(resolve)); } } diff --git a/packages/miniflare/test/shared/sync.spec.ts b/packages/miniflare/test/shared/sync.spec.ts index e37d5fead988..9c4260075855 100644 --- a/packages/miniflare/test/shared/sync.spec.ts +++ b/packages/miniflare/test/shared/sync.spec.ts @@ -46,6 +46,7 @@ test("Mutex: maintains separate drain queue", async (t) => { void mutex.runWith(() => deferred1); let drained = false; mutex.drained().then(() => (drained = true)); + await setTimeout(); t.false(drained); deferred1.resolve(); await setTimeout(); @@ -64,6 +65,7 @@ test("Mutex: maintains separate drain queue", async (t) => { }); drained = false; mutex.drained().then(() => (drained = true)); + await setTimeout(); t.false(drained); deferred2.resolve(); await setTimeout();