perf(ext/node): native vectored write for server streams #19752

Merged · 5 commits · Jul 7, 2023
42 changes: 42 additions & 0 deletions ext/http/http_next.rs
@@ -32,6 +32,7 @@ use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
use deno_core::JsBuffer;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
@@ -1034,6 +1035,34 @@ impl UpgradeStream {
.try_or_cancel(cancel_handle)
.await
}

async fn write_vectored(
self: Rc<Self>,
buf1: &[u8],
buf2: &[u8],
) -> Result<usize, AnyError> {
let mut wr = RcRef::map(self, |r| &r.write).borrow_mut().await;

let total = buf1.len() + buf2.len();
let mut bufs = [std::io::IoSlice::new(buf1), std::io::IoSlice::new(buf2)];
let mut nwritten = wr.write_vectored(&bufs).await?;
if nwritten == total {
return Ok(nwritten);
}

// Slightly more optimized than (unstable) write_all_vectored for 2 iovecs.
while nwritten <= buf1.len() {
bufs[0] = std::io::IoSlice::new(&buf1[nwritten..]);
nwritten += wr.write_vectored(&bufs).await?;
}

// First buffer out of the way.
if nwritten < total && nwritten > buf1.len() {
wr.write_all(&buf2[nwritten - buf1.len()..]).await?;
}

Ok(total)
}
}

impl Resource for UpgradeStream {
@@ -1048,3 +1077,16 @@ impl Resource for UpgradeStream {
self.cancel_handle.cancel();
}
}

#[op]
pub async fn op_raw_write_vectored(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
buf1: JsBuffer,
buf2: JsBuffer,
) -> Result<usize, AnyError> {
let resource: Rc<UpgradeStream> =
state.borrow().resource_table.get::<UpgradeStream>(rid)?;
let nwritten = resource.write_vectored(&buf1, &buf2).await?;
Ok(nwritten)
}

Review thread (on the UpgradeStream resource lookup):

Contributor: If this is not an UpgradeStream, it should probably fall back to the standard write methods. I don't think we should assume that any two-chunk write is a websocket write.

Member (author): In JS, all TCPSERVERWRAP provider type resources are an UpgradeStream, so this does not need a fallback.
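As a usage note, here is a minimal sketch of how this op would be driven from JS. The real call site is in the stream_wrap.ts change further below; writeTwoChunks is a placeholder helper name, not part of this PR.

```ts
// Assumes `rid` refers to an UpgradeStream resource, which the JS side
// guarantees by only taking this path for TCPSERVERWRAP providers.
const { ops } = globalThis.Deno.core;

// Placeholder helper for illustration only.
async function writeTwoChunks(
  rid: number,
  buf1: Uint8Array,
  buf2: Uint8Array,
): Promise<number> {
  // The op performs a single vectored write where possible and resolves
  // with the total number of bytes written (buf1.length + buf2.length).
  return await ops.op_raw_write_vectored(rid, buf1, buf2);
}
```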
1 change: 1 addition & 0 deletions ext/http/lib.rs
@@ -120,6 +120,7 @@ deno_core::extension!(
http_next::op_http_track,
http_next::op_http_upgrade_websocket_next,
http_next::op_http_upgrade_raw,
http_next::op_raw_write_vectored,
http_next::op_http_try_wait,
http_next::op_http_wait,
],
4 changes: 3 additions & 1 deletion ext/node/polyfills/internal/stream_base_commons.ts
@@ -253,7 +253,9 @@ export function onStreamRead(
}
} else {
const offset = streamBaseState[kArrayBufferOffset];
-const buf = Buffer.from(arrayBuffer, offset, nread);
+// Performance note: Pass ArrayBuffer to Buffer#from to avoid
+// copy.
+const buf = Buffer.from(arrayBuffer.buffer, offset, nread);
result = stream.push(buf);
}

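For context on the comment above: Buffer.from(arrayBuffer, byteOffset, length) creates a view over the same backing memory, while Buffer.from(typedArray) copies the bytes. A standalone sketch of the difference (not part of this diff):

```ts
import { Buffer } from "node:buffer";

const ab = new ArrayBuffer(16);
const view = new Uint8Array(ab, 4, 8);

// Copies view's bytes into a freshly allocated Buffer.
const copied = Buffer.from(view);

// Shares memory with `ab`: no copy, same bytes as `view`.
const shared = Buffer.from(ab, 4, 8);
shared[0] = 0xff;

console.log(view[0]); // 255 -- the write through `shared` is visible
console.log(copied[0]); // 0 -- `copied` was snapshotted before the write
```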
30 changes: 29 additions & 1 deletion ext/node/polyfills/internal_binding/stream_wrap.ts
@@ -40,6 +40,9 @@ import {
} from "ext:deno_node/internal_binding/async_wrap.ts";
import { codeMap } from "ext:deno_node/internal_binding/uv.ts";

const core = globalThis.Deno.core;
const { ops } = core;

interface Reader {
read(p: Uint8Array): Promise<number | null>;
}
@@ -54,7 +57,7 @@ export interface Closer {

type Ref = { ref(): void; unref(): void };

-enum StreamBaseStateFields {
+const enum StreamBaseStateFields {
kReadBytesOrError,
kArrayBufferOffset,
kBytesWritten,
@@ -195,6 +198,31 @@ export class LibuvStreamWrap extends HandleWrap {
chunks: Buffer[] | (string | Buffer)[],
allBuffers: boolean,
): number {
const supportsWritev = this.provider === providerType.TCPSERVERWRAP;
// Fast case optimization: two chunks, and all buffers.
if (chunks.length === 2 && allBuffers && supportsWritev) {
// String chunks.
if (typeof chunks[0] === "string") chunks[0] = Buffer.from(chunks[0]);
if (typeof chunks[1] === "string") chunks[1] = Buffer.from(chunks[1]);

ops.op_raw_write_vectored(
this[kStreamBaseField]!.rid,
chunks[0],
chunks[1],
).then((nwritten) => {
try {
req.oncomplete(0);
} catch {
// swallow callback errors.
}

streamBaseState[kBytesWritten] = nwritten;
this.bytesWritten += nwritten;
});

return 0;
}

Review thread (on the op_raw_write_vectored call):

Contributor: Is this path called with chunks.length == 1 anywhere? There may be an opportunity to optimize for N <= M instead of N == 2.

Member (author): chunks.length == 1 should be using write instead of writev.

const count = allBuffers ? chunks.length : chunks.length >> 1;
const buffers: Buffer[] = new Array(count);

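For context on when the two-chunk fast path fires: Node's Writable layer hands _writev the queued chunks, so a corked socket that writes a header buffer followed by a body buffer typically reaches writeBuffer with exactly two buffers. A minimal illustrative sketch follows; the server, port, and payload are assumptions, not part of this PR.

```ts
import { Buffer } from "node:buffer";
import net from "node:net";

// Corking batches the two write() calls so the stream layer can flush them
// as one _writev call with chunks.length === 2, the case targeted above.
const server = net.createServer((socket) => {
  const header = Buffer.from("HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\n");
  const body = Buffer.from("hello");

  socket.cork();
  socket.write(header);
  socket.write(body);
  process.nextTick(() => socket.uncork()); // flush both chunks together
});

server.listen(8000); // example port
```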