diff --git a/README.md b/README.md index 9dea734..b367854 100644 --- a/README.md +++ b/README.md @@ -66,7 +66,7 @@ Example logstash config: username => "password" } multiline => [{ - group_key => "%{[host][name]}:%{[kubernetes][pod_name]}", + group_key => "%{[host][name]}:%{[kubernetes][pod_name]}" match => { "[kubernetes][pod_name]" => "app1-.*" } diff --git a/VERSION b/VERSION index 50aea0e..e3a4f19 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.1.0 \ No newline at end of file +2.2.0 \ No newline at end of file diff --git a/build.gradle b/build.gradle index 700a511..1e4e9e6 100644 --- a/build.gradle +++ b/build.gradle @@ -56,7 +56,7 @@ dependencies { compile 'org.apache.logging.log4j:log4j-api:2.9.1' compile 'org.apache.logging.log4j:log4j-core:2.9.1' compile fileTree(dir: LOGSTASH_CORE_PATH, include: "**/logstash-core-?.?.?.jar") - compile group: 'org.msgpack', name: 'msgpack-core', version: '0.8.12' + compile group: 'org.msgpack', name: 'msgpack-core', version: '0.8.22' compile group: 'com.google.code.gson', name: 'gson', version: '2.7' testCompile 'junit:junit:4.12' diff --git a/src/main/java/com/onemainfinancial/logstash/plugins/fluent/ExpansionPattern.java b/src/main/java/com/onemainfinancial/logstash/plugins/fluent/ExpansionPattern.java index 20123c5..3519509 100644 --- a/src/main/java/com/onemainfinancial/logstash/plugins/fluent/ExpansionPattern.java +++ b/src/main/java/com/onemainfinancial/logstash/plugins/fluent/ExpansionPattern.java @@ -1,12 +1,12 @@ package com.onemainfinancial.logstash.plugins.fluent; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class ExpansionPattern { - private static final Map> expansions = new HashMap<>(); + private static final Map> expansions = new ConcurrentHashMap<>(); public final int startIndex; public final int endIndex; public final String field; @@ -26,7 +26,7 @@ public static String expand(String key, Map event) { int lastIndex = 0; for (ExpansionPattern p : e) { sb.append(key, lastIndex, p.startIndex); - Object s = Utils.get(event,p.field); + Object s = Utils.get(event, p.field); if (s == null) { s = ""; } diff --git a/src/main/java/com/onemainfinancial/logstash/plugins/fluent/FluentSecureForward.java b/src/main/java/com/onemainfinancial/logstash/plugins/fluent/FluentSecureForward.java index 109c9f8..1251f65 100644 --- a/src/main/java/com/onemainfinancial/logstash/plugins/fluent/FluentSecureForward.java +++ b/src/main/java/com/onemainfinancial/logstash/plugins/fluent/FluentSecureForward.java @@ -199,13 +199,15 @@ private ServerSocket getServerSocket() throws IOException { return ServerSocketFactory.getDefault().createServerSocket(port, 0, inetAddress); } - private void acceptNewConnection() throws IOException { - try { - Socket client = socket.accept(); - new FluentSession(this, client).start(); - } catch (SocketException e) { - if (!stopped) { - logger.error("Caught socket exception", e); + private void acceptConnections() throws IOException { + while (!stopped) { + try { + Socket client = socket.accept(); + new FluentSession(this, client).start(); + } catch (SocketException e) { + if (!stopped) { + logger.error("Caught socket exception", e); + } } } } @@ -226,9 +228,8 @@ public void start(Consumer> consumer) { logger.info("Starting {} input listener {}:{}", PLUGIN_NAME, host, port); socket = sslEnable ? getSSLServerSocket() : getServerSocket(); logger.debug("{} {} started on {}:{}", PLUGIN_NAME, id, host, port); - while (!stopped) { - acceptNewConnection(); - } + acceptConnections(); + } catch (Exception e) { logger.error("Could not start server ", e); } finally { diff --git a/src/main/java/com/onemainfinancial/logstash/plugins/fluent/FluentSession.java b/src/main/java/com/onemainfinancial/logstash/plugins/fluent/FluentSession.java index 6cefd8d..7bc0b14 100644 --- a/src/main/java/com/onemainfinancial/logstash/plugins/fluent/FluentSession.java +++ b/src/main/java/com/onemainfinancial/logstash/plugins/fluent/FluentSession.java @@ -95,6 +95,13 @@ private PingResult checkPing(ArrayValue value) { return new PingResult(true, sharedKeySalt); } + private void unpackBytes(byte[] bytes) throws IOException { + try (MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bytes)) { + while (unpacker.hasNext()) { + decodeEvent(unpacker.unpackValue()); + } + } + } @SuppressWarnings("unchecked") private void decodeEvent(Value value) { @@ -103,10 +110,10 @@ private void decodeEvent(Value value) { logger.trace("Checking value type {} from {}", valueType, fromAddress); switch (valueType) { case BINARY: - decodeEvent(MessagePack.newDefaultUnpacker(value.asBinaryValue().asByteArray()).unpackValue()); + unpackBytes(value.asBinaryValue().asByteArray()); break; case STRING: - decodeEvent(MessagePack.newDefaultUnpacker(value.asStringValue().asByteArray()).unpackValue()); + unpackBytes(value.asStringValue().asByteArray()); break; case ARRAY: for (Value v : value.asArrayValue()) { @@ -182,25 +189,20 @@ public void run() { } } catch (Exception e) { logger.error("Caught exception from socket {}", fromAddress, e); + } finally { + closeAll(messagePacker, messageUnpacker, session); } - cleanup(); } - private void cleanup() { - try { - messageUnpacker.close(); - } catch (IOException e) { - logger.trace("Could not close message unpacker for {}", fromAddress, e); - } - try { - messagePacker.close(); - } catch (IOException e) { - logger.trace("Could not close message packer for {}", fromAddress, e); - } - try { - session.close(); - } catch (IOException e) { - logger.trace("Could not close session {}", fromAddress, e); + private void closeAll(AutoCloseable... closeables) { + for (AutoCloseable closeable : closeables) { + if(closeable!=null) { + try { + closeable.close(); + } catch (Exception e) { + logger.trace("Could not close {}", closeable, e); + } + } } } diff --git a/src/main/java/com/onemainfinancial/logstash/plugins/fluent/MultilineProcessor.java b/src/main/java/com/onemainfinancial/logstash/plugins/fluent/MultilineProcessor.java index 9328052..133fb5d 100644 --- a/src/main/java/com/onemainfinancial/logstash/plugins/fluent/MultilineProcessor.java +++ b/src/main/java/com/onemainfinancial/logstash/plugins/fluent/MultilineProcessor.java @@ -3,7 +3,11 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.*; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Pattern; import static com.onemainfinancial.logstash.plugins.fluent.Utils.getLong; @@ -20,11 +24,12 @@ public class MultilineProcessor { private final boolean inverseMatch; private final long timeout; private final long maxMessages; - private final Map match = new HashMap<>(); + private final Map match = new ConcurrentHashMap<>(); private final FluentSecureForward parent; private final List multilineProcessors = new ArrayList<>(); - private final Map groups = new HashMap<>(); + private final Map groups = new ConcurrentHashMap<>(); private final Thread timeoutThread; + private final long bufferTime; public MultilineProcessor(Object object, FluentSecureForward fluentSecureForward) { try { @@ -32,13 +37,13 @@ public MultilineProcessor(Object object, FluentSecureForward fluentSecureForward this.parent = fluentSecureForward; this.sourceField = (String) config.getOrDefault("field", "message"); this.groupKey = (String) config.getOrDefault("group_key", ">>default group<<"); - this.lineSeparator = (String) config.getOrDefault(("line_separator"), "\\n"); + this.lineSeparator = (String) config.getOrDefault(("line_separator"), "\n"); this.pattern = getPattern((String) config.getOrDefault("pattern", ".*")); this.discardPattern = getPattern((String) config.get("discard_pattern")); - this.inverseMatch = (Boolean) config.getOrDefault("inverse_match", false); + this.inverseMatch = Boolean.parseBoolean(config.getOrDefault("inverse_match", "false").toString()); this.timeout = getLong(config, "timeout", (long) 5000); this.maxMessages = getLong(config, "max_messages", (long) 0); - this.shouldContinue = (Boolean) config.getOrDefault("continue", false); + this.shouldContinue = Boolean.parseBoolean(config.getOrDefault("continue", "false").toString()); Object o = config.get("multiline"); if (o != null) { if (o instanceof List) { @@ -57,28 +62,16 @@ public MultilineProcessor(Object object, FluentSecureForward fluentSecureForward } ExpansionPattern.getExpansions(this.groupKey); if (timeout > 0) { - timeoutThread = new Thread(() -> { - while (!parent.isStopped()) { - try { - Thread.sleep(100); - long time = System.currentTimeMillis(); - Iterator> it = groups.entrySet().iterator(); - while (it.hasNext()) { - Map.Entry e = it.next(); - if ((time - e.getValue().updatedAt) > timeout) { - it.remove(); - sendToNextProcessor(e.getValue().build()); - } - } - - } catch (InterruptedException e) { - return; - } - } - }); - timeoutThread.start(); + if (timeout > 2000) { + bufferTime = 1000; + } else { + bufferTime = 100; + } + this.timeoutThread = new Thread(this::flushBuffers); + this.timeoutThread.start(); } else { - timeoutThread = null; + this.bufferTime = 0; + this.timeoutThread = null; } logger.debug("Created multiline processor " + this); @@ -99,6 +92,28 @@ private static String getStringFromEvent(String sourceField, Map } } + private void flushBuffers() { + + while (!parent.isStopped()) { + try { + Thread.sleep(bufferTime); + long time = System.currentTimeMillis(); + Iterator> it = groups.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry e = it.next(); + if ((time - e.getValue().updatedAt) > timeout) { + sendToNextProcessor(e.getValue().build()); + it.remove(); + } + } + + } catch (InterruptedException e) { + return; + } + } + + } + public void stop() { if (timeoutThread != null) { timeoutThread.interrupt(); @@ -131,14 +146,12 @@ public boolean shouldProcess(Map event) { public void accept(Map map) { String fieldValue = getStringFromEvent(sourceField, map); - boolean wasNull = false; boolean matches = false; if (fieldValue != null) { if (discardPattern != null && discardPattern.matcher(fieldValue).matches()) { return; } matches = pattern.matcher(fieldValue).matches(); - if (inverseMatch) { matches = !matches; }