Skip to content

Commit

Permalink
auto uncork
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarred-Sumner committed Oct 16, 2024
1 parent ef4435d commit cd05556
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 23 deletions.
14 changes: 14 additions & 0 deletions packages/bun-uws/src/Loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "LoopData.h"
#include <libusockets.h>
#include <iostream>
#include "AsyncSocket.h"

extern "C" int bun_is_exiting();

Expand Down Expand Up @@ -52,6 +53,15 @@ struct Loop {
for (auto &p : loopData->preHandlers) {
p.second((Loop *) loop);
}

void *corkedSocket = loopData->getCorkedSocket();
if (corkedSocket) {
if (loopData->isCorkedSSL()) {
((uWS::AsyncSocket<true> *) corkedSocket)->uncork();
} else {
((uWS::AsyncSocket<false> *) corkedSocket)->uncork();
}
}
}

static void postCb(us_loop_t *loop) {
Expand Down Expand Up @@ -148,6 +158,10 @@ struct Loop {
getLazyLoop().loop = nullptr;
}

static LoopData* data(struct us_loop_t *loop) {
return (LoopData *) us_loop_ext(loop);
}

void addPostHandler(void *key, MoveOnlyFunction<void(Loop *)> &&handler) {
LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this);

Expand Down
1 change: 1 addition & 0 deletions packages/bun-uws/src/LoopData.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ struct alignas(16) LoopData {
}
delete [] corkBuffer;
}

void* getCorkedSocket() {
return this->corkedSocket;
}
Expand Down
10 changes: 2 additions & 8 deletions src/bun.js/bindings/webcore/JSTextEncoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -375,10 +375,9 @@ static inline JSC::EncodedJSValue jsTextEncoderPrototypeFunction_encodeBody(JSC:
{
auto& vm = JSC::getVM(lexicalGlobalObject);
auto throwScope = DECLARE_THROW_SCOPE(vm);
UNUSED_PARAM(throwScope);
UNUSED_PARAM(callFrame);
EnsureStillAliveScope argument0 = callFrame->argument(0);
JSC::JSString* input = argument0.value().toStringOrNull(lexicalGlobalObject);
JSC::JSString* input = argument0.value().toString(lexicalGlobalObject);
RETURN_IF_EXCEPTION(throwScope, {});
JSC::EncodedJSValue res;
String str;
if (input->is8Bit()) {
Expand All @@ -397,11 +396,6 @@ static inline JSC::EncodedJSValue jsTextEncoderPrototypeFunction_encodeBody(JSC:
res = TextEncoder__encode16(lexicalGlobalObject, str.span16().data(), str.length());
}

if (UNLIKELY(JSC::JSValue::decode(res).isObject() && JSC::JSValue::decode(res).getObject()->isErrorInstance())) {
throwScope.throwException(lexicalGlobalObject, JSC::JSValue::decode(res));
return {};
}

RELEASE_AND_RETURN(throwScope, res);
}

Expand Down
29 changes: 18 additions & 11 deletions src/bun.js/event_loop.zig
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,10 @@ pub const EventLoop = struct {
defer this.debug.exit();

if (count == 1) {
this.drainMicrotasksWithGlobal(this.global, this.virtual_machine.jsc);
const vm = this.virtual_machine;
const global = this.global;
const jsc = vm.jsc;
this.drainTasks(vm, global, jsc);
}

this.entered_event_loop_count -= 1;
Expand Down Expand Up @@ -1479,6 +1482,19 @@ pub const EventLoop = struct {
this.virtual_machine.gc_controller.processGCTimer();
}

pub fn drainTasks(this: *EventLoop, ctx: *JSC.VirtualMachine, global: *JSC.JSGlobalObject, js_vm: *JSC.VM) void {
while (true) {
while (this.tickWithCount(ctx) > 0) : (global.handleRejectedPromises()) {
this.tickConcurrent();
} else {
this.drainMicrotasksWithGlobal(global, js_vm);
this.tickConcurrent();
if (this.tasks.count > 0) continue;
}
break;
}
}

pub fn tick(this: *EventLoop) void {
JSC.markBinding(@src());
{
Expand All @@ -1496,16 +1512,7 @@ pub const EventLoop = struct {
const global = ctx.global;
const global_vm = ctx.jsc;

while (true) {
while (this.tickWithCount(ctx) > 0) : (this.global.handleRejectedPromises()) {
this.tickConcurrent();
} else {
this.drainMicrotasksWithGlobal(global, global_vm);
this.tickConcurrent();
if (this.tasks.count > 0) continue;
}
break;
}
this.drainTasks(ctx, global, global_vm);

while (this.tickWithCount(ctx) > 0) {
this.tickConcurrent();
Expand Down
2 changes: 1 addition & 1 deletion src/bun.js/javascript.zig
Original file line number Diff line number Diff line change
Expand Up @@ -2562,7 +2562,7 @@ pub const VirtualMachine = struct {
pub const main_file_name: string = "bun:main";

pub fn drainMicrotasks(this: *VirtualMachine) void {
this.eventLoop().drainMicrotasks();
this.eventLoop().drainTasks(this, this.global, this.jsc);
}

pub fn processFetchLog(globalThis: *JSGlobalObject, specifier: bun.String, referrer: bun.String, log: *logger.Log, ret: *ErrorableResolvedSource, err: anyerror) void {
Expand Down
8 changes: 5 additions & 3 deletions src/bun.js/webcore/encoding.zig
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,14 @@ pub const TextEncoder = struct {
const uint8array = JSC.JSValue.createUninitializedUint8Array(globalThis, result.written);
bun.assert(result.written <= buf.len);
bun.assert(result.read == slice.len);
const array_buffer = uint8array.asArrayBuffer(globalThis).?;
const array_buffer = uint8array.asArrayBuffer(globalThis) orelse return .zero;
bun.assert(result.written == array_buffer.len);
@memcpy(array_buffer.byteSlice()[0..result.written], buf[0..result.written]);
return uint8array;
} else {
const bytes = strings.allocateLatin1IntoUTF8(globalThis.bunVM().allocator, []const u8, slice) catch {
return JSC.toInvalidArguments("Out of memory", .{}, globalThis);
globalThis.throwOutOfMemory();
return .zero;
};
bun.assert(bytes.len >= slice.len);
return ArrayBuffer.fromBytes(bytes, .Uint8Array).toJSUnchecked(globalThis, null);
Expand Down Expand Up @@ -112,7 +113,8 @@ pub const TextEncoder = struct {
@TypeOf(slice),
slice,
) catch {
return JSC.toInvalidArguments("Out of memory", .{}, globalThis);
globalThis.throwOutOfMemory();
return .zero;
};
return ArrayBuffer.fromBytes(bytes, .Uint8Array).toJSUnchecked(globalThis, null);
}
Expand Down
12 changes: 12 additions & 0 deletions src/deps/libuwsockets.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,18 @@ extern "C"
}
}

extern "C" void uws_res_clear_corked_socket(us_loop_t *loop) {
uWS::LoopData *loopData = uWS::Loop::data(loop);
void *corkedSocket = loopData->getCorkedSocket();
if (corkedSocket) {
if (loopData->isCorkedSSL()) {
((uWS::AsyncSocket<true> *) corkedSocket)->uncork();
} else {
((uWS::AsyncSocket<false> *) corkedSocket)->uncork();
}
}
}

void uws_app_delete(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data)
{
if (ssl)
Expand Down
9 changes: 9 additions & 0 deletions src/deps/uws.zig
Original file line number Diff line number Diff line change
Expand Up @@ -2372,6 +2372,10 @@ pub const PosixLoop = extern struct {

const log = bun.Output.scoped(.Loop, false);

pub fn uncork(this: *PosixLoop) void {
uws_res_clear_corked_socket(this);
}

pub fn iterationNumber(this: *const PosixLoop) u64 {
return this.internal_loop_data.iteration_nr;
}
Expand Down Expand Up @@ -4103,6 +4107,10 @@ pub const WindowsLoop = extern struct {
pre: *uv.uv_prepare_t,
check: *uv.uv_check_t,

pub fn uncork(this: *PosixLoop) void {
uws_res_clear_corked_socket(this);
}

pub fn get() *WindowsLoop {
return uws_get_loop_with_native(bun.windows.libuv.Loop.get());
}
Expand Down Expand Up @@ -4418,3 +4426,4 @@ pub fn onThreadExit() void {
}

extern fn uws_app_clear_routes(ssl_flag: c_int, app: *uws_app_t) void;
extern fn uws_res_clear_corked_socket(loop: *Loop) void;

0 comments on commit cd05556

Please sign in to comment.