From 63050dd3deb63f642246fe0b1473d530d3b06913 Mon Sep 17 00:00:00 2001 From: Cody P Moore Date: Fri, 16 Apr 2021 11:06:37 -0500 Subject: [PATCH] Ensure multivalue byte arrays are fully unpacked --- .../plugins/fluent/ExpansionPattern.java | 6 +- .../plugins/fluent/MultilineProcessor.java | 66 +++++++++++-------- 2 files changed, 43 insertions(+), 29 deletions(-) diff --git a/src/main/java/com/onemainfinancial/logstash/plugins/fluent/ExpansionPattern.java b/src/main/java/com/onemainfinancial/logstash/plugins/fluent/ExpansionPattern.java index 20123c5..3519509 100644 --- a/src/main/java/com/onemainfinancial/logstash/plugins/fluent/ExpansionPattern.java +++ b/src/main/java/com/onemainfinancial/logstash/plugins/fluent/ExpansionPattern.java @@ -1,12 +1,12 @@ package com.onemainfinancial.logstash.plugins.fluent; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class ExpansionPattern { - private static final Map> expansions = new HashMap<>(); + private static final Map> expansions = new ConcurrentHashMap<>(); public final int startIndex; public final int endIndex; public final String field; @@ -26,7 +26,7 @@ public static String expand(String key, Map event) { int lastIndex = 0; for (ExpansionPattern p : e) { sb.append(key, lastIndex, p.startIndex); - Object s = Utils.get(event,p.field); + Object s = Utils.get(event, p.field); if (s == null) { s = ""; } diff --git a/src/main/java/com/onemainfinancial/logstash/plugins/fluent/MultilineProcessor.java b/src/main/java/com/onemainfinancial/logstash/plugins/fluent/MultilineProcessor.java index 6d1f591..133fb5d 100644 --- a/src/main/java/com/onemainfinancial/logstash/plugins/fluent/MultilineProcessor.java +++ b/src/main/java/com/onemainfinancial/logstash/plugins/fluent/MultilineProcessor.java @@ -3,7 +3,11 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.*; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Pattern; import static com.onemainfinancial.logstash.plugins.fluent.Utils.getLong; @@ -20,12 +24,12 @@ public class MultilineProcessor { private final boolean inverseMatch; private final long timeout; private final long maxMessages; - private final Map match = new HashMap<>(); + private final Map match = new ConcurrentHashMap<>(); private final FluentSecureForward parent; private final List multilineProcessors = new ArrayList<>(); - private final Map groups = new HashMap<>(); + private final Map groups = new ConcurrentHashMap<>(); private final Thread timeoutThread; - + private final long bufferTime; public MultilineProcessor(Object object, FluentSecureForward fluentSecureForward) { try { @@ -33,7 +37,7 @@ public MultilineProcessor(Object object, FluentSecureForward fluentSecureForward this.parent = fluentSecureForward; this.sourceField = (String) config.getOrDefault("field", "message"); this.groupKey = (String) config.getOrDefault("group_key", ">>default group<<"); - this.lineSeparator = (String) config.getOrDefault(("line_separator"), "\\n"); + this.lineSeparator = (String) config.getOrDefault(("line_separator"), "\n"); this.pattern = getPattern((String) config.getOrDefault("pattern", ".*")); this.discardPattern = getPattern((String) config.get("discard_pattern")); this.inverseMatch = Boolean.parseBoolean(config.getOrDefault("inverse_match", "false").toString()); @@ -58,28 +62,16 @@ public MultilineProcessor(Object object, FluentSecureForward fluentSecureForward } ExpansionPattern.getExpansions(this.groupKey); if (timeout > 0) { - timeoutThread = new Thread(() -> { - while (!parent.isStopped()) { - try { - Thread.sleep(100); - long time = System.currentTimeMillis(); - Iterator> it = groups.entrySet().iterator(); - while (it.hasNext()) { - Map.Entry e = it.next(); - if ((time - e.getValue().updatedAt) > timeout) { - it.remove(); - sendToNextProcessor(e.getValue().build()); - } - } - - } catch (InterruptedException e) { - return; - } - } - }); - timeoutThread.start(); + if (timeout > 2000) { + bufferTime = 1000; + } else { + bufferTime = 100; + } + this.timeoutThread = new Thread(this::flushBuffers); + this.timeoutThread.start(); } else { - timeoutThread = null; + this.bufferTime = 0; + this.timeoutThread = null; } logger.debug("Created multiline processor " + this); @@ -100,6 +92,28 @@ private static String getStringFromEvent(String sourceField, Map } } + private void flushBuffers() { + + while (!parent.isStopped()) { + try { + Thread.sleep(bufferTime); + long time = System.currentTimeMillis(); + Iterator> it = groups.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry e = it.next(); + if ((time - e.getValue().updatedAt) > timeout) { + sendToNextProcessor(e.getValue().build()); + it.remove(); + } + } + + } catch (InterruptedException e) { + return; + } + } + + } + public void stop() { if (timeoutThread != null) { timeoutThread.interrupt();