Skip to content

Commit

Permalink
Ensure multivalue byte arrays are fully unpacked
Browse files Browse the repository at this point in the history
  • Loading branch information
cpmoore committed Apr 16, 2021
1 parent 1b0327e commit 63050dd
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.onemainfinancial.logstash.plugins.fluent;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ExpansionPattern {
private static final Map<String, List<ExpansionPattern>> expansions = new HashMap<>();
private static final Map<String, List<ExpansionPattern>> expansions = new ConcurrentHashMap<>();
public final int startIndex;
public final int endIndex;
public final String field;
Expand All @@ -26,7 +26,7 @@ public static String expand(String key, Map<String, Object> event) {
int lastIndex = 0;
for (ExpansionPattern p : e) {
sb.append(key, lastIndex, p.startIndex);
Object s = Utils.get(event,p.field);
Object s = Utils.get(event, p.field);
if (s == null) {
s = "";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;

import static com.onemainfinancial.logstash.plugins.fluent.Utils.getLong;
Expand All @@ -20,20 +24,20 @@ public class MultilineProcessor {
private final boolean inverseMatch;
private final long timeout;
private final long maxMessages;
private final Map<String, Pattern> match = new HashMap<>();
private final Map<String, Pattern> match = new ConcurrentHashMap<>();
private final FluentSecureForward parent;
private final List<MultilineProcessor> multilineProcessors = new ArrayList<>();
private final Map<String, MessageGroup> groups = new HashMap<>();
private final Map<String, MessageGroup> groups = new ConcurrentHashMap<>();
private final Thread timeoutThread;

private final long bufferTime;

public MultilineProcessor(Object object, FluentSecureForward fluentSecureForward) {
try {
Map<String, Object> config = (Map<String, Object>) object;
this.parent = fluentSecureForward;
this.sourceField = (String) config.getOrDefault("field", "message");
this.groupKey = (String) config.getOrDefault("group_key", ">>default group<<");
this.lineSeparator = (String) config.getOrDefault(("line_separator"), "\\n");
this.lineSeparator = (String) config.getOrDefault(("line_separator"), "\n");
this.pattern = getPattern((String) config.getOrDefault("pattern", ".*"));
this.discardPattern = getPattern((String) config.get("discard_pattern"));
this.inverseMatch = Boolean.parseBoolean(config.getOrDefault("inverse_match", "false").toString());
Expand All @@ -58,28 +62,16 @@ public MultilineProcessor(Object object, FluentSecureForward fluentSecureForward
}
ExpansionPattern.getExpansions(this.groupKey);
if (timeout > 0) {
timeoutThread = new Thread(() -> {
while (!parent.isStopped()) {
try {
Thread.sleep(100);
long time = System.currentTimeMillis();
Iterator<Map.Entry<String, MessageGroup>> it = groups.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, MessageGroup> e = it.next();
if ((time - e.getValue().updatedAt) > timeout) {
it.remove();
sendToNextProcessor(e.getValue().build());
}
}

} catch (InterruptedException e) {
return;
}
}
});
timeoutThread.start();
if (timeout > 2000) {
bufferTime = 1000;
} else {
bufferTime = 100;
}
this.timeoutThread = new Thread(this::flushBuffers);
this.timeoutThread.start();
} else {
timeoutThread = null;
this.bufferTime = 0;
this.timeoutThread = null;
}

logger.debug("Created multiline processor " + this);
Expand All @@ -100,6 +92,28 @@ private static String getStringFromEvent(String sourceField, Map<String, Object>
}
}

private void flushBuffers() {

while (!parent.isStopped()) {
try {
Thread.sleep(bufferTime);
long time = System.currentTimeMillis();
Iterator<Map.Entry<String, MessageGroup>> it = groups.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, MessageGroup> e = it.next();
if ((time - e.getValue().updatedAt) > timeout) {
sendToNextProcessor(e.getValue().build());
it.remove();
}
}

} catch (InterruptedException e) {
return;
}
}

}

public void stop() {
if (timeoutThread != null) {
timeoutThread.interrupt();
Expand Down

0 comments on commit 63050dd

Please sign in to comment.