From 5e2f09f0edcf34e6fc2eb783c5e991c6b19a03db Mon Sep 17 00:00:00 2001 From: Ido Halevi <32218210+idohalevi@users.noreply.github.com> Date: Tue, 3 Sep 2019 12:40:01 +0300 Subject: [PATCH] Malformed lines bug (#47) * cleanup * fix travis jdk * make sendToLogzio readable better * adding a test * changing some param names * clean imports * fix naming --- .travis.yml | 2 +- .../io/logz/test/MockLogzioBulkListener.java | 17 +- .../java/io/logz/sender/HttpsSyncSender.java | 150 +++++++++--------- .../java/io/logz/sender/LogzioSender.java | 1 - .../java/io/logz/sender/LogzioSenderTest.java | 25 +++ 5 files changed, 119 insertions(+), 76 deletions(-) diff --git a/.travis.yml b/.travis.yml index b85e7c5..6515100 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,5 @@ language: java -jdk: oraclejdk8 +jdk: openjdk8 cache: directories: diff --git a/logzio-sender-test/src/main/java/io/logz/test/MockLogzioBulkListener.java b/logzio-sender-test/src/main/java/io/logz/test/MockLogzioBulkListener.java index 585ec29..6c2d507 100644 --- a/logzio-sender-test/src/main/java/io/logz/test/MockLogzioBulkListener.java +++ b/logzio-sender-test/src/main/java/io/logz/test/MockLogzioBulkListener.java @@ -40,6 +40,7 @@ public class MockLogzioBulkListener implements Closeable { private Queue logRequests = new ConcurrentLinkedQueue<>(); private final String host; private final int port; + private int malformedLogs = 0; private boolean isServerTimeoutMode = false; private boolean raiseExceptionOnLog = false; @@ -86,8 +87,10 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques logger.debug("got log: {} ", line); }); logger.debug("Total number of logRequests {} ({})", logRequests.size(), logRequests); - - // Tell Jetty we are ok, and it should return 200 + } catch (IllegalArgumentException e) { + malformedLogs++; + response.setStatus(HttpServletResponse.SC_BAD_REQUEST); + } finally { baseRequest.setHandled(true); } } @@ -264,6 +267,10 @@ public int getNumberOfReceivedLogs() { return logRequests.size(); } + public int getNumberOfReceivedMalformedLogs() { + return malformedLogs; + } + public MockLogzioBulkListener.LogRequest assertLogReceivedByMessage(String message) { Optional logRequest = getLogByMessageField(message); assertThat(logRequest.isPresent()).describedAs("Log with message '"+message+"' received").isTrue(); @@ -276,6 +283,12 @@ public void assertNumberOfReceivedMsgs(int count) { .isEqualTo(count); } + public void assertNumberOfReceivedMalformedMsgs(int count) { + assertThat(getNumberOfReceivedMalformedLogs()) + .describedAs("Malformed messages on mock listener: {}", malformedLogs) + .isEqualTo(count); + } + public void assertLogReceivedIs(String message, String token, String type, String loggerName, String level) { MockLogzioBulkListener.LogRequest log = assertLogReceivedByMessage(message); assertLogReceivedIs(log, token, type, loggerName, level); diff --git a/logzio-sender/src/main/java/io/logz/sender/HttpsSyncSender.java b/logzio-sender/src/main/java/io/logz/sender/HttpsSyncSender.java index 03016b6..e32bf5e 100644 --- a/logzio-sender/src/main/java/io/logz/sender/HttpsSyncSender.java +++ b/logzio-sender/src/main/java/io/logz/sender/HttpsSyncSender.java @@ -20,7 +20,7 @@ public class HttpsSyncSender { private static final int NEW_LINE_AS_UTF8_BYTE_ARRAY_SIZE = NEW_LINE_AS_UTF8_BYTE_ARRAY.length; - public HttpsSyncSender(HttpsRequestConfiguration configuration, SenderStatusReporter reporter) { + HttpsSyncSender(HttpsRequestConfiguration configuration, SenderStatusReporter reporter) { this.configuration = configuration; this.reporter = reporter; } @@ -29,18 +29,6 @@ public HttpsRequestConfiguration getConfiguration() { return configuration; } - private boolean shouldRetry(int statusCode) { - boolean shouldRetry = true; - switch (statusCode) { - case HttpURLConnection.HTTP_OK: - case HttpURLConnection.HTTP_BAD_REQUEST: - case HttpURLConnection.HTTP_UNAUTHORIZED: - shouldRetry = false; - break; - } - return shouldRetry; - } - private byte[] toNewLineSeparatedByteArray(List messages) { try (ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(sizeInBytes(messages) + NEW_LINE_AS_UTF8_BYTE_ARRAY_SIZE * messages.size()); OutputStream os = configuration.isCompressRequests() ? new GZIPOutputStream(byteOutputStream) : byteOutputStream) { @@ -63,91 +51,109 @@ private int sizeInBytes(List logMessages) { return totalSize; } - public void sendToLogzio(List messages) throws LogzioServerErrorException { + void sendToLogzio(List messages) throws LogzioServerErrorException { try { byte[] payload = toNewLineSeparatedByteArray(messages); int currentRetrySleep = configuration.getInitialWaitBeforeRetryMS(); for (int currTry = 1; currTry <= configuration.getMaxRetriesAttempts(); currTry++) { - - boolean shouldRetry = true; + boolean retry = true; int responseCode = 0; String responseMessage = ""; IOException savedException = null; try { - HttpURLConnection conn = (HttpURLConnection) configuration.getLogzioListenerUrl().openConnection(); - conn.setRequestMethod(configuration.getRequestMethod()); - conn.setRequestProperty("Content-length", String.valueOf(payload.length)); - conn.setRequestProperty("Content-Type", "text/plain"); - if (configuration.isCompressRequests()){ - conn.setRequestProperty("Content-Encoding", "gzip"); - } - conn.setReadTimeout(configuration.getSocketTimeout()); - conn.setConnectTimeout(configuration.getConnectTimeout()); - conn.setDoOutput(true); - conn.setDoInput(true); - - conn.getOutputStream().write(payload); - + HttpURLConnection conn = sendRequest(payload); responseCode = conn.getResponseCode(); responseMessage = conn.getResponseMessage(); - - if (responseCode == HttpURLConnection.HTTP_BAD_REQUEST) { - BufferedReader bufferedReader = null; - try { - StringBuilder problemDescription = new StringBuilder(); - InputStream errorStream = conn.getErrorStream(); - if (errorStream != null) { - bufferedReader = new BufferedReader(new InputStreamReader((errorStream))); - bufferedReader.lines().forEach(line -> problemDescription.append("\n").append(line)); - reporter.warning(String.format("Got 400 from logzio, here is the output: %s", problemDescription)); - } - } finally { - if (bufferedReader != null) { - try { - bufferedReader.close(); - } catch(Exception e) {} - } - } - } - if (responseCode == HttpURLConnection.HTTP_UNAUTHORIZED) { - reporter.error("Logz.io: Got forbidden! Your token is not right. Unfortunately, dropping logs. Message: " + responseMessage); - } - - shouldRetry = shouldRetry(responseCode); + retry = handleResponse(payload, responseCode, responseMessage, conn); } catch (IOException e) { savedException = e; reporter.error("Got IO exception - " + e.getMessage()); } - if (!shouldRetry && responseCode == HttpURLConnection.HTTP_OK) { - reporter.info("Successfully sent bulk to logz.io, size: " + payload.length); - break; - + if (retry) { + currentRetrySleep = handleRetry(currentRetrySleep, currTry, responseCode, responseMessage, savedException); } else { + break; + } + } + } catch (InterruptedException e) { + reporter.info("Got interrupted exception"); + Thread.currentThread().interrupt(); + } + } - if (currTry == configuration.getMaxRetriesAttempts()){ + private int handleRetry(int currentRetrySleep, int currTry, int responseCode, String responseMessage, IOException savedException) throws LogzioServerErrorException, InterruptedException { + if (currTry == configuration.getMaxRetriesAttempts()) { + if (savedException != null) { + reporter.error("Got IO exception on the last bulk try to logz.io", savedException); + } + // Giving up, something is broken on Logz.io side, we will try again later + throw new LogzioServerErrorException("Got HTTP " + responseCode + " code from logz.io, with message: " + responseMessage); + } - if (savedException != null) { + reporter.warning("Could not send log to logz.io, retry (" + currTry + "/" + configuration.getMaxRetriesAttempts() + ")"); + reporter.warning("Sleeping for " + currentRetrySleep + " ms and will try again."); + Thread.sleep(currentRetrySleep); + return currentRetrySleep * 2; + } - reporter.error("Got IO exception on the last bulk try to logz.io", savedException); - } - // Giving up, something is broken on Logz.io side, we will try again later - throw new LogzioServerErrorException("Got HTTP " + responseCode + " code from logz.io, with message: " + responseMessage); - } + private boolean handleResponse(byte[] payload, int responseCode, String responseMessage, HttpURLConnection conn) { + boolean retry = false; + if (responseCode == HttpURLConnection.HTTP_BAD_REQUEST) { + String errorMessage = readErrorStream(conn); + if (errorMessage != null) { + reporter.warning(errorMessage); + } + } + else if (responseCode == HttpURLConnection.HTTP_UNAUTHORIZED) { + reporter.error("Logz.io: Got forbidden! Your token is not right. Unfortunately, dropping logs. Message: " + responseMessage); + } + else if (responseCode == HttpURLConnection.HTTP_OK) { + reporter.info("Successfully sent bulk to logz.io, size: " + payload.length); + } else { + retry = true; + } + return retry; + } - reporter.warning("Could not send log to logz.io, retry (" + currTry + "/" + configuration.getMaxRetriesAttempts()+ ")"); - reporter.warning("Sleeping for " + currentRetrySleep + " ms and will try again."); - Thread.sleep(currentRetrySleep); - currentRetrySleep *= 2; + private String readErrorStream(HttpURLConnection conn) { + BufferedReader bufferedReader = null; + try { + StringBuilder problemDescription = new StringBuilder(); + InputStream errorStream = conn.getErrorStream(); + if (errorStream != null) { + bufferedReader = new BufferedReader(new InputStreamReader((errorStream))); + bufferedReader.lines().forEach(line -> problemDescription.append("\n").append(line)); + return String.format("Got 400 from logzio, here is the output: %s", problemDescription); + } + } finally { + if (bufferedReader != null) { + try { + bufferedReader.close(); + } catch (Exception ignored) { } } + } + return null; + } - } catch (InterruptedException e) { - reporter.info("Got interrupted exception"); - Thread.currentThread().interrupt(); + private HttpURLConnection sendRequest(byte[] payload) throws IOException { + HttpURLConnection conn = (HttpURLConnection) configuration.getLogzioListenerUrl().openConnection(); + conn.setRequestMethod(configuration.getRequestMethod()); + conn.setRequestProperty("Content-length", String.valueOf(payload.length)); + conn.setRequestProperty("Content-Type", "text/plain"); + if (configuration.isCompressRequests()) { + conn.setRequestProperty("Content-Encoding", "gzip"); } + conn.setReadTimeout(configuration.getSocketTimeout()); + conn.setConnectTimeout(configuration.getConnectTimeout()); + conn.setDoOutput(true); + conn.setDoInput(true); + + conn.getOutputStream().write(payload); + return conn; } } diff --git a/logzio-sender/src/main/java/io/logz/sender/LogzioSender.java b/logzio-sender/src/main/java/io/logz/sender/LogzioSender.java index da5a413..49c32b2 100644 --- a/logzio-sender/src/main/java/io/logz/sender/LogzioSender.java +++ b/logzio-sender/src/main/java/io/logz/sender/LogzioSender.java @@ -166,7 +166,6 @@ private void drainQueue() { List logsList = dequeueUpToMaxBatchSize(); try { httpsSyncSender.sendToLogzio(logsList); - } catch (LogzioServerErrorException e) { debug("Could not send log to logz.io: ", e); debug("Will retry in the next interval"); diff --git a/logzio-sender/src/test/java/io/logz/sender/LogzioSenderTest.java b/logzio-sender/src/test/java/io/logz/sender/LogzioSenderTest.java index 81935d9..3ee608e 100644 --- a/logzio-sender/src/test/java/io/logz/sender/LogzioSenderTest.java +++ b/logzio-sender/src/test/java/io/logz/sender/LogzioSenderTest.java @@ -110,6 +110,31 @@ public void simpleAppending() throws Exception { mockListener.assertLogReceivedIs(message2, token, type, loggerName, LOGLEVEL); } + @Test + public void malformedBulk() throws Exception { + String token = "aBcDeFgHiJkLmNoPqRsT"; + String type = random(8); + String loggerName = "malformedBulk"; + int drainTimeout = 1; + + String message1 = "Testing.." + random(5); + String message2 = "Warning test.." + random(5); + + LogzioSender.Builder testSenderBuilder = getLogzioSenderBuilder(token, type, drainTimeout, + 10 * 1000, 10 * 1000, tasks,false); + LogzioSender testSender = createLogzioSender(testSenderBuilder); + + testSender.send(createJsonMessage(loggerName, message1)); + testSender.send(createJsonMessage(loggerName, message2)); + testSender.send("bug".getBytes(StandardCharsets.UTF_8)); + sleepSeconds(drainTimeout * 5); + + mockListener.assertNumberOfReceivedMsgs(2); + mockListener.assertLogReceivedIs(message1, token, type, loggerName, LOGLEVEL); + mockListener.assertLogReceivedIs(message2, token, type, loggerName, LOGLEVEL); + mockListener.assertNumberOfReceivedMalformedMsgs(1); + } + @Test public void simpleByteArrayAppending() throws Exception { String token = "aBcDeFgHiJkLmNoPqRsT";