Skip to content

Commit

Permalink
Merge pull request #250 from ajkannan/forward-errors
Browse files Browse the repository at this point in the history
Forward errors from gcd.sh
  • Loading branch information
aozarov committed Oct 16, 2015
2 parents b0c8874 + 291486d commit 03dbc8c
Showing 1 changed file with 129 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.gcloud.datastore.testing;

import static com.google.common.base.MoreObjects.firstNonNull;
import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.common.base.Strings;
Expand Down Expand Up @@ -54,23 +55,23 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Utility to start and stop local Google Cloud Datastore process.
*/
public class LocalGcdHelper {

private static final Logger log = Logger.getLogger(LocalGcdHelper.class.getName());

private final String projectId;
private Path gcdPath;
private Process startProcess;
private ProcessStreamReader processReader;
private ProcessErrorStreamReader processErrorReader;
private final int port;

public static final String DEFAULT_PROJECT_ID = "projectid1";
Expand Down Expand Up @@ -179,49 +180,139 @@ private static Path executablePath(String cmd) {
}

private static class ProcessStreamReader extends Thread {

private final Process process;
private final BufferedReader reader;
private volatile boolean terminated;

ProcessStreamReader(Process process, String blockUntil) throws IOException {
ProcessStreamReader(InputStream inputStream) {
super("Local GCD InputStream reader");
setDaemon(true);
this.process = process;
reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
reader = new BufferedReader(new InputStreamReader(inputStream));
}

void terminate() throws IOException {
terminated = true;
reader.close();
}

@Override
public void run() {
while (!terminated) {
try {
String line = reader.readLine();
if (line == null) {
terminated = true;
}
} catch (IOException e) {
// ignore
}
}
}

public static ProcessStreamReader start(InputStream inputStream) {
ProcessStreamReader thread = new ProcessStreamReader(inputStream);
thread.start();
return thread;
}
}

private static class ProcessErrorStreamReader extends Thread {
private static final int LOG_LENGTH_LIMIT = 50000;
private static final String GCD_LOGGING_CLASS =
"com.google.apphosting.client.serviceapp.BaseApiServlet";

private final BufferedReader errorReader;
private StringBuilder currentLog;
private Level currentLogLevel;
private boolean collectionMode;
private volatile boolean terminated;

ProcessErrorStreamReader(InputStream errorStream, String blockUntil) throws IOException {
super("Local GCD ErrorStream reader");
setDaemon(true);
errorReader = new BufferedReader(new InputStreamReader(errorStream));
if (!Strings.isNullOrEmpty(blockUntil)) {
String line;
do {
line = reader.readLine();
line = errorReader.readLine();
} while (line != null && !line.contains(blockUntil));
}
}

void terminate() throws InterruptedException, IOException {
process.destroy();
process.waitFor();
reader.close();
void terminate() throws IOException {
terminated = true;
errorReader.close();
}

@Override
public void run() {
try {
while (reader.readLine() != null) {
// consume line
String previousLine = "";
String nextLine = "";
while (!terminated) {
try {
previousLine = nextLine;
nextLine = errorReader.readLine();
if (nextLine == null) {
terminated = true;
} else {
processLogLine(previousLine, nextLine);
}
} catch (IOException e) {
// ignore
}
}
processLogLine(previousLine, firstNonNull(nextLine, ""));
writeLog(currentLogLevel, currentLog);
}

private void processLogLine(String previousLine, String nextLine) {
// Each gcd log is two lines with the following format:
// [Date] [Time] [GCD_LOGGING_CLASS] [method]
// [LEVEL]: error message
// Exceptions and stack traces are included in gcd error stream, separated by a newline
Level nextLogLevel = getLevel(nextLine);
if (nextLogLevel != null) {
writeLog(currentLogLevel, currentLog);
currentLog = new StringBuilder();
currentLogLevel = nextLogLevel;
collectionMode = previousLine.contains(GCD_LOGGING_CLASS);
} else if (collectionMode) {
if (currentLog.length() > LOG_LENGTH_LIMIT) {
collectionMode = false;
} else if (currentLog.length() == 0) {
// strip level out of the line
currentLog.append("GCD");
currentLog.append(previousLine.split(":", 2)[1]);
currentLog.append(System.getProperty("line.separator"));
} else {
currentLog.append(previousLine);
currentLog.append(System.getProperty("line.separator"));
}
} catch (IOException e) {
// ignore
}
}

public static ProcessStreamReader start(Process process, String blockUntil) throws IOException {
ProcessStreamReader thread = new ProcessStreamReader(process, blockUntil);
private static void writeLog(Level level, StringBuilder msg) {
if (level != null && msg != null && msg.length() != 0) {
log.log(level, msg.toString().trim());
}
}

private static Level getLevel(String line) {
try {
return Level.parse(line.split(":")[0]);
} catch (IllegalArgumentException e) {
return null; // level wasn't supplied in this log line
}
}

public static ProcessErrorStreamReader start(InputStream errorStream, String blockUntil)
throws IOException {
ProcessErrorStreamReader thread = new ProcessErrorStreamReader(errorStream, blockUntil);
thread.start();
return thread;
}
}

private static class CommandWrapper {

private final List<String> prefix;
private List<String> command;
private String nullFilename;
Expand Down Expand Up @@ -392,13 +483,15 @@ private void startGcd(Path executablePath) throws IOException, InterruptedExcept
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "Starting datastore emulator for the project: {0}", projectId);
}
Process startProcess = CommandWrapper.create()
.command(gcdAbsolutePath.toString(), "start", "--testing", "--allow_remote_shutdown",
"--port=" + Integer.toString(port), projectId)
.directory(gcdPath)
.redirectErrorStream()
.start();
processReader = ProcessStreamReader.start(startProcess, "Dev App Server is now running");
startProcess =
CommandWrapper.create()
.command(gcdAbsolutePath.toString(), "start", "--testing", "--allow_remote_shutdown",
"--port=" + Integer.toString(port), projectId)
.directory(gcdPath)
.start();
processReader = ProcessStreamReader.start(startProcess.getInputStream());
processErrorReader = ProcessErrorStreamReader.start(
startProcess.getErrorStream(), "Dev App Server is now running");
}

private static String md5(File gcdZipFile) throws IOException {
Expand Down Expand Up @@ -454,6 +547,9 @@ public void stop() throws IOException, InterruptedException {
sendQuitRequest(port);
if (processReader != null) {
processReader.terminate();
processErrorReader.terminate();
startProcess.destroy();
startProcess.waitFor();
}
if (gcdPath != null) {
deleteRecurse(gcdPath);
Expand All @@ -465,7 +561,6 @@ private static void deleteRecurse(Path path) throws IOException {
return;
}
Files.walkFileTree(path, new SimpleFileVisitor<Path>() {

@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
Files.delete(dir);
Expand All @@ -480,7 +575,7 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO
});
}

public static LocalGcdHelper start(String projectId, int port)
public static LocalGcdHelper start(String projectId, int port)
throws IOException, InterruptedException {
LocalGcdHelper helper = new LocalGcdHelper(projectId, port);
helper.start();
Expand All @@ -490,15 +585,14 @@ public static LocalGcdHelper start(String projectId, int port)
public static void main(String... args) throws IOException, InterruptedException {
Map<String, String> parsedArgs = parseArgs(args);
String action = parsedArgs.get("action");
int port = (parsedArgs.get("port") == null) ? DEFAULT_PORT
: Integer.parseInt(parsedArgs.get("port"));
int port =
(parsedArgs.get("port") == null) ? DEFAULT_PORT : Integer.parseInt(parsedArgs.get("port"));
switch (action) {
case "START":
if (!isActive(DEFAULT_PROJECT_ID, port)) {
LocalGcdHelper helper = start(DEFAULT_PROJECT_ID, port);
try (FileWriter writer = new FileWriter(".local_gcd_helper")) {
writer.write(
helper.gcdPath.toAbsolutePath().toString() + System.lineSeparator());
writer.write(helper.gcdPath.toAbsolutePath().toString() + System.lineSeparator());
writer.write(Integer.toString(port));
}
}
Expand Down

0 comments on commit 03dbc8c

Please sign in to comment.