Skip to content

Commit

Permalink
Merge pull request #378 from cloudflare/jsnell/more-streams-cleanup-f…
Browse files Browse the repository at this point in the history
…ollowup
  • Loading branch information
jasnell authored Feb 19, 2023
2 parents 0ee3f8b + 7e3c9fb commit e497feb
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 81 deletions.
171 changes: 103 additions & 68 deletions src/workerd/api/streams/standard.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1439,7 +1439,8 @@ ReadableStreamDefaultController::ReadableStreamDefaultController(
UnderlyingSource underlyingSource,
StreamQueuingStrategy queuingStrategy)
: ioContext(tryGetIoContext()),
impl(kj::mv(underlyingSource), kj::mv(queuingStrategy)) {}
impl(kj::mv(underlyingSource), kj::mv(queuingStrategy)),
weakRef(kj::refcounted<WeakRef<ReadableStreamDefaultController>>(*this)) {}

void ReadableStreamDefaultController::start(jsg::Lock& js) {
impl.start(js, JSG_THIS);
Expand Down Expand Up @@ -2748,13 +2749,16 @@ jsg::Promise<void> WritableStreamDefaultController::write(

// ======================================================================================
WritableStreamJsController::WritableStreamJsController()
: ioContext(tryGetIoContext()) {}
: ioContext(tryGetIoContext()),
weakRef(kj::refcounted<WeakRef<WritableStreamJsController>>(*this)) {}

WritableStreamJsController::WritableStreamJsController(StreamStates::Closed closed)
: ioContext(tryGetIoContext()), state(closed) {}
: ioContext(tryGetIoContext()), state(closed),
weakRef(kj::refcounted<WeakRef<WritableStreamJsController>>(*this)) {}

WritableStreamJsController::WritableStreamJsController(StreamStates::Errored errored)
: ioContext(tryGetIoContext()), state(kj::mv(errored)) {}
: ioContext(tryGetIoContext()), state(kj::mv(errored)),
weakRef(kj::refcounted<WeakRef<WritableStreamJsController>>(*this)) {}

jsg::Promise<void> WritableStreamJsController::abort(
jsg::Lock& js,
Expand Down Expand Up @@ -3145,13 +3149,18 @@ TransformStreamDefaultController::TransformStreamDefaultController(jsg::Lock& js
startPromise(js.newPromiseAndResolver<void>()) {}

kj::Maybe<int> TransformStreamDefaultController::getDesiredSize() {
return getReadableController().getDesiredSize();
KJ_IF_MAYBE(readableController, tryGetReadableController()) {
return readableController->getDesiredSize();
}
return nullptr;
}

void TransformStreamDefaultController::enqueue(
jsg::Lock& js,
v8::Local<v8::Value> chunk) {
auto& readableController = getReadableController();
auto& readableController = JSG_REQUIRE_NONNULL(
tryGetReadableController(), TypeError,
"The readable side of this TransformStream is no longer readable.");
JSG_REQUIRE(readableController.canCloseOrEnqueue(), TypeError,
"The readable side of this TransformStream is no longer readable.");
js.tryCatch([&] {
Expand All @@ -3168,46 +3177,49 @@ void TransformStreamDefaultController::enqueue(
}

void TransformStreamDefaultController::error(jsg::Lock& js, v8::Local<v8::Value> reason) {
errorNoIoContextCheck(js, reason);
}

void TransformStreamDefaultController::errorNoIoContextCheck(
jsg::Lock& js,
v8::Local<v8::Value> reason) {
getReadableController().error(js, reason);
KJ_IF_MAYBE(readableController, tryGetReadableController()) {
readableController->error(js, reason);
maybeReadableController = nullptr;
}
errorWritableAndUnblockWrite(js, reason);
}

void TransformStreamDefaultController::terminate(jsg::Lock& js) {
getReadableController().close(js);
KJ_IF_MAYBE(readableController, tryGetReadableController()) {
readableController->close(js);
maybeReadableController = nullptr;
}
errorWritableAndUnblockWrite(js, js.v8TypeError("The transform stream has been terminated"_kj));
}

jsg::Promise<void> TransformStreamDefaultController::write(
jsg::Lock& js,
v8::Local<v8::Value> chunk) {
auto& writableController = getWritableController();

KJ_IF_MAYBE(error, writableController.isErroredOrErroring(js)) {
return js.rejectedPromise<void>(*error);
}
KJ_IF_MAYBE(writableController, tryGetWritableController()) {
KJ_IF_MAYBE(error, writableController->isErroredOrErroring(js)) {
return js.rejectedPromise<void>(*error);
}

KJ_ASSERT(writableController.isWritable());
KJ_ASSERT(writableController->isWritable());

if (backpressure) {
auto chunkRef = js.v8Ref(chunk);
return KJ_ASSERT_NONNULL(maybeBackpressureChange).promise.whenResolved().then(js,
JSG_VISITABLE_LAMBDA((this, chunkRef = kj::mv(chunkRef), self = JSG_THIS),
(chunkRef, self), (jsg::Lock& js) {
auto& writableController = getWritableController();
KJ_IF_MAYBE(error, writableController.isErroring(js)) {
return js.rejectedPromise<void>(*error);
}
KJ_ASSERT(writableController.isWritable());
return performTransform(js, chunkRef.getHandle(js));
}));
if (backpressure) {
auto chunkRef = js.v8Ref(chunk);
return KJ_ASSERT_NONNULL(maybeBackpressureChange).promise.whenResolved().then(js,
JSG_VISITABLE_LAMBDA((chunkRef = kj::mv(chunkRef), ref=JSG_THIS),
(chunkRef, ref), (jsg::Lock& js) mutable -> jsg::Promise<void> {
KJ_IF_MAYBE(writableController, ref->tryGetWritableController()) {
KJ_IF_MAYBE(error, writableController->isErroring(js)) {
return js.rejectedPromise<void>(*error);
}
}
return ref->performTransform(js, chunkRef.getHandle(js));
}));
}
return performTransform(js, chunk);
} else {
return js.rejectedPromise<void>(KJ_EXCEPTION(FAILED,
"jsg.TypeError: Writing to the TransformStream failed."));
}
return performTransform(js, chunk);
}

jsg::Promise<void> TransformStreamDefaultController::abort(
Expand All @@ -3218,27 +3230,25 @@ jsg::Promise<void> TransformStreamDefaultController::abort(
}

jsg::Promise<void> TransformStreamDefaultController::close(jsg::Lock& js) {
auto onSuccess = [this, ref=JSG_THIS](jsg::Lock& js) -> jsg::Promise<void> {
auto& readableController = getReadableController();

// Allows for a graceful close of the readable side. Close will
// complete once all of the queued data is read or the stream
// errors.
readableController.close(js);
auto onSuccess = JSG_VISITABLE_LAMBDA(
(ref=JSG_THIS), (ref), (jsg::Lock& js) -> jsg::Promise<void> {
KJ_IF_MAYBE(readableController, ref->tryGetReadableController()) {
// Allows for a graceful close of the readable side. Close will
// complete once all of the queued data is read or the stream
// errors.
readableController->close(js);
}
return js.resolvedPromise();
};
});

auto onFailure = [this, ref=JSG_THIS](jsg::Lock& js, jsg::Value reason) -> jsg::Promise<void> {
error(js, reason.getHandle(js));
auto onFailure = JSG_VISITABLE_LAMBDA(
(ref=JSG_THIS),(ref),(jsg::Lock& js, jsg::Value reason) -> jsg::Promise<void> {
ref->error(js, reason.getHandle(js));
return js.rejectedPromise<void>(kj::mv(reason));
};
});

return jscontroller::maybeRunAlgorithm(
js,
algorithms.flush,
kj::mv(onSuccess),
kj::mv(onFailure),
JSG_THIS);
js, algorithms.flush, kj::mv(onSuccess), kj::mv(onFailure), JSG_THIS);
}

jsg::Promise<void> TransformStreamDefaultController::pull(jsg::Lock& js) {
Expand All @@ -3250,6 +3260,7 @@ jsg::Promise<void> TransformStreamDefaultController::pull(jsg::Lock& js) {
jsg::Promise<void> TransformStreamDefaultController::cancel(
jsg::Lock& js,
v8::Local<v8::Value> reason) {
maybeReadableController = nullptr;
errorWritableAndUnblockWrite(js, reason);
return js.resolvedPromise();
}
Expand All @@ -3264,17 +3275,22 @@ jsg::Promise<void> TransformStreamDefaultController::performTransform(
[](jsg::Lock& js) -> jsg::Promise<void> {
return js.resolvedPromise();
},
[this, ref=JSG_THIS](jsg::Lock& js, jsg::Value reason) -> jsg::Promise<void> {
error(js, reason.getHandle(js));
JSG_VISITABLE_LAMBDA(
(ref=JSG_THIS),(ref),(jsg::Lock& js, jsg::Value reason) -> jsg::Promise<void> {
ref->error(js, reason.getHandle(js));
return js.rejectedPromise<void>(kj::mv(reason));
},
}),
chunk,
JSG_THIS);
}
// If we got here, there is no transform algorithm. Per the spec, the default
// behavior then is to just pass along the value untransformed.
enqueue(js, chunk);
return js.resolvedPromise();
return js.tryCatch([&] {
enqueue(js, chunk);
return js.resolvedPromise();
}, [&](jsg::Value exception) {
return js.rejectedPromise<void>(kj::mv(exception));
});
}

void TransformStreamDefaultController::setBackpressure(jsg::Lock& js, bool newBackpressure) {
Expand All @@ -3291,9 +3307,11 @@ void TransformStreamDefaultController::errorWritableAndUnblockWrite(
jsg::Lock& js,
v8::Local<v8::Value> reason) {
algorithms.clear();
auto& writableController = getWritableController();
if (writableController.isWritable()) {
writableController.doError(js, reason);
KJ_IF_MAYBE(writableController, tryGetWritableController()) {
if (writableController->isWritable()) {
writableController->doError(js, reason);
}
maybeWritableController = nullptr;
}
if (backpressure) {
setBackpressure(js, false);
Expand All @@ -3314,16 +3332,17 @@ void TransformStreamDefaultController::init(
jsg::Optional<Transformer> maybeTransformer) {
KJ_ASSERT(maybeReadableController == nullptr);
KJ_ASSERT(maybeWritableController == nullptr);
maybeWritableController = static_cast<WritableStreamJsController&>(writable->getController());
maybeWritableController =
static_cast<WritableStreamJsController&>(writable->getController()).getWeakRef();

// The TransformStreamDefaultController needs to have a reference to the underlying controller
// and not just the readable because if the readable is teed, or passed off to source, etc,
// the TransformStream has to make sure that it can continue to interface with the controller
// to push data into it.
auto& readableController = static_cast<ReadableStreamJsController&>(readable->getController());
auto readableRef = KJ_ASSERT_NONNULL(readableController.getController());
maybeReadableController = kj::mv(KJ_ASSERT_NONNULL(
readableRef.tryGet<jsg::Ref<ReadableStreamDefaultController>>()));
maybeReadableController = KJ_ASSERT_NONNULL(
readableRef.tryGet<jsg::Ref<ReadableStreamDefaultController>>())->getWeakRef();

auto transformer = kj::mv(maybeTransformer).orDefault({});

Expand All @@ -3348,15 +3367,31 @@ void TransformStreamDefaultController::init(
algorithms.starting = jscontroller::maybeRunAlgorithm(
js,
transformer.start,
[this](jsg::Lock& js) {
algorithms.starting = nullptr;
startPromise.resolver.resolve();
},
[this](jsg::Lock& js, jsg::Value reason) {
algorithms.starting = nullptr;
startPromise.resolver.reject(reason.getHandle(js));
},
JSG_VISITABLE_LAMBDA((ref=JSG_THIS), (ref), (jsg::Lock& js) {
ref->algorithms.starting = nullptr;
ref->startPromise.resolver.resolve();
}),
JSG_VISITABLE_LAMBDA((ref=JSG_THIS), (ref), (jsg::Lock& js, jsg::Value reason) {
ref->algorithms.starting = nullptr;
ref->startPromise.resolver.reject(reason.getHandle(js));
}),
JSG_THIS);
}

kj::Maybe<ReadableStreamDefaultController&>
TransformStreamDefaultController::tryGetReadableController() {
KJ_IF_MAYBE(controller, maybeReadableController) {
return (*controller)->tryGet();
}
return nullptr;
}

kj::Maybe<WritableStreamJsController&>
TransformStreamDefaultController::tryGetWritableController() {
KJ_IF_MAYBE(controller, maybeWritableController) {
return (*controller)->tryGet();
}
return nullptr;
}

} // namespace workerd::api
Loading

0 comments on commit e497feb

Please sign in to comment.