From fca0cfeb5a2dd7790a72a5cdc5f6f7f76bce5d5d Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Thu, 8 Dec 2016 21:15:53 +1100 Subject: [PATCH] simplify BlockingProcessStreamReader; fix tests Update #1429. This commit simplifies the main run() method. Previous implementation checks whether the underlying Reader is ready to read then either reads a line or sleeps, catching InterruptedException to watch for any thread interruption. There are subtle difficulties with this approach: - Even if the underlying Reader is ready to read, it might not have enough bytes to form a line. It might still block. - It's not necessary to sleep. If the thread is interrupted while reading. It should throw InterruptedIOException. The method now reads in a loop, waiting for either exceptions or EOF. The test class implements a mock Logger that logs to a data structure. It then verifies that the data structure holds appropriate logs. As implemented, this can cause a race, as two threads, the writer and the verifier, run concurrently. This commit fixes this by waiting for the writing thread to terminate before verifying. --- .../testing/BlockingProcessStreamReader.java | 31 ++++++------------- .../BlockingProcessStreamReaderTest.java | 20 ++---------- 2 files changed, 12 insertions(+), 39 deletions(-) diff --git a/google-cloud-core/src/main/java/com/google/cloud/testing/BlockingProcessStreamReader.java b/google-cloud-core/src/main/java/com/google/cloud/testing/BlockingProcessStreamReader.java index 988b6593a1d4..0fa10dcacebb 100644 --- a/google-cloud-core/src/main/java/com/google/cloud/testing/BlockingProcessStreamReader.java +++ b/google-cloud-core/src/main/java/com/google/cloud/testing/BlockingProcessStreamReader.java @@ -35,7 +35,6 @@ */ class BlockingProcessStreamReader extends Thread { - private static final int STREAM_READER_SLEEP_INTERVAL_IN_MS = 200; private static final int LOG_LENGTH_LIMIT = 50000; private final BufferedReader errorReader; @@ -43,7 +42,6 @@ class BlockingProcessStreamReader extends Thread { private StringBuilder currentLog; private Level currentLogLevel; private boolean collectionMode; - private volatile boolean terminated; private final String emulatorTag; private final Pattern logLinePattern; @@ -64,8 +62,6 @@ private BlockingProcessStreamReader(String emulator, InputStream stream, String } void terminate() throws IOException { - terminated = true; - errorReader.close(); interrupt(); } @@ -73,25 +69,18 @@ void terminate() throws IOException { public void run() { String previousLine = ""; String nextLine = ""; - while (!terminated) { - try { - if (errorReader.ready()) { - previousLine = nextLine; - nextLine = errorReader.readLine(); - if (nextLine == null) { - terminated = true; - } else { - processLogLine(previousLine, nextLine); - } - } else { - sleep(STREAM_READER_SLEEP_INTERVAL_IN_MS); + try { + for (;;) { + previousLine = nextLine; + nextLine = errorReader.readLine(); + if (nextLine == null) { + break; } - } catch (IOException e) { + processLogLine(previousLine, nextLine); + } + } catch (IOException e) { + if (!isInterrupted()) { e.printStackTrace(System.err); - } catch (InterruptedException e) { - previousLine = nextLine; - nextLine = null; - break; } } processLogLine(previousLine, firstNonNull(nextLine, "")); diff --git a/google-cloud-core/src/test/java/com/google/cloud/testing/BlockingProcessStreamReaderTest.java b/google-cloud-core/src/test/java/com/google/cloud/testing/BlockingProcessStreamReaderTest.java index 73456f5e3da9..6dedcb55b680 100644 --- a/google-cloud-core/src/test/java/com/google/cloud/testing/BlockingProcessStreamReaderTest.java +++ b/google-cloud-core/src/test/java/com/google/cloud/testing/BlockingProcessStreamReaderTest.java @@ -87,17 +87,9 @@ public void testBlockUntil() throws IOException { public void testForwardLogEntry() throws IOException, InterruptedException { TestLogger logger = new TestLogger(); InputStream stream = new ByteArrayInputStream(OUTPUT_WITH_LOGS.getBytes(Charsets.UTF_8)); - BlockingProcessStreamReader thread = - BlockingProcessStreamReader.start("emulator", stream, BLOCK_UNTIL, logger); - while (logger.getLogs().get(Level.INFO).isEmpty()) { - Thread.sleep(200); - } + BlockingProcessStreamReader.start("emulator", stream, BLOCK_UNTIL, logger).join(); assertEquals("[emulator] log line 1" + System.lineSeparator() + "[emulator] log line 2", logger.getLogs().get(Level.INFO).iterator().next()); - thread.terminate(); - while (logger.getLogs().get(Level.FINE).isEmpty()) { - Thread.sleep(200); - } assertEquals("[emulator] log line 3", logger.getLogs().get(Level.FINE).iterator().next()); stream.close(); } @@ -106,17 +98,9 @@ public void testForwardLogEntry() throws IOException, InterruptedException { public void testForwardAlreadyTaggedLogs() throws IOException, InterruptedException { TestLogger logger = new TestLogger(); InputStream stream = new ByteArrayInputStream(TAGGED_OUTPUT_WITH_LOGS.getBytes(Charsets.UTF_8)); - BlockingProcessStreamReader thread = - BlockingProcessStreamReader.start("emulator", stream, BLOCK_UNTIL, logger); - while (logger.getLogs().get(Level.INFO).isEmpty()) { - Thread.sleep(200); - } + BlockingProcessStreamReader.start("emulator", stream, BLOCK_UNTIL, logger).join(); assertEquals("[emulator] log line 1" + System.lineSeparator() + "[emulator] log line 2", logger.getLogs().get(Level.INFO).iterator().next()); - thread.terminate(); - while (logger.getLogs().get(Level.FINE).isEmpty()) { - Thread.sleep(200); - } assertEquals("[emulator] log line 3", logger.getLogs().get(Level.FINE).iterator().next()); stream.close(); }