Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Non-blocking way to delay onStdinReady() until data is available #76

Open
timboudreau opened this issue Aug 5, 2017 · 1 comment
Open

Comments

@timboudreau
Copy link
Contributor

timboudreau commented Aug 5, 2017

I'm testing this library for use in a workflow that involves piping multiple processes together (something I wish this library supported internally, where it could probably be done more efficiently - without having to copy ByteBuffers - though presumably this is zero-copy with direct ones - in order to avoid holding references to NuProcess's output buffers), including complex output handling, and occasionally piping one process to more than one output process, or observing the output from within the Java process.

One issue with creating workflows like this is this: Say I'm piping process A's stdout to process B's stdin.

I launch B, then A. A caches buffers into a ConcurrentLinkedDeque, which, B will read from when onStdInReady() is called.

However, onStdinReady() is called before any data is available, and even if it returns true, which should result in it's being called again, if no data is written to the passed buffer, it is not called again. The only solution I've found is to block in onStdinReady() until the first buffer is available. Which kind of defeats the purpose of using a non-blocking library for this stuff. But more seriously - and I haven't dug into the code deeply enough to determine this - if the thread pool used is the one created in the static block at the top of LinuxProcess, then there are only numProcessors threads available - once there are as many of these blocked as there are cores, there will be no threads to collect output.

See the example code below (it uses wav2png and sox to read an audio file into 16-bit integer wav format and generate a thumbnail from it). If you remove the call to waitForFirstWrite.await(), no output will ever be written to wav2png.

    public static void main(String[] args) throws FileNotFoundException, InterruptedException {
        String file = "/tmp/test-32bitFloat.wav";
        NuProcessBuilder soxBuilder = new NuProcessBuilder( "/usr/bin/sox", file, "-t", "wav", "-b", "16", "-" );
        NuProcessBuilder wav2pngBuilder = new NuProcessBuilder( "/usr/bin/wav2png", "-", "-o", "/tmp/test.png" );
        File w2err = new File( "/tmp/test-wav2png.err" );
        File soxerr = new File( "/tmp/test-sox.err" );
        FileChannel w2errCh = new FileOutputStream( w2err ).getChannel();
        FileChannel soxErrCh = new FileOutputStream( soxerr ).getChannel();

        ConcurrentLinkedDeque<ByteBuffer> writes = new ConcurrentLinkedDeque<>();
        AtomicBoolean done = new AtomicBoolean();
        CountDownLatch allDoneLatch = new CountDownLatch( 2 );

        CountDownLatch waitForFirstWrite = new CountDownLatch( 1 );

        wav2pngBuilder.setProcessListener( new NuProcessHandler() {

            @Override
            public void onPreStart(NuProcess nuProcess) {
            }

            @Override
            public void onStart(NuProcess nuProcess) {
                nuProcess.wantWrite();
            }

            @Override
            public void onExit(int exitCode) {
                try {
                    System.out.println( "wav2png exited. " + exitCode );
                } finally {
                    latch.countDown();
                }
            }

            @Override
            public void onStdout(ByteBuffer buffer, boolean closed) {

            }

            @Override
            public void onStderr(ByteBuffer buffer, boolean closed) {
                try {
                    w2errCh.write( buffer );
                    if ( closed ) {
                        w2errCh.close();
                    }
                } catch ( IOException ex ) {
                    ex.printStackTrace();
                }
            }

            @Override
            public boolean onStdinReady(ByteBuffer buffer) {
                try {
                    waitForFirstWrite.await();
                } catch ( InterruptedException ex ) {
                    ex.printStackTrace();
                }
                ByteBuffer out;
                int cap = buffer.capacity() - buffer.position();
                int bytesWritten = 0;
                while ( ( out = writes.pollFirst() ) != null ) {
                    out.flip();
                    int rem = out.remaining();
                    if ( rem <= cap ) {
                        bytesWritten += rem;
                        cap -= rem;
                        buffer.put( out );
                        if ( cap == 0 ) {
                            break;
                        }
                    } else {
                        if ( buffer.remaining() == 0 ) {
                            out.flip();
                            writes.addFirst( out );
                            break;
                        } else {
                            byte[] bytes = new byte[buffer.remaining()];
                            bytesWritten += bytes.length;
                            out.get( bytes );
                            buffer.put( bytes );
                            out.compact();
                            writes.addFirst( out );
                            break;
                        }
                    }
                }
                if ( done.get() && writes.isEmpty() ) {
                    return false;
                }
                if ( bytesWritten > 0 ) {
                    buffer.flip();
                }
                return true;
            }
        } );
        soxBuilder.setProcessListener( new NuProcessHandler() {
            private NuProcess proc;

            @Override
            public void onPreStart(NuProcess nuProcess) {
                this.proc = nuProcess;
            }

            @Override
            public void onStart(NuProcess nuProcess) {
            }

            @Override
            public void onExit(int exitCode) {
                latch.countDown();
            }

            @Override
            public void onStdout(ByteBuffer buffer, boolean closed) {
                done.set( closed );
                ByteBuffer copy = ByteBuffer.allocateDirect( buffer.remaining() );
                copy.put( buffer );
                writes.addLast( copy );
                waitForFirstWrite.countDown();
            }

            @Override
            public void onStderr(ByteBuffer buffer, boolean closed) {
                try {
                    soxErrCh.write( buffer );
                    if ( closed ) {
                        soxErrCh.close();
                    }
                } catch ( IOException ioe ) {
                    ioe.printStackTrace();
                }
            }

            @Override
            public boolean onStdinReady(ByteBuffer buffer) {
                return false;
            }
        } );
        NuProcess wav2png = wav2pngBuilder.start();
        NuProcess p = soxBuilder.start();

        allDoneLatch.await();
    }
@brettwooldridge
Copy link
Owner

Whenever process A receives data, have it call wantsWrite() on process B's NuProcess instance.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants