Skip to content

Commit

Permalink
fix(net/tls) fix reusePort, allowHalfOpen, FIN before reconnect (#15452)
Browse files Browse the repository at this point in the history
Co-authored-by: Jarred Sumner <jarred@jarredsumner.com>
  • Loading branch information
cirospaciari and Jarred-Sumner authored Nov 28, 2024
1 parent 08222ed commit 471fe7b
Show file tree
Hide file tree
Showing 15 changed files with 392 additions and 51 deletions.
24 changes: 20 additions & 4 deletions packages/bun-usockets/src/bsd.c
Original file line number Diff line number Diff line change
Expand Up @@ -623,18 +623,34 @@ inline __attribute__((always_inline)) LIBUS_SOCKET_DESCRIPTOR bsd_bind_listen_fd
setsockopt(listenFd, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (void *) &optval2, sizeof(optval2));
#endif
} else {
#if defined(SO_REUSEPORT)
int optval2 = 1;
setsockopt(listenFd, SOL_SOCKET, SO_REUSEPORT, (void *) &optval2, sizeof(optval2));
#endif
#if defined(SO_REUSEPORT)
if((options & LIBUS_LISTEN_REUSE_PORT)) {
int optval2 = 1;
setsockopt(listenFd, SOL_SOCKET, SO_REUSEPORT, (void *) &optval2, sizeof(optval2));
}
#endif
}

#if defined(SO_REUSEADDR)
#ifndef _WIN32

// Unlike on Unix, here we don't set SO_REUSEADDR, because it doesn't just
// allow binding to addresses that are in use by sockets in TIME_WAIT, it
// effectively allows 'stealing' a port which is in use by another application.
// See libuv issue #1360.


int optval3 = 1;
setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, (void *) &optval3, sizeof(optval3));
#endif
#endif

#ifdef IPV6_V6ONLY
// TODO: revise support to match node.js
// if (listenAddr->ai_family == AF_INET6) {
// int disabled = (options & LIBUS_SOCKET_IPV6_ONLY) != 0;
// setsockopt(listenFd, IPPROTO_IPV6, IPV6_V6ONLY, (void *) &disabled, sizeof(disabled));
// }
int disabled = 0;
setsockopt(listenFd, IPPROTO_IPV6, IPV6_V6ONLY, (void *) &disabled, sizeof(disabled));
#endif
Expand Down
50 changes: 25 additions & 25 deletions packages/bun-usockets/src/crypto/openssl.c
Original file line number Diff line number Diff line change
Expand Up @@ -1858,16 +1858,17 @@ ssl_wrapped_context_on_close(struct us_internal_ssl_socket_t *s, int code,
(struct us_wrapped_socket_context_t *)us_internal_ssl_socket_context_ext(
context);

if (wrapped_context->events.on_close) {
wrapped_context->events.on_close((struct us_socket_t *)s, code, reason);
}

// writting here can cause the context to not be writable anymore but its the
// user responsability to check for that
if (wrapped_context->old_events.on_close) {
wrapped_context->old_events.on_close((struct us_socket_t *)s, code, reason);
}

if (wrapped_context->events.on_close) {
wrapped_context->events.on_close((struct us_socket_t *)s, code, reason);
}

us_socket_context_unref(0, wrapped_context->tcp_context);
return s;
}
Expand All @@ -1880,16 +1881,17 @@ ssl_wrapped_context_on_writable(struct us_internal_ssl_socket_t *s) {
(struct us_wrapped_socket_context_t *)us_internal_ssl_socket_context_ext(
context);

if (wrapped_context->events.on_writable) {
wrapped_context->events.on_writable((struct us_socket_t *)s);
}

// writting here can cause the context to not be writable anymore but its the
// user responsability to check for that
if (wrapped_context->old_events.on_writable) {
wrapped_context->old_events.on_writable((struct us_socket_t *)s);
}

if (wrapped_context->events.on_writable) {
wrapped_context->events.on_writable((struct us_socket_t *)s);
}

return s;
}

Expand All @@ -1916,14 +1918,14 @@ ssl_wrapped_context_on_timeout(struct us_internal_ssl_socket_t *s) {
struct us_wrapped_socket_context_t *wrapped_context =
(struct us_wrapped_socket_context_t *)us_internal_ssl_socket_context_ext(
context);
if (wrapped_context->old_events.on_timeout) {
wrapped_context->old_events.on_timeout((struct us_socket_t *)s);
}

if (wrapped_context->events.on_timeout) {
wrapped_context->events.on_timeout((struct us_socket_t *)s);
}

if (wrapped_context->old_events.on_timeout) {
wrapped_context->old_events.on_timeout((struct us_socket_t *)s);
}

return s;
}
Expand All @@ -1935,15 +1937,14 @@ ssl_wrapped_context_on_long_timeout(struct us_internal_ssl_socket_t *s) {
struct us_wrapped_socket_context_t *wrapped_context =
(struct us_wrapped_socket_context_t *)us_internal_ssl_socket_context_ext(
context);
if (wrapped_context->old_events.on_long_timeout) {
wrapped_context->old_events.on_long_timeout((struct us_socket_t *)s);
}

if (wrapped_context->events.on_long_timeout) {
wrapped_context->events.on_long_timeout((struct us_socket_t *)s);
}

if (wrapped_context->old_events.on_long_timeout) {
wrapped_context->old_events.on_long_timeout((struct us_socket_t *)s);
}

return s;
}

Expand All @@ -1954,14 +1955,13 @@ ssl_wrapped_context_on_end(struct us_internal_ssl_socket_t *s) {
struct us_wrapped_socket_context_t *wrapped_context =
(struct us_wrapped_socket_context_t *)us_internal_ssl_socket_context_ext(
context);

if (wrapped_context->events.on_end) {
wrapped_context->events.on_end((struct us_socket_t *)s);
}

if (wrapped_context->old_events.on_end) {
wrapped_context->old_events.on_end((struct us_socket_t *)s);
}
if (wrapped_context->events.on_end) {
wrapped_context->events.on_end((struct us_socket_t *)s);
}

return s;
}

Expand All @@ -1973,13 +1973,13 @@ ssl_wrapped_on_connect_error(struct us_internal_ssl_socket_t *s, int code) {
(struct us_wrapped_socket_context_t *)us_internal_ssl_socket_context_ext(
context);

if (wrapped_context->old_events.on_connect_error) {
wrapped_context->old_events.on_connect_error((struct us_connecting_socket_t *)s, code);
}
if (wrapped_context->events.on_connect_error) {
wrapped_context->events.on_connect_error((struct us_connecting_socket_t *)s, code);
}

if (wrapped_context->old_events.on_connect_error) {
wrapped_context->old_events.on_connect_error((struct us_connecting_socket_t *)s, code);
}
return s;
}

Expand All @@ -1990,14 +1990,14 @@ ssl_wrapped_on_socket_connect_error(struct us_internal_ssl_socket_t *s, int code
struct us_wrapped_socket_context_t *wrapped_context =
(struct us_wrapped_socket_context_t *)us_internal_ssl_socket_context_ext(
context);

if (wrapped_context->old_events.on_connecting_socket_error) {
wrapped_context->old_events.on_connecting_socket_error((struct us_socket_t *)s, code);
}
if (wrapped_context->events.on_connecting_socket_error) {
wrapped_context->events.on_connecting_socket_error((struct us_socket_t *)s, code);
}

if (wrapped_context->old_events.on_connecting_socket_error) {
wrapped_context->old_events.on_connecting_socket_error((struct us_socket_t *)s, code);
}

return s;
}

Expand Down
4 changes: 4 additions & 0 deletions packages/bun-usockets/src/libusockets.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ enum {
LIBUS_LISTEN_EXCLUSIVE_PORT = 1,
/* Allow socket to keep writing after readable side closes */
LIBUS_SOCKET_ALLOW_HALF_OPEN = 2,
/* Setting reusePort allows multiple sockets on the same host to bind to the same port. Incoming connections are distributed by the operating system to listening sockets. This option is available only on some platforms, such as Linux 3.9+, DragonFlyBSD 3.6+, FreeBSD 12.0+, Solaris 11.4, and AIX 7.2.5+*/
LIBUS_LISTEN_REUSE_PORT = 4,
/* etting ipv6Only will disable dual-stack support, i.e., binding to host :: won't make 0.0.0.0 be bound.*/
LIBUS_SOCKET_IPV6_ONLY = 8,
};

/* Library types publicly available */
Expand Down
1 change: 1 addition & 0 deletions packages/bun-usockets/src/loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <sys/ioctl.h>
#endif


/* The loop has 2 fallthrough polls */
void us_internal_loop_data_init(struct us_loop_t *loop, void (*wakeup_cb)(struct us_loop_t *loop),
void (*pre_cb)(struct us_loop_t *loop), void (*post_cb)(struct us_loop_t *loop)) {
Expand Down
40 changes: 31 additions & 9 deletions src/bun.js/api/bun/socket.zig
Original file line number Diff line number Diff line change
Expand Up @@ -310,13 +310,17 @@ pub const SocketConfig = struct {
default_data: JSC.JSValue = .zero,
exclusive: bool = false,
allowHalfOpen: bool = false,
reusePort: bool = false,
ipv6Only: bool = false,

pub fn fromJS(vm: *JSC.VirtualMachine, opts: JSC.JSValue, globalObject: *JSC.JSGlobalObject) bun.JSError!SocketConfig {
var hostname_or_unix: JSC.ZigString.Slice = JSC.ZigString.Slice.empty;
errdefer hostname_or_unix.deinit();
var port: ?u16 = null;
var exclusive = false;
var allowHalfOpen = false;
var reusePort = false;
var ipv6Only = false;

var ssl: ?JSC.API.ServerConfig.SSLConfig = null;
var default_data = JSValue.zero;
Expand Down Expand Up @@ -365,11 +369,19 @@ pub const SocketConfig = struct {
}
}

if (try opts.getTruthy(globalObject, "exclusive")) |_| {
exclusive = true;
if (try opts.getBooleanLoose(globalObject, "exclusive")) |exclusive_| {
exclusive = exclusive_;
}
if (try opts.getTruthy(globalObject, "allowHalfOpen")) |_| {
allowHalfOpen = true;
if (try opts.getBooleanLoose(globalObject, "allowHalfOpen")) |allow_half_open| {
allowHalfOpen = allow_half_open;
}

if (try opts.getBooleanLoose(globalObject, "reusePort")) |reuse_port| {
reusePort = reuse_port;
}

if (try opts.getBooleanLoose(globalObject, "ipv6Only")) |ipv6_only| {
ipv6Only = ipv6_only;
}

if (try opts.getStringish(globalObject, "hostname") orelse try opts.getStringish(globalObject, "host")) |hostname| {
Expand Down Expand Up @@ -437,6 +449,8 @@ pub const SocketConfig = struct {
.default_data = default_data,
.exclusive = exclusive,
.allowHalfOpen = allowHalfOpen,
.reusePort = reusePort,
.ipv6Only = ipv6Only,
};
}
};
Expand Down Expand Up @@ -603,10 +617,13 @@ pub const Listener = struct {

const ssl_enabled = ssl != null;

var socket_flags: i32 = if (exclusive) uws.LIBUS_LISTEN_EXCLUSIVE_PORT else uws.LIBUS_LISTEN_DEFAULT;
var socket_flags: i32 = if (exclusive) uws.LIBUS_LISTEN_EXCLUSIVE_PORT else (if (socket_config.reusePort) uws.LIBUS_SOCKET_REUSE_PORT else uws.LIBUS_LISTEN_DEFAULT);
if (socket_config.allowHalfOpen) {
socket_flags |= uws.LIBUS_SOCKET_ALLOW_HALF_OPEN;
}
if (socket_config.ipv6Only) {
socket_flags |= uws.LIBUS_SOCKET_IPV6_ONLY;
}
defer if (ssl != null) ssl.?.deinit();

if (Environment.isWindows) {
Expand Down Expand Up @@ -2174,6 +2191,8 @@ fn NewSocket(comptime ssl: bool) type {
}

const args = callframe.argumentsUndef(2);
this.ref();
defer this.deref();

return switch (this.writeOrEndBuffered(globalObject, args.ptr[0], args.ptr[1], false)) {
.fail => .zero,
Expand Down Expand Up @@ -2469,7 +2488,6 @@ fn NewSocket(comptime ssl: bool) type {
) bun.JSError!JSValue {
JSC.markBinding(@src());
const args = callframe.arguments_old(1);
this.buffered_data_for_node_net.deinitWithAllocator(bun.default_allocator);
if (args.len > 0 and args.ptr[0].toBoolean()) {
this.socket.shutdownRead();
} else {
Expand All @@ -2494,6 +2512,9 @@ fn NewSocket(comptime ssl: bool) type {
return JSValue.jsNumber(@as(i32, -1));
}

this.ref();
defer this.deref();

return switch (this.writeOrEnd(globalObject, args.mut(), false, true)) {
.fail => .zero,
.success => |result| brk: {
Expand Down Expand Up @@ -3488,7 +3509,8 @@ fn NewSocket(comptime ssl: bool) type {
TLSSocket.dataSetCached(tls_js_value, globalObject, default_data);

tls.socket = new_socket;
tls.socket_context = new_socket.context(); // owns the new tls context that have a ref from the old one
const new_context = new_socket.context().?;
tls.socket_context = new_context; // owns the new tls context that have a ref from the old one
tls.ref();
const vm = handlers.vm;

Expand Down Expand Up @@ -3518,7 +3540,7 @@ fn NewSocket(comptime ssl: bool) type {
.connection = if (this.connection) |c| c.clone() else null,
.wrapped = .tcp,
.protos = null,
.socket_context = null, // raw socket will dont own the context
.socket_context = new_context.ref(true),
});
raw.ref();

Expand Down Expand Up @@ -3637,7 +3659,7 @@ pub fn NewWrappedHandler(comptime tls: bool) type {
if (comptime tls) {
TLSSocket.onData(this.tls, socket, data);
} else {
// tedius use this
// tedius use this (tedius is a pure-javascript implementation of TDS protocol used to interact with instances of Microsoft's SQL Server)
TLSSocket.onData(this.tcp, socket, data);
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/bun.js/api/server.zig
Original file line number Diff line number Diff line change
Expand Up @@ -7227,7 +7227,8 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
app.listenWithConfig(*ThisServer, this, onListen, .{
.port = tcp.port,
.host = host,
.options = if (this.config.reuse_port) 0 else 1,
// IPV6_ONLY is the default for bun, different from node it also set exclusive port in case reuse port is not set
.options = (if (this.config.reuse_port) uws.LIBUS_SOCKET_REUSE_PORT else uws.LIBUS_LISTEN_EXCLUSIVE_PORT) | uws.LIBUS_SOCKET_IPV6_ONLY,
});
},

Expand All @@ -7237,7 +7238,8 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
this,
onListen,
unix,
if (this.config.reuse_port) 0 else 1,
// IPV6_ONLY is the default for bun, different from node it also set exclusive port in case reuse port is not set
(if (this.config.reuse_port) uws.LIBUS_SOCKET_REUSE_PORT else uws.LIBUS_LISTEN_EXCLUSIVE_PORT) | uws.LIBUS_SOCKET_IPV6_ONLY,
);
},
}
Expand Down
8 changes: 8 additions & 0 deletions src/deps/uws.zig
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ pub const u_int64_t = c_ulonglong;
pub const LIBUS_LISTEN_DEFAULT: i32 = 0;
pub const LIBUS_LISTEN_EXCLUSIVE_PORT: i32 = 1;
pub const LIBUS_SOCKET_ALLOW_HALF_OPEN: i32 = 2;
pub const LIBUS_SOCKET_REUSE_PORT: i32 = 4;
pub const LIBUS_SOCKET_IPV6_ONLY: i32 = 8;

pub const Socket = opaque {
pub fn write2(this: *Socket, first: []const u8, second: []const u8) i32 {
const rc = us_socket_write2(0, this, first.ptr, first.len, second.ptr, second.len);
Expand Down Expand Up @@ -2356,6 +2359,11 @@ pub const SocketContext = opaque {
us_socket_context_free(@as(i32, 0), this);
}

pub fn ref(this: *SocketContext, comptime ssl: bool) *SocketContext {
us_socket_context_ref(@intFromBool(ssl), this);
return this;
}

pub fn cleanCallbacks(ctx: *SocketContext, is_ssl: bool) void {
const ssl_int: i32 = @intFromBool(is_ssl);
// replace callbacks with dummy ones
Expand Down
Loading

0 comments on commit 471fe7b

Please sign in to comment.