diff --git a/src/workerd/io/worker.c++ b/src/workerd/io/worker.c++ index 3cfb92a3f93..e16a66ddbd1 100644 --- a/src/workerd/io/worker.c++ +++ b/src/workerd/io/worker.c++ @@ -452,31 +452,73 @@ public: // We're on a request-serving thread. auto& ioContext = IoContext::current(); timePoint = ioContext.now(); - } else KJ_IF_MAYBE(info, inspectorTimerInfo) { - if (info->threadId == getCurrentThreadId()) { - // We're on an inspector-serving thread. + } else { + auto lockedState = state.lockExclusive(); + KJ_IF_MAYBE(info, lockedState->inspectorTimerInfo) { timePoint = info->timer.now() + info->timerOffset - kj::origin() + kj::UNIX_EPOCH; + } else { + // We're at script startup time -- just return the Epoch. } } + return (timePoint - kj::UNIX_EPOCH) / kj::MILLISECONDS; + } - // If we're on neither a request- nor inspector-serving thread, then we're at script startup - // time -- just return the Epoch. + void setInspectorTimerInfo(kj::Timer& timer, kj::Duration timerOffset) { + auto lockedState = state.lockExclusive(); + lockedState->inspectorTimerInfo = InspectorTimerInfo { timer, timerOffset, getCurrentThreadId() }; + } - return (timePoint - kj::UNIX_EPOCH) / kj::MILLISECONDS; + void setChannel(Worker::Isolate::InspectorChannelImpl& channel) { + auto lockedState = state.lockExclusive(); + lockedState->channel = channel; + } + + void resetChannel() { + auto lockedState = state.lockExclusive(); + lockedState->channel = {}; } - // Nothing else. We ignore everything the inspector tells us, because we only care about the - // devtools inspector protocol, which is handled separately. + void runMessageLoopOnPause(int contextGroupId) override { + // This method is called by v8 when a breakpoint or debugger statement is hit. This method + // processes debugger messages until `Debugger.resume()` is called, when v8 then calls + // `quitMessageLoopOnPause()`. + // + //NB This method is ultimately called from the `InspectorChannelImpl` and the isolate lock is + // held when this method is called. + if (isMultiTenantProcess()) { + // TODO: breakpoints are not supported for the multi-tenant case. + return; + } - void setInspectorTimerInfo(kj::Timer& timer, kj::Duration timerOffset) { - // Helper for attachInspector(). - inspectorTimerInfo = InspectorTimerInfo { timer, timerOffset, getCurrentThreadId() }; + runMessageLoop = true; + do { + auto lockedState = state.lockExclusive(); + // Processing each message with a lock/release cycle is expensive, but + // we are debugging and processing a low volume of messages. Dropping the + // lock allows us to adapt behaviour if the lock is lost. + KJ_IF_MAYBE(channel, lockedState->channel) { + if (!dispatchOneMessageDuringPause(*channel)) { + break; + } + } else { + // Our channel is missing or has gone down. Stop waiting for messages + // and resume execution. + break; + } + } while (runMessageLoop); + } + + void quitMessageLoopOnPause() override { + // This method is called by v8 to resume execution after a breakpoint is hit. + runMessageLoop = false; } const kj::Executor& getExecutor() const { return executor; } private: + static bool dispatchOneMessageDuringPause(Worker::Isolate::InspectorChannelImpl& channel); + struct InspectorTimerInfo { kj::Timer& timer; kj::Duration timerOffset; @@ -484,9 +526,19 @@ private: }; const kj::Executor& executor; + volatile bool runMessageLoop; - kj::Maybe inspectorTimerInfo; - // The timer and offset for the inspector-serving thread. + struct State { + // State that may be set on a thread other than the isolate thread. + // These are typically set in attachInspector when an inspector connection is + // made. + kj::Maybe channel; + // Inspector channel to use to pump messages. + + kj::Maybe inspectorTimerInfo; + // The timer and offset for the inspector-serving thread. + }; + kj::MutexGuarded state; }; void setWebAssemblyModuleHasInstance(jsg::Lock& lock, v8::Local context); @@ -1583,6 +1635,7 @@ Worker::Lock::Lock(const Worker& constWorker, LockType lockType) impl(kj::heap(worker, lockType, stackScope)) { kj::requireOnStack(this, "Worker::Lock MUST be allocated on the stack."); } + Worker::Lock::~Lock() noexcept(false) { // const_cast OK because we hold -- nay, we *are* -- a lock on the script. auto& isolate = const_cast(worker.getIsolate()); @@ -2012,9 +2065,7 @@ public: jsg::V8StackScope stackScope; Isolate::Impl::Lock recordedLock(*state->get()->isolate, InspectorLock(nullptr), stackScope); KJ_IF_MAYBE(p, state->get()->isolate->currentInspectorSession) { - if (p == this) { - const_cast(*state->get()->isolate).currentInspectorSession = nullptr;; - } + const_cast(*state->get()->isolate).disconnectInspector(); } state->get()->teardownUnderLock(); } catch (...) { @@ -2046,10 +2097,11 @@ public: } void dispatchProtocolMessages() { + auto lockedState = this->state.lockExclusive(); for (;;) { auto maybeMessage = ioWorker.pollForIncomingMessage(); KJ_IF_MAYBE(message, maybeMessage) { - dispatchProtocolMessage(kj::mv(*message)); + dispatchProtocolMessage(lockedState, kj::mv(*message)); } else { return; } @@ -2092,145 +2144,9 @@ public: // delay signaling the outgoing loop until this call? } -private: - void dispatchProtocolMessage(kj::String message) { - capnp::MallocMessageBuilder messageBuilder; - auto cmd = messageBuilder.initRoot(); - getCdpJsonCodec().decode(message, cmd); + bool dispatchOneMessageDuringPause(); - switch (cmd.which()) { - case cdp::Command::UNKNOWN: { - break; - } - case cdp::Command::NETWORK_ENABLE: { - setNetworkEnabled(true); - cmd.getNetworkEnable().initResult(); - break; - } - case cdp::Command::NETWORK_DISABLE: { - setNetworkEnabled(false); - cmd.getNetworkDisable().initResult(); - break; - } - case cdp::Command::NETWORK_GET_RESPONSE_BODY: { - auto err = cmd.getNetworkGetResponseBody().initError(); - err.setCode(-32600); - err.setMessage("Network.getResponseBody is not supported in this fork"); - break; - } - case cdp::Command::PROFILER_STOP: { - auto state = this->state.lockExclusive(); - Isolate& isolate = const_cast(*state->get()->isolate); - KJ_IF_MAYBE(p, isolate.impl->profiler) { - jsg::V8StackScope stackScope; - Isolate::Impl::Lock recordedLock(isolate, InspectorLock(nullptr), stackScope); - auto& lock = *recordedLock.lock; - stopProfiling(**p, lock.v8Isolate, cmd); - } - break; - } - case cdp::Command::PROFILER_START: { - auto state = this->state.lockExclusive(); - Isolate& isolate = const_cast(*state->get()->isolate); - KJ_IF_MAYBE(p, isolate.impl->profiler) { - jsg::V8StackScope stackScope; - Isolate::Impl::Lock recordedLock(isolate, InspectorLock(nullptr), stackScope); - auto& lock = *recordedLock.lock; - startProfiling(**p, lock.v8Isolate); - } - break; - } - case cdp::Command::PROFILER_SET_SAMPLING_INTERVAL: { - auto state = this->state.lockExclusive(); - Isolate& isolate = const_cast(*state->get()->isolate); - KJ_IF_MAYBE(p, isolate.impl->profiler) { - jsg::V8StackScope stackScope; - Isolate::Impl::Lock recordedLock(isolate, InspectorLock(nullptr), stackScope); - auto interval = cmd.getProfilerSetSamplingInterval().getParams().getInterval(); - setSamplingInterval(**p, interval); - } - break; - } - case cdp::Command::PROFILER_ENABLE: { - auto state = this->state.lockExclusive(); - Isolate& isolate = const_cast(*state->get()->isolate); - jsg::V8StackScope stackScope; - Isolate::Impl::Lock recordedLock(isolate, InspectorLock(nullptr), stackScope); - auto& lock = *recordedLock.lock; - isolate.impl->profiler = kj::Own( - v8::CpuProfiler::New(lock.v8Isolate, v8::kDebugNaming, v8::kLazyLogging), - CpuProfilerDisposer::instance); - break; - } - case cdp::Command::TAKE_HEAP_SNAPSHOT: { - auto state = this->state.lockExclusive(); - Isolate& isolate = const_cast(*state->get()->isolate); - jsg::V8StackScope stackScope; - Isolate::Impl::Lock recordedLock(isolate, InspectorLock(nullptr), stackScope); - auto& lock = *recordedLock.lock; - auto params = cmd.getTakeHeapSnapshot().getParams(); - takeHeapSnapshot(lock, - params.getExposeInternals(), - params.getCaptureNumericValue()); - break; - } - } - - if (!cmd.isUnknown()) { - sendNotification(cmd); - return; - } - - auto state = this->state.lockExclusive(); - - // const_cast OK because we're going to lock it - Isolate& isolate = const_cast(*state->get()->isolate); - jsg::V8StackScope stackScope; - Isolate::Impl::Lock recordedLock(isolate, InspectorLock(nullptr), stackScope); - auto& lock = *recordedLock.lock; - - // We have at times observed V8 bugs where the inspector queues a background task and - // then synchronously waits for it to complete, which would deadlock if background - // threads are disallowed. Since the inspector is in a process sandbox anyway, it's not - // a big deal to just permit those background threads. - AllowV8BackgroundThreadsScope allowBackgroundThreads; - - kj::Maybe maybeLimitError; - { - auto limitScope = isolate.getLimitEnforcer().enterInspectorJs(lock, maybeLimitError); - state->get()->session->dispatchProtocolMessage(toStringView(message)); - } - - // Run microtasks in case the user made an async call. - if (maybeLimitError == nullptr) { - auto limitScope = isolate.getLimitEnforcer().enterInspectorJs(lock, maybeLimitError); - lock.v8Isolate->PerformMicrotaskCheckpoint(); - } else { - // Oops, we already exceeded the limit, so force the microtask queue to be thrown away. - lock.v8Isolate->TerminateExecution(); - lock.v8Isolate->PerformMicrotaskCheckpoint(); - } - - KJ_IF_MAYBE(limitError, maybeLimitError) { - v8::HandleScope scope(lock.v8Isolate); - - // HACK: We want to print the error, but we need a context to do that. - // We don't know which contexts exist in this isolate, so I guess we have to - // create one. Ugh. - auto dummyContext = v8::Context::New(lock.v8Isolate); - auto& inspector = *KJ_ASSERT_NONNULL(isolate.impl->inspector); - inspector.contextCreated( - v8_inspector::V8ContextInfo(dummyContext, 1, v8_inspector::StringView( - reinterpret_cast("Worker"), 6))); - sendExceptionToInspector(inspector, dummyContext, - jsg::extractTunneledExceptionDescription(limitError->getDescription())); - inspector.contextDestroyed(dummyContext); - } - - if (recordedLock.checkInWithLimitEnforcer(isolate)) { - disconnect(); - } - } +private: class WebSocketIoWorker final { // Class that manages the I/O for devtools connections. I/O is performed on the // thread associated with the InspectorService (the thread that calls attachInspector). @@ -2257,6 +2173,7 @@ private: void disconnect() { channel = {}; + shutdown(); } bool isClosed() const { @@ -2264,6 +2181,7 @@ private: } kj::Maybe pollForIncomingMessage() { + if (isClosed()) return {}; auto lockedIncomingQueue = incomingQueue.lockExclusive(); return pollMessage(*lockedIncomingQueue); } @@ -2287,16 +2205,14 @@ private: kj::Promise messagePump() { // Message pumping promise that should be evaluated on the InspectorService // thread. - return incomingLoop() - .exclusiveJoin(outgoingLoop()); + return incomingLoop().exclusiveJoin(outgoingLoop()); } void send(kj::String message) { + if (isClosed()) return; auto lockedOutgoingQueue = outgoingQueue.lockExclusive(); - if (!isClosed()) { - lockedOutgoingQueue->messages.add(kj::mv(message)); - outgoingQueueNotifier->notify(); - } + lockedOutgoingQueue->messages.add(kj::mv(message)); + outgoingQueueNotifier->notify(); } private: @@ -2323,6 +2239,20 @@ private: } } + void shutdown() { + receivedClose.store(true, std::memory_order_release); + + // Wake any waiters. + outgoingQueueNotifier->notify(); + + // Drain incoming queue, the isolate thread may be waiting on it + // on will notice it is closed if woken without any messages to + // deliver in WebSocketIoWorker::waitForMessage(). + auto lockedIncomingQueue = incomingQueue.lockExclusive(); + lockedIncomingQueue->head = 0; + lockedIncomingQueue->messages.clear(); + } + kj::Promise incomingLoop() { return webSocket.receive().then([this](kj::WebSocket::Message&& message) -> kj::Promise { KJ_SWITCH_ONEOF(message) { @@ -2341,8 +2271,7 @@ private: // Ignore. } KJ_CASE_ONEOF(close, kj::WebSocket::Close) { - receivedClose.store(true, std::memory_order_release); - outgoingQueueNotifier->notify(); + shutdown(); } } return incomingLoop(); @@ -2353,13 +2282,18 @@ private: return outgoingQueueNotifier->awaitNotification().then([this]() { auto lockedQueue = outgoingQueue.lockExclusive(); auto messages = kj::mv(lockedQueue->messages); - kj::Promise sendMessages = sendToWebSocket(messages).attach(kj::mv(messages)); - if (isClosed()) { - return sendMessages.then([this]() { - return webSocket.close(1000, "client closed connection"); - }); + try { + kj::Promise sendMessages = sendToWebSocket(messages).attach(kj::mv(messages)); + if (isClosed()) { + return sendMessages.then([this]() { + return webSocket.close(1000, "client closed connection"); + }); + } + return sendMessages.then([this]() { return outgoingLoop(); }); + } catch (kj::Exception& e) { + shutdown(); + throw; } - return sendMessages.then([this]() { return outgoingLoop(); }); }); } @@ -2518,8 +2452,166 @@ private: volatile bool networkEnabled = false; // Not under `state` lock due to lock ordering complications. + + void dispatchProtocolMessage( + kj::Locked>& lockedState, + kj::String message) { + v8_inspector::V8InspectorSession& session = *lockedState->get()->session; + // const_cast OK because we're going to lock it + Isolate& isolate = const_cast(*lockedState->get()->isolate); + jsg::V8StackScope stackScope; + Isolate::Impl::Lock recordedLock(isolate, InspectorLock(nullptr), stackScope); + dispatchProtocolMessage(kj::mv(message), session, isolate, stackScope, recordedLock); + } + + void dispatchProtocolMessage(kj::String message, + v8_inspector::V8InspectorSession& session, + Isolate& isolate, + jsg::V8StackScope& stackScope, + Isolate::Impl::Lock& recordedLock) { + capnp::MallocMessageBuilder messageBuilder; + auto cmd = messageBuilder.initRoot(); + getCdpJsonCodec().decode(message, cmd); + + switch (cmd.which()) { + case cdp::Command::UNKNOWN: { + break; + } + case cdp::Command::NETWORK_ENABLE: { + setNetworkEnabled(true); + cmd.getNetworkEnable().initResult(); + break; + } + case cdp::Command::NETWORK_DISABLE: { + setNetworkEnabled(false); + cmd.getNetworkDisable().initResult(); + break; + } + case cdp::Command::NETWORK_GET_RESPONSE_BODY: { + auto err = cmd.getNetworkGetResponseBody().initError(); + err.setCode(-32600); + err.setMessage("Network.getResponseBody is not supported in this fork"); + break; + } + case cdp::Command::PROFILER_STOP: { + KJ_IF_MAYBE(p, isolate.impl->profiler) { + auto& lock = recordedLock.lock; + stopProfiling(**p, lock->v8Isolate, cmd); + } + break; + } + case cdp::Command::PROFILER_START: { + KJ_IF_MAYBE(p, isolate.impl->profiler) { + auto& lock = recordedLock.lock; + startProfiling(**p, lock->v8Isolate); + } + break; + } + case cdp::Command::PROFILER_SET_SAMPLING_INTERVAL: { + KJ_IF_MAYBE(p, isolate.impl->profiler) { + auto interval = cmd.getProfilerSetSamplingInterval().getParams().getInterval(); + setSamplingInterval(**p, interval); + } + break; + } + case cdp::Command::PROFILER_ENABLE: { + auto& lock = recordedLock.lock; + isolate.impl->profiler = kj::Own( + v8::CpuProfiler::New(lock->v8Isolate, v8::kDebugNaming, v8::kLazyLogging), + CpuProfilerDisposer::instance); + break; + } + case cdp::Command::TAKE_HEAP_SNAPSHOT: { + auto& lock = recordedLock.lock; + auto params = cmd.getTakeHeapSnapshot().getParams(); + takeHeapSnapshot(*lock, + params.getExposeInternals(), + params.getCaptureNumericValue()); + break; + } + } + + if (!cmd.isUnknown()) { + sendNotification(cmd); + return; + } + + auto& lock = recordedLock.lock; + + // We have at times observed V8 bugs where the inspector queues a background task and + // then synchronously waits for it to complete, which would deadlock if background + // threads are disallowed. Since the inspector is in a process sandbox anyway, it's not + // a big deal to just permit those background threads. + AllowV8BackgroundThreadsScope allowBackgroundThreads; + + kj::Maybe maybeLimitError; + { + auto limitScope = isolate.getLimitEnforcer().enterInspectorJs(*lock, maybeLimitError); + session.dispatchProtocolMessage(toStringView(message)); + } + + // Run microtasks in case the user made an async call. + if (maybeLimitError == nullptr) { + auto limitScope = isolate.getLimitEnforcer().enterInspectorJs(*lock, maybeLimitError); + lock->v8Isolate->PerformMicrotaskCheckpoint(); + } else { + // Oops, we already exceeded the limit, so force the microtask queue to be thrown away. + lock->v8Isolate->TerminateExecution(); + lock->v8Isolate->PerformMicrotaskCheckpoint(); + } + + KJ_IF_MAYBE(limitError, maybeLimitError) { + v8::HandleScope scope(lock->v8Isolate); + + // HACK: We want to print the error, but we need a context to do that. + // We don't know which contexts exist in this isolate, so I guess we have to + // create one. Ugh. + auto dummyContext = v8::Context::New(lock->v8Isolate); + auto& inspector = *KJ_ASSERT_NONNULL(isolate.impl->inspector); + inspector.contextCreated( + v8_inspector::V8ContextInfo(dummyContext, 1, v8_inspector::StringView( + reinterpret_cast("Worker"), 6))); + sendExceptionToInspector(inspector, dummyContext, + jsg::extractTunneledExceptionDescription(limitError->getDescription())); + inspector.contextDestroyed(dummyContext); + } + + if (recordedLock.checkInWithLimitEnforcer(isolate)) { + disconnect(); + } + } }; +bool Worker::Isolate::InspectorChannelImpl::dispatchOneMessageDuringPause() { + auto maybeMessage = ioWorker.waitForMessage(); + + // We can be paused by either hitting a debugger statement in a script or from hitting + // a breakpoint or someone hit break. As a result we may or may not be recursive dispatching messages. + + KJ_IF_MAYBE(message, maybeMessage) { + auto lockedState = this->state.lockExclusive(); + if (IoContext::hasCurrent()) { + // Received a message whilst script is running, probably in a breakpoint. + v8_inspector::V8InspectorSession& session = *lockedState->get()->session; + // const_cast OK because we're going to lock it + Isolate& isolate = const_cast(*lockedState->get()->isolate); + Worker::Lock& workerLock = IoContext::current().getCurrentLock(); + Isolate::Impl::Lock& recordedLock = workerLock.impl->recordedLock; + jsg::V8StackScope stackScope; + dispatchProtocolMessage(kj::mv(*message), session, isolate, stackScope, recordedLock); + } else { + // Received a message whilst idle. + dispatchProtocolMessage(lockedState, kj::mv(*message)); + } + return true; + } + return false; +} + +bool Worker::InspectorClient::dispatchOneMessageDuringPause(Worker::Isolate::InspectorChannelImpl& channel) { + return channel.dispatchOneMessageDuringPause(); +} + kj::Promise Worker::Isolate::attachInspector( kj::Timer& timer, kj::Duration timerOffset, @@ -2542,12 +2634,6 @@ kj::Promise Worker::Isolate::attachInspector( kj::WebSocket& webSocket) const { KJ_REQUIRE(impl->inspector != nullptr); - // This method can be called upon first connection, if a connection drops, whilst the - // isolate is stopped at a breakpoint, or anytime an inbound inspector connection comes in. - // The inspector service is running on a different thread - // so it can create an IO helper that works if the isolate is stopped at a breakpoint (so it - // can receive messages to advise it how to proceed, e.g. resume). - jsg::V8StackScope stackScope; Isolate::Impl::Lock recordedLock(*this, InspectorChannelImpl::InspectorLock(nullptr), stackScope); auto& lock = *recordedLock.lock; @@ -2566,6 +2652,7 @@ kj::Promise Worker::Isolate::attachInspector( auto channel = kj::heap( kj::atomicAddRef(*this), webSocket, isolateExecutor); lockedSelf.currentInspectorSession = *channel; + lockedSelf.impl->inspectorClient.setChannel(*channel); // Send any queued notifications. { @@ -2575,18 +2662,17 @@ kj::Promise Worker::Isolate::attachInspector( } lockedSelf.impl->queuedNotifications.clear(); } - - // TODO(oth): Fix me, expect this to break. return channel->messagePump().attach(kj::mv(channel)); } void Worker::Isolate::disconnectInspector() { // If an inspector session is connected, proactively drop it, so as to force it to drop its // reference on the script, so that the script can be deleted. - KJ_IF_MAYBE(current, currentInspectorSession) { current->disconnect(); + currentInspectorSession = {}; } + impl->inspectorClient.resetChannel(); } void Worker::Isolate::logWarning(kj::StringPtr description, Lock& lock) { diff --git a/src/workerd/io/worker.h b/src/workerd/io/worker.h index 008d48cadae..b670db2e75f 100644 --- a/src/workerd/io/worker.h +++ b/src/workerd/io/worker.h @@ -450,7 +450,6 @@ class Worker::Isolate: public kj::AtomicRefcounted { kj::Own traceAsyncContextKey; friend class Worker; - friend class IsolateChannelImpl; }; class Worker::ApiIsolate {