Skip to content

Commit

Permalink
simplify BlockingProcessStreamReader; fix tests
Browse files Browse the repository at this point in the history
Update #1429.

This commit simplifies the main run() method.
Previous implementation checks whether the underlying Reader is ready to read
then either reads a line or sleeps,
catching InterruptedException to watch for any thread interruption.

There are subtle difficulties with this approach:
- Even if the underlying Reader is ready to read, it might not have
  enough bytes to form a line. It might still block.
- It's not necessary to sleep. If the thread is interrupted while
  reading. It should throw InterruptedIOException.
The method now reads in a loop, waiting for either exceptions or EOF.

The test class implements a mock Logger that logs to a data structure.
It then verifies that the data structure holds appropriate logs.
As implemented, this can cause a race, as two threads,
the writer and the verifier, run concurrently.
This commit fixes this by waiting for the writing thread to terminate
before verifying.
  • Loading branch information
pongad committed Dec 8, 2016
1 parent 0838c41 commit fca0cfe
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,13 @@
*/
class BlockingProcessStreamReader extends Thread {

private static final int STREAM_READER_SLEEP_INTERVAL_IN_MS = 200;
private static final int LOG_LENGTH_LIMIT = 50000;

private final BufferedReader errorReader;
private final Logger logger;
private StringBuilder currentLog;
private Level currentLogLevel;
private boolean collectionMode;
private volatile boolean terminated;
private final String emulatorTag;
private final Pattern logLinePattern;

Expand All @@ -64,34 +62,25 @@ private BlockingProcessStreamReader(String emulator, InputStream stream, String
}

void terminate() throws IOException {
terminated = true;
errorReader.close();
interrupt();
}

@Override
public void run() {
String previousLine = "";
String nextLine = "";
while (!terminated) {
try {
if (errorReader.ready()) {
previousLine = nextLine;
nextLine = errorReader.readLine();
if (nextLine == null) {
terminated = true;
} else {
processLogLine(previousLine, nextLine);
}
} else {
sleep(STREAM_READER_SLEEP_INTERVAL_IN_MS);
try {
for (;;) {
previousLine = nextLine;
nextLine = errorReader.readLine();
if (nextLine == null) {
break;
}
} catch (IOException e) {
processLogLine(previousLine, nextLine);
}
} catch (IOException e) {
if (!isInterrupted()) {
e.printStackTrace(System.err);
} catch (InterruptedException e) {
previousLine = nextLine;
nextLine = null;
break;
}
}
processLogLine(previousLine, firstNonNull(nextLine, ""));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,9 @@ public void testBlockUntil() throws IOException {
public void testForwardLogEntry() throws IOException, InterruptedException {
TestLogger logger = new TestLogger();
InputStream stream = new ByteArrayInputStream(OUTPUT_WITH_LOGS.getBytes(Charsets.UTF_8));
BlockingProcessStreamReader thread =
BlockingProcessStreamReader.start("emulator", stream, BLOCK_UNTIL, logger);
while (logger.getLogs().get(Level.INFO).isEmpty()) {
Thread.sleep(200);
}
BlockingProcessStreamReader.start("emulator", stream, BLOCK_UNTIL, logger).join();
assertEquals("[emulator] log line 1" + System.lineSeparator() + "[emulator] log line 2",
logger.getLogs().get(Level.INFO).iterator().next());
thread.terminate();
while (logger.getLogs().get(Level.FINE).isEmpty()) {
Thread.sleep(200);
}
assertEquals("[emulator] log line 3", logger.getLogs().get(Level.FINE).iterator().next());
stream.close();
}
Expand All @@ -106,17 +98,9 @@ public void testForwardLogEntry() throws IOException, InterruptedException {
public void testForwardAlreadyTaggedLogs() throws IOException, InterruptedException {
TestLogger logger = new TestLogger();
InputStream stream = new ByteArrayInputStream(TAGGED_OUTPUT_WITH_LOGS.getBytes(Charsets.UTF_8));
BlockingProcessStreamReader thread =
BlockingProcessStreamReader.start("emulator", stream, BLOCK_UNTIL, logger);
while (logger.getLogs().get(Level.INFO).isEmpty()) {
Thread.sleep(200);
}
BlockingProcessStreamReader.start("emulator", stream, BLOCK_UNTIL, logger).join();
assertEquals("[emulator] log line 1" + System.lineSeparator() + "[emulator] log line 2",
logger.getLogs().get(Level.INFO).iterator().next());
thread.terminate();
while (logger.getLogs().get(Level.FINE).isEmpty()) {
Thread.sleep(200);
}
assertEquals("[emulator] log line 3", logger.getLogs().get(Level.FINE).iterator().next());
stream.close();
}
Expand Down

0 comments on commit fca0cfe

Please sign in to comment.