Skip to content

Commit

Permalink
host preview1 adapter: optimize blocking write
Browse files Browse the repository at this point in the history
instead of using blocking_write_and_flush in 4k increments, perform biggest write possible in BlockingMode::Blocking's write.
  • Loading branch information
Pat Hickey committed Oct 30, 2023
1 parent 1fd1585 commit adf9592
Showing 1 changed file with 50 additions and 34 deletions.
84 changes: 50 additions & 34 deletions crates/wasi/src/preview2/preview1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,52 +83,68 @@ impl BlockingMode {
output_stream: Resource<streams::OutputStream>,
mut bytes: &[u8],
) -> StreamResult<usize> {
use streams::HostOutputStream as Streams;
use streams::HostOutputStream;

async fn nonblocking_write_blocking_flush(
host: &mut (impl streams::Host + poll::Host),
output_stream: Resource<streams::OutputStream>,
bytes: &[u8],
) -> StreamResult<usize> {
let n = match HostOutputStream::check_write(host, output_stream.borrowed()) {
Ok(n) => n,
Err(StreamError::Closed) => 0,
Err(e) => Err(e)?,
};

let len = bytes.len().min(n as usize);
if len == 0 {
return Ok(0);
}

match HostOutputStream::write(host, output_stream.borrowed(), bytes[..len].to_vec()) {
Ok(()) => {}
Err(StreamError::Closed) => return Ok(0),
Err(e) => Err(e)?,
}

match HostOutputStream::blocking_flush(host, output_stream.borrowed()).await {
Ok(()) => {}
Err(StreamError::Closed) => return Ok(0),
Err(e) => Err(e)?,
};

Ok(len)
}

match self {
BlockingMode::Blocking => {
let total = bytes.len();
let mut total = 0;
let pollable = HostOutputStream::subscribe(host, output_stream.borrowed())
.map_err(StreamError::Trap)?;
while !bytes.is_empty() {
// NOTE: blocking_write_and_flush takes at most one 4k buffer.
let len = bytes.len().min(4096);
let (chunk, rest) = bytes.split_at(len);
bytes = rest;

Streams::blocking_write_and_flush(
poll::Host::poll_one(host, pollable.borrowed())
.await
.map_err(StreamError::Trap)?;
match nonblocking_write_blocking_flush(
host,
output_stream.borrowed(),
Vec::from(chunk),
&bytes[total..],
)
.await?
{
0 => return Ok(total),
n => {
total += n;
let (_, rest) = bytes.split_at(n);
bytes = rest;
}
}
}

poll::HostPollable::drop(host, pollable).map_err(StreamError::Trap)?;
Ok(total)
}
BlockingMode::NonBlocking => {
let n = match Streams::check_write(host, output_stream.borrowed()) {
Ok(n) => n,
Err(StreamError::Closed) => 0,
Err(e) => Err(e)?,
};

let len = bytes.len().min(n as usize);
if len == 0 {
return Ok(0);
}

match Streams::write(host, output_stream.borrowed(), bytes[..len].to_vec()) {
Ok(()) => {}
Err(StreamError::Closed) => return Ok(0),
Err(e) => Err(e)?,
}

match Streams::blocking_flush(host, output_stream.borrowed()).await {
Ok(()) => {}
Err(StreamError::Closed) => return Ok(0),
Err(e) => Err(e)?,
};

Ok(len)
nonblocking_write_blocking_flush(host, output_stream, bytes).await
}
}
}
Expand Down

0 comments on commit adf9592

Please sign in to comment.