Skip to content

Commit

Permalink
fix(ext/node): close upgraded socket when the underlying http connect…
Browse files Browse the repository at this point in the history
…ion is closed (#25387)

This change fixes the handling of upgraded socket from `node:http` module.

In `op_node_http_fetch_response_upgrade`, we create DuplexStream paired
with `hyper::upgrade::Upgraded`. When the connection is closed from the
server, the read result from `Upgraded` becomes 0. However because we
don't close the paired DuplexStream at that point, the Socket object in
JS side keeps alive even after the server closed. That caused the issue
#20179

This change fixes it by closing the paired DuplexStream when the
`Upgraded` stream returns 0 read result.

closes #20179
  • Loading branch information
kt3k authored Sep 5, 2024
1 parent dd208a6 commit 186f748
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 0 deletions.
1 change: 1 addition & 0 deletions ext/node/ops/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ pub async fn op_node_http_fetch_response_upgrade(
loop {
let read = upgraded_rx.read(&mut buf).await?;
if read == 0 {
read_tx.shutdown().await?;
break;
}
read_tx.write_all(&buf[..read]).await?;
Expand Down
68 changes: 68 additions & 0 deletions tests/unit_node/http_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { text } from "node:stream/consumers";
import { assert, assertEquals, fail } from "@std/assert";
import { assertSpyCalls, spy } from "@std/testing/mock";
import { fromFileUrl, relative } from "@std/path";
import { retry } from "@std/async/retry";

import { gzip } from "node:zlib";
import { Buffer } from "node:buffer";
Expand Down Expand Up @@ -1604,3 +1605,70 @@ Deno.test("[node/http] In ClientRequest, option.hostname has precedence over opt

await responseReceived.promise;
});

Deno.test("[node/http] upgraded socket closes when the server closed without closing handshake", async () => {
const clientSocketClosed = Promise.withResolvers<void>();
const serverProcessClosed = Promise.withResolvers<void>();

// Uses the server in different process to shutdown it without closing handshake
const server = `
Deno.serve({ port: 1337 }, (req) => {
if (req.headers.get("upgrade") != "websocket") {
return new Response("ok");
}
console.log("upgrade on server");
const { socket, response } = Deno.upgradeWebSocket(req);
socket.addEventListener("message", (event) => {
console.log("server received", event.data);
socket.send("pong");
});
return response;
});
`;

const p = new Deno.Command("deno", { args: ["eval", server] }).spawn();

// Wait for the server to respond
await retry(async () => {
const resp = await fetch("http://localhost:1337");
const _text = await resp.text();
});

const options = {
port: 1337,
host: "127.0.0.1",
headers: {
"Connection": "Upgrade",
"Upgrade": "websocket",
"Sec-WebSocket-Key": "dGhlIHNhbXBsZSBub25jZQ==",
},
};

http.request(options).on("upgrade", (_res, socket) => {
socket.on("close", () => {
console.log("client socket closed");
clientSocketClosed.resolve();
});
socket.on("data", async (data) => {
// receives pong message
assertEquals(data, Buffer.from("8104706f6e67", "hex"));

p.kill();
await p.status;

console.log("process closed");
serverProcessClosed.resolve();

// sending some additional message
socket.write(Buffer.from("81847de88e01", "hex"));
socket.write(Buffer.from("0d81e066", "hex"));
});

// sending ping message
socket.write(Buffer.from("81847de88e01", "hex"));
socket.write(Buffer.from("0d81e066", "hex"));
}).end();

await clientSocketClosed.promise;
await serverProcessClosed.promise;
});

0 comments on commit 186f748

Please sign in to comment.