Skip to content

Commit

Permalink
3.1.2 Set the PTY fd to be non-blocking
Browse files Browse the repository at this point in the history
We have gone back and forth a couple of times, but we _do_ need to set
this FD to be non-blocking. Otherwise node will fully consume one I/O
thread for each PTY, and node only has four of those.

This change makes the controller fd (the one the parent process reads
from / writes to) as non-blocking for node.
  • Loading branch information
lhchavez committed Jul 15, 2024
1 parent 6acb8f2 commit e82d267
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 9 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@replit/ruspty",
"version": "3.1.1",
"version": "3.1.2",
"main": "dist/wrapper.js",
"types": "dist/wrapper.d.ts",
"author": "Szymon Kaliski <hi@szymonkaliski.com>",
Expand Down
32 changes: 30 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFun
use napi::Status::GenericFailure;
use napi::{self, Env};
use nix::errno::Errno;
use nix::fcntl::{fcntl, FcntlArg, FdFlag};
use nix::fcntl::{fcntl, FcntlArg, FdFlag, OFlag};
use nix::poll::{poll, PollFd, PollFlags, PollTimeout};
use nix::pty::{openpty, Winsize};
use nix::sys::termios::{self, SetArg};
Expand Down Expand Up @@ -117,12 +117,14 @@ impl Pty {
}

// open pty pair, and set close-on-exec to avoid unwanted copies of the FDs from finding their
// way into subprocesses.
// way into subprocesses. Also set the nonblocking flag to avoid Node from consuming a full I/O
// thread for this.
let pty_res = openpty(&window_size, None).map_err(cast_to_napi_error)?;
let controller_fd = pty_res.master;
let user_fd = pty_res.slave;
set_close_on_exec(controller_fd.as_raw_fd(), true)?;
set_close_on_exec(user_fd.as_raw_fd(), true)?;
set_nonblocking(controller_fd.as_raw_fd())?;

// duplicate pty user_fd to be the child's stdin, stdout, and stderr
cmd.stdin(Stdio::from(user_fd.try_clone()?));
Expand Down Expand Up @@ -362,3 +364,29 @@ fn get_close_on_exec(fd: i32) -> Result<bool, napi::Error> {
)),
}
}

/// Set the file descriptor to be non-blocking.
#[allow(dead_code)]
fn set_nonblocking(fd: i32) -> Result<(), napi::Error> {
let old_flags = match fcntl(fd, FcntlArg::F_GETFL) {
Ok(flags) => OFlag::from_bits_truncate(flags),
Err(err) => {
return Err(napi::Error::new(
GenericFailure,
format!("fcntl F_GETFL: {}", err),
));
}
};

let mut new_flags = old_flags;
new_flags.set(OFlag::O_NONBLOCK, true);
if old_flags != new_flags {
if let Err(err) = fcntl(fd, FcntlArg::F_SETFL(new_flags)) {
return Err(napi::Error::new(
GenericFailure,
format!("fcntl F_SETFL: {}", err),
));
}
}
Ok(())
}
69 changes: 65 additions & 4 deletions tests/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Pty, getCloseOnExec, setCloseOnExec } from '../wrapper';
import { type Writable } from 'stream';
import { readdirSync, readlinkSync } from 'fs';
import { describe, test, expect } from 'vitest';

Expand Down Expand Up @@ -261,10 +262,7 @@ describe(
let buffer = Buffer.from('');
const pty = new Pty({
command: '/bin/sh',
args: [
'-c',
'sleep 0.1 ; ls /proc/$$/fd',
],
args: ['-c', 'sleep 0.1 ; ls /proc/$$/fd'],
onExit: (err, exitCode) => {
expect(err).toBeNull();
expect(exitCode).toBe(0);
Expand Down Expand Up @@ -298,6 +296,69 @@ describe(
{ repeats: 4 },
);

test.only(
'can run concurrent shells',
() =>
new Promise<void>((done) => {
const oldFds = getOpenFds();
const donePromises: Array<Promise<void>> = [];
const readyPromises: Array<Promise<void>> = [];
const writeStreams: Array<Writable> = [];

// We have local echo enabled, so we'll read the message twice.
const result = IS_DARWIN
? 'ready\r\nhello cat\r\n^D\b\bhello cat\r\n'
: 'ready\r\nhello cat\r\nhello cat\r\n';

for (let i = 0; i < 10; i++) {
donePromises.push(
new Promise<void>((accept) => {
let buffer = Buffer.from('');
const pty = new Pty({
command: '/bin/sh',
args: ['-c', 'echo ready ; exec cat'],
onExit: (err, exitCode) => {
expect(err).toBeNull();
expect(exitCode).toBe(0);
expect(buffer.toString()).toStrictEqual(result);
accept();
},
});

readyPromises.push(
new Promise<void>((ready) => {
let readyMessageReceived = false;
const readStream = pty.read;
readStream.on('data', (data) => {
buffer = Buffer.concat([buffer, data]);
if (!readyMessageReceived) {
readyMessageReceived = true;
ready();
}
});
}),
);
writeStreams.push(pty.write);
}),
);
}
Promise.allSettled(readyPromises).then(() => {
// The message should end in newline so that the EOT can signal that the input has ended and not
// just the line.
const message = 'hello cat\n';
for (const writeStream of writeStreams) {
writeStream.write(message);
writeStream.end(EOT);
}
});
Promise.allSettled(donePromises).then(() => {
expect(getOpenFds()).toStrictEqual(oldFds);
done();
});
}),
{ repeats: 4 },
);

test("doesn't break when executing non-existing binary", () =>
new Promise<void>((done) => {
const oldFds = getOpenFds();
Expand Down
6 changes: 6 additions & 0 deletions wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ export class Pty {

// strip out EIO errors
read.on('error', (err: NodeJS.ErrnoException) => {
if (err.code && err.code.indexOf('EINTR') !== -1) {
return;
}
if (err.code && err.code.indexOf('EIO') !== -1) {
eof();
return;
Expand All @@ -100,6 +103,9 @@ export class Pty {
});

write.on('error', (err: NodeJS.ErrnoException) => {
if (err.code && err.code.indexOf('EINTR') !== -1) {
return;
}
if (err.code && err.code.indexOf('EIO') !== -1) {
eof();
return;
Expand Down

0 comments on commit e82d267

Please sign in to comment.