From 89508675ee906c0c73733d45b815193c1b661dac Mon Sep 17 00:00:00 2001 From: Slawomir Jaranowski Date: Sat, 6 May 2023 11:40:18 +0200 Subject: [PATCH] [MSHARED-1072] Poll data from input stream Input stream can be a System.in - all read will be blocked We need read data in no blocking mode --- .../shared/utils/cli/CommandLineUtils.java | 6 +- ...treamFeeder.java => StreamPollFeeder.java} | 60 +++++++----- .../shared/utils/cli/StreamFeederTest.java | 91 ------------------- .../utils/cli/StreamPollFeederTest.java | 76 ++++++++++++++++ 4 files changed, 116 insertions(+), 117 deletions(-) rename src/main/java/org/apache/maven/shared/utils/cli/{StreamFeeder.java => StreamPollFeeder.java} (68%) delete mode 100644 src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java create mode 100644 src/test/java/org/apache/maven/shared/utils/cli/StreamPollFeederTest.java diff --git a/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java b/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java index 850d5b16..96714b4e 100644 --- a/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java +++ b/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java @@ -250,13 +250,13 @@ public void run() { @Override public Integer call() throws CommandLineException { - StreamFeeder inputFeeder = null; + StreamPollFeeder inputFeeder = null; StreamPumper outputPumper = null; StreamPumper errorPumper = null; try { if (systemIn != null) { - inputFeeder = new StreamFeeder(systemIn, p.getOutputStream()); - inputFeeder.setName("StreamFeeder-systemIn"); + inputFeeder = new StreamPollFeeder(systemIn, p.getOutputStream()); + inputFeeder.setName("StreamPollFeeder-systemIn"); inputFeeder.start(); } diff --git a/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java b/src/main/java/org/apache/maven/shared/utils/cli/StreamPollFeeder.java similarity index 68% rename from src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java rename to src/main/java/org/apache/maven/shared/utils/cli/StreamPollFeeder.java index f26db634..27d75bb1 100644 --- a/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java +++ b/src/main/java/org/apache/maven/shared/utils/cli/StreamPollFeeder.java @@ -24,51 +24,64 @@ import java.util.Objects; /** - * Read from an InputStream and write the output to an OutputStream. + * Poll InputStream for available data and write the output to an OutputStream. * * @author Trygve Laugstøl */ -class StreamFeeder extends Thread { +class StreamPollFeeder extends Thread { - private final InputStream input; + public static final int BUF_LEN = 80; + private final InputStream input; private final OutputStream output; private Throwable exception; - private boolean done; + private boolean done; private final Object lock = new Object(); /** - * Create a new StreamFeeder + * Create a new StreamPollFeeder * * @param input Stream to read from * @param output Stream to write to */ - StreamFeeder(InputStream input, OutputStream output) { + StreamPollFeeder(InputStream input, OutputStream output) { this.input = Objects.requireNonNull(input); this.output = Objects.requireNonNull(output); this.done = false; } @Override - @SuppressWarnings("checkstyle:innerassignment") public void run() { + + byte[] buf = new byte[BUF_LEN]; + try { - for (int data; !isInterrupted() && (data = input.read()) != -1; ) { - output.write(data); + while (!done) { + if (input.available() > 0) { + int i = input.read(buf); + if (i > 0) { + output.write(buf, 0, i); + output.flush(); + } else { + done = true; + } + } else { + synchronized (lock) { + if (!done) { + lock.wait(100); + } + } + } } - output.flush(); } catch (IOException e) { exception = e; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } finally { close(); } - - synchronized (lock) { - done = true; - lock.notifyAll(); - } } private void close() { @@ -89,15 +102,16 @@ public Throwable getException() { } public void waitUntilDone() { - this.interrupt(); + synchronized (lock) { - while (!done) { - try { - lock.wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } + done = true; + lock.notifyAll(); + } + + try { + join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } } diff --git a/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java b/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java deleted file mode 100644 index 8decce03..00000000 --- a/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.maven.shared.utils.cli; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; - -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - -public class StreamFeederTest { - static class BlockingInputStream extends ByteArrayInputStream { - public BlockingInputStream(byte[] buf) { - super(buf); - } - - @Override - public synchronized int read() { - int data = super.read(); - if (data >= 0) { - return data; - } - - // end test data ... block - try { - wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - return -1; - } - } - - @Test - public void waitUntilFeederDone() throws InterruptedException { - - String TEST_DATA = "TestData"; - - BlockingInputStream inputStream = new BlockingInputStream(TEST_DATA.getBytes()); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - - StreamFeeder streamFeeder = new StreamFeeder(inputStream, outputStream); - - streamFeeder.start(); - - // wait until all data from steam will be read - while (outputStream.size() < TEST_DATA.length()) { - Thread.sleep(10); - } - - streamFeeder.waitUntilDone(); // wait until process finish - - assertEquals(TEST_DATA, outputStream.toString()); - } -} diff --git a/src/test/java/org/apache/maven/shared/utils/cli/StreamPollFeederTest.java b/src/test/java/org/apache/maven/shared/utils/cli/StreamPollFeederTest.java new file mode 100644 index 00000000..7883de74 --- /dev/null +++ b/src/test/java/org/apache/maven/shared/utils/cli/StreamPollFeederTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.maven.shared.utils.cli; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class StreamPollFeederTest { + + @Test + public void waitUntilFeederDoneOnInputStream() throws Exception { + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + StreamPollFeeder streamPollFeeder = new StreamPollFeeder(System.in, outputStream); + + // start thread + streamPollFeeder.start(); + + // wait a moment + Thread.sleep(100); + + // wait until process finish + streamPollFeeder.waitUntilDone(); + assertNull(streamPollFeeder.getException()); + } + + @Test + public void dataShouldBeCopied() throws InterruptedException, IOException { + + StringBuilder TEST_DATA = new StringBuilder(); + for (int i = 0; i < 100; i++) { + TEST_DATA.append("TestData"); + } + + ByteArrayInputStream inputStream = + new ByteArrayInputStream(TEST_DATA.toString().getBytes()); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + + StreamPollFeeder streamPollFeeder = new StreamPollFeeder(inputStream, outputStream); + + streamPollFeeder.start(); + + // wait until all data from steam will be read + while (outputStream.size() < TEST_DATA.length()) { + Thread.sleep(100); + } + + // wait until process finish + streamPollFeeder.waitUntilDone(); + assertNull(streamPollFeeder.getException()); + + assertEquals(TEST_DATA.toString(), outputStream.toString()); + } +}