Skip to content

Commit

Permalink
Merge pull request #3 from OneMainF/fix-binary-unpacking
Browse files Browse the repository at this point in the history
Ensure all value from byte arrays are unpacked
  • Loading branch information
cpmoore authored Apr 16, 2021
2 parents 9a233c3 + 63050dd commit 0b23405
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 63 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ Example logstash config:
username => "password"
}
multiline => [{
group_key => "%{[host][name]}:%{[kubernetes][pod_name]}",
group_key => "%{[host][name]}:%{[kubernetes][pod_name]}"
match => {
"[kubernetes][pod_name]" => "app1-.*"
}
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.1.0
2.2.0
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ dependencies {
compile 'org.apache.logging.log4j:log4j-api:2.9.1'
compile 'org.apache.logging.log4j:log4j-core:2.9.1'
compile fileTree(dir: LOGSTASH_CORE_PATH, include: "**/logstash-core-?.?.?.jar")
compile group: 'org.msgpack', name: 'msgpack-core', version: '0.8.12'
compile group: 'org.msgpack', name: 'msgpack-core', version: '0.8.22'
compile group: 'com.google.code.gson', name: 'gson', version: '2.7'

testCompile 'junit:junit:4.12'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.onemainfinancial.logstash.plugins.fluent;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ExpansionPattern {
private static final Map<String, List<ExpansionPattern>> expansions = new HashMap<>();
private static final Map<String, List<ExpansionPattern>> expansions = new ConcurrentHashMap<>();
public final int startIndex;
public final int endIndex;
public final String field;
Expand All @@ -26,7 +26,7 @@ public static String expand(String key, Map<String, Object> event) {
int lastIndex = 0;
for (ExpansionPattern p : e) {
sb.append(key, lastIndex, p.startIndex);
Object s = Utils.get(event,p.field);
Object s = Utils.get(event, p.field);
if (s == null) {
s = "";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,15 @@ private ServerSocket getServerSocket() throws IOException {
return ServerSocketFactory.getDefault().createServerSocket(port, 0, inetAddress);
}

private void acceptNewConnection() throws IOException {
try {
Socket client = socket.accept();
new FluentSession(this, client).start();
} catch (SocketException e) {
if (!stopped) {
logger.error("Caught socket exception", e);
private void acceptConnections() throws IOException {
while (!stopped) {
try {
Socket client = socket.accept();
new FluentSession(this, client).start();
} catch (SocketException e) {
if (!stopped) {
logger.error("Caught socket exception", e);
}
}
}
}
Expand All @@ -226,9 +228,8 @@ public void start(Consumer<Map<String, Object>> consumer) {
logger.info("Starting {} input listener {}:{}", PLUGIN_NAME, host, port);
socket = sslEnable ? getSSLServerSocket() : getServerSocket();
logger.debug("{} {} started on {}:{}", PLUGIN_NAME, id, host, port);
while (!stopped) {
acceptNewConnection();
}
acceptConnections();

} catch (Exception e) {
logger.error("Could not start server ", e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ private PingResult checkPing(ArrayValue value) {
return new PingResult(true, sharedKeySalt);
}

private void unpackBytes(byte[] bytes) throws IOException {
try (MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bytes)) {
while (unpacker.hasNext()) {
decodeEvent(unpacker.unpackValue());
}
}
}

@SuppressWarnings("unchecked")
private void decodeEvent(Value value) {
Expand All @@ -103,10 +110,10 @@ private void decodeEvent(Value value) {
logger.trace("Checking value type {} from {}", valueType, fromAddress);
switch (valueType) {
case BINARY:
decodeEvent(MessagePack.newDefaultUnpacker(value.asBinaryValue().asByteArray()).unpackValue());
unpackBytes(value.asBinaryValue().asByteArray());
break;
case STRING:
decodeEvent(MessagePack.newDefaultUnpacker(value.asStringValue().asByteArray()).unpackValue());
unpackBytes(value.asStringValue().asByteArray());
break;
case ARRAY:
for (Value v : value.asArrayValue()) {
Expand Down Expand Up @@ -182,25 +189,20 @@ public void run() {
}
} catch (Exception e) {
logger.error("Caught exception from socket {}", fromAddress, e);
} finally {
closeAll(messagePacker, messageUnpacker, session);
}
cleanup();
}

private void cleanup() {
try {
messageUnpacker.close();
} catch (IOException e) {
logger.trace("Could not close message unpacker for {}", fromAddress, e);
}
try {
messagePacker.close();
} catch (IOException e) {
logger.trace("Could not close message packer for {}", fromAddress, e);
}
try {
session.close();
} catch (IOException e) {
logger.trace("Could not close session {}", fromAddress, e);
private void closeAll(AutoCloseable... closeables) {
for (AutoCloseable closeable : closeables) {
if(closeable!=null) {
try {
closeable.close();
} catch (Exception e) {
logger.trace("Could not close {}", closeable, e);
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;

import static com.onemainfinancial.logstash.plugins.fluent.Utils.getLong;
Expand All @@ -20,25 +24,26 @@ public class MultilineProcessor {
private final boolean inverseMatch;
private final long timeout;
private final long maxMessages;
private final Map<String, Pattern> match = new HashMap<>();
private final Map<String, Pattern> match = new ConcurrentHashMap<>();
private final FluentSecureForward parent;
private final List<MultilineProcessor> multilineProcessors = new ArrayList<>();
private final Map<String, MessageGroup> groups = new HashMap<>();
private final Map<String, MessageGroup> groups = new ConcurrentHashMap<>();
private final Thread timeoutThread;
private final long bufferTime;

public MultilineProcessor(Object object, FluentSecureForward fluentSecureForward) {
try {
Map<String, Object> config = (Map<String, Object>) object;
this.parent = fluentSecureForward;
this.sourceField = (String) config.getOrDefault("field", "message");
this.groupKey = (String) config.getOrDefault("group_key", ">>default group<<");
this.lineSeparator = (String) config.getOrDefault(("line_separator"), "\\n");
this.lineSeparator = (String) config.getOrDefault(("line_separator"), "\n");
this.pattern = getPattern((String) config.getOrDefault("pattern", ".*"));
this.discardPattern = getPattern((String) config.get("discard_pattern"));
this.inverseMatch = (Boolean) config.getOrDefault("inverse_match", false);
this.inverseMatch = Boolean.parseBoolean(config.getOrDefault("inverse_match", "false").toString());
this.timeout = getLong(config, "timeout", (long) 5000);
this.maxMessages = getLong(config, "max_messages", (long) 0);
this.shouldContinue = (Boolean) config.getOrDefault("continue", false);
this.shouldContinue = Boolean.parseBoolean(config.getOrDefault("continue", "false").toString());
Object o = config.get("multiline");
if (o != null) {
if (o instanceof List) {
Expand All @@ -57,28 +62,16 @@ public MultilineProcessor(Object object, FluentSecureForward fluentSecureForward
}
ExpansionPattern.getExpansions(this.groupKey);
if (timeout > 0) {
timeoutThread = new Thread(() -> {
while (!parent.isStopped()) {
try {
Thread.sleep(100);
long time = System.currentTimeMillis();
Iterator<Map.Entry<String, MessageGroup>> it = groups.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, MessageGroup> e = it.next();
if ((time - e.getValue().updatedAt) > timeout) {
it.remove();
sendToNextProcessor(e.getValue().build());
}
}

} catch (InterruptedException e) {
return;
}
}
});
timeoutThread.start();
if (timeout > 2000) {
bufferTime = 1000;
} else {
bufferTime = 100;
}
this.timeoutThread = new Thread(this::flushBuffers);
this.timeoutThread.start();
} else {
timeoutThread = null;
this.bufferTime = 0;
this.timeoutThread = null;
}

logger.debug("Created multiline processor " + this);
Expand All @@ -99,6 +92,28 @@ private static String getStringFromEvent(String sourceField, Map<String, Object>
}
}

private void flushBuffers() {

while (!parent.isStopped()) {
try {
Thread.sleep(bufferTime);
long time = System.currentTimeMillis();
Iterator<Map.Entry<String, MessageGroup>> it = groups.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, MessageGroup> e = it.next();
if ((time - e.getValue().updatedAt) > timeout) {
sendToNextProcessor(e.getValue().build());
it.remove();
}
}

} catch (InterruptedException e) {
return;
}
}

}

public void stop() {
if (timeoutThread != null) {
timeoutThread.interrupt();
Expand Down Expand Up @@ -131,14 +146,12 @@ public boolean shouldProcess(Map<String, Object> event) {

public void accept(Map<String, Object> map) {
String fieldValue = getStringFromEvent(sourceField, map);
boolean wasNull = false;
boolean matches = false;
if (fieldValue != null) {
if (discardPattern != null && discardPattern.matcher(fieldValue).matches()) {
return;
}
matches = pattern.matcher(fieldValue).matches();

if (inverseMatch) {
matches = !matches;
}
Expand Down

0 comments on commit 0b23405

Please sign in to comment.