Skip to content

Commit

Permalink
Lots of improvements for jinjava and type mappings transformations an…
Browse files Browse the repository at this point in the history
…d other findings along the way

* JsonAccumulator (replayer) no longer throws when numeric values are > 32 bit ranges - now we use longs and doubles to parse the string sequences into, so some tuple transformation exceptions cease.
* regex replace filters now can be configured via the jinjava config to specify what format the replacement strings should be in.  The format is controlled by a series of regex replacements on the replacement string itself - e.g. to convert \\# to $# so that python-style regexes can be used.  Notice that this was done the way that it was so that it's a easier to override and let users tweak for their own specific needs.  Things get really sticky on malformed input and I don't have an interest in fully supporting that we behave the same on malformed escape sequences.  Letting users have the option to specify the exact escaping seems like it's a hedge.
* template files (resources) are still loaded from the jarfile, but users can add more templates and override existing ones.  New templates can be passed through the json config to the provider in a simple key->template dictionary.
* VARIANTs of top-level templates are gone.  Now there's a top-level transformByType.j2 that does some tests on the incoming document and routes switches for the replayer template or the document backfill one (implementation and tests are still pending).  That simplifies contextual awareness where this transformer no longer needs to have it.  To round that change out, replayer.j2 was renamed httpRequests.j2 since that's a more accurate name now.
* log_value and log_value_and_return are bound for jinjava templates to log through Slf4j.
* There's still more work to be done in rewriteCreateIndexRequest to work w/ various versions of ES so that we can figure out when type mappings should/shouldn't be present and do the right thing.  Now that we have source properties, that's just a matter of pulling a field and writing some more template code.
* Converted all of the test indices and types to NOT include upper-case characters to avoid confusion and exercising transforms in ways that they'll never need to be tested.

Signed-off-by: Greg Schohn <greg.schohn@gmail.com>
  • Loading branch information
gregschohn committed Dec 7, 2024
1 parent 5be3ddc commit 73c8e13
Show file tree
Hide file tree
Showing 37 changed files with 519 additions and 144 deletions.
2 changes: 2 additions & 0 deletions RFS/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ dependencies {

implementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerInterface')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJMESPathMessageTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJinjavaTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJoltMessageTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonTypeMappingsSanitizationTransformerProvider')

implementation group: 'org.jcommander', name: 'jcommander'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind'
Expand Down
1 change: 1 addition & 0 deletions TrafficCapture/trafficReplayer/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dependencies {
implementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerLoaders')
implementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerInterface')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJMESPathMessageTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJinjavaTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJoltMessageTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonTypeMappingsSanitizationTransformerProvider')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,10 @@ public Object getNextTopLevelObject() throws IOException {
pushCompletedValue(parser.getText());
break;
case VALUE_NUMBER_INT:
pushCompletedValue(parser.getIntValue());
pushCompletedValue(parser.getLongValue());
break;
case VALUE_NUMBER_FLOAT:
pushCompletedValue(parser.getFloatValue());
pushCompletedValue(parser.getDoubleValue());
break;
case NOT_AVAILABLE:
// pipeline stall - need more data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ public Object get(Object key) {
return value;
}

public boolean missingPaylaodWasAccessed() {
public boolean missingPayloadWasAccessed() {
return payloadWasAccessed;
}

public void resetMissingPaylaodWasAccessed() {
public void resetMissingPayloadWasAccessed() {
payloadWasAccessed = false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg)
transformedMessage = transform(transformer, httpJsonMessage);
} catch (Exception e) {
var payload = (PayloadAccessFaultingMap) httpJsonMessage.payload();
if (payload.missingPaylaodWasAccessed()) {
payload.resetMissingPaylaodWasAccessed();
if (payload.missingPayloadWasAccessed()) {
payload.resetMissingPayloadWasAccessed();
log.atDebug().setMessage("The transforms for this message require payload manipulation, "
+ "all content handlers are being loaded.").log();
// make a fresh message and its headers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ public void testTransformsPropagateExceptionProperly() throws JsonProcessingExce
"[{\"TypeMappingSanitizationTransformerProvider\":\"\"}]");
var e = Assertions.assertThrows(Exception.class,
() -> transformer.transformJson(FAULTING_MAP));
Assertions.assertTrue(((PayloadAccessFaultingMap)FAULTING_MAP.payload()).missingPaylaodWasAccessed());
Assertions.assertTrue(((PayloadAccessFaultingMap)FAULTING_MAP.payload()).missingPayloadWasAccessed());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.migrations.transform.TransformationLoader;
import org.opensearch.migrations.utils.TrackedFuture;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
Expand Down Expand Up @@ -331,8 +332,11 @@ public void testMalformedPayload_andThrowingTransformation_IsPassedThrough() thr
new TransformationLoader().getTransformerFactoryLoader(
HOST_NAME,
null,
"[{\"TypeMappingSanitizationTransformerProvider\":\"\"}]"
),
new ObjectMapper().writeValueAsString(List.of(
Map.of("JsonJinjavaTransformerProvider", Map.of(
"template", "{%- throw \"intentional exception\" -%}"
))
))),
null,
testPacketCapture,
rootContext.getTestConnectionRequestContext(0)
Expand Down Expand Up @@ -362,10 +366,7 @@ public void testMalformedPayload_andThrowingTransformation_IsPassedThrough() thr
);
var outputAndResult = finalizationFuture.get();
Assertions.assertInstanceOf(TransformationException.class,
TrackedFuture.unwindPossibleCompletionException(outputAndResult.transformationStatus.getException()),
"It's acceptable for now that the OpenSearch upgrade transformation can't handle non-json " +
"content. If that Transform wants to handle this on its own, we'll need to use another transform " +
"configuration so that it throws and we can do this test.");
TrackedFuture.unwindPossibleCompletionException(outputAndResult.transformationStatus.getException()));
var combinedOutputBuf = outputAndResult.transformedOutput.getResponseAsByteBuf();
Assertions.assertTrue(combinedOutputBuf.readableBytes() == 0);
combinedOutputBuf.release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,49 +10,68 @@
import org.opensearch.migrations.transform.jinjava.DynamicMacroFunction;
import org.opensearch.migrations.transform.jinjava.JavaRegexCaptureFilter;
import org.opensearch.migrations.transform.jinjava.JavaRegexReplaceFilter;
import org.opensearch.migrations.transform.jinjava.JinjavaConfig;
import org.opensearch.migrations.transform.jinjava.LogFunction;
import org.opensearch.migrations.transform.jinjava.NameMappingClasspathResourceLocator;
import org.opensearch.migrations.transform.jinjava.ThrowTag;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.hubspot.jinjava.Jinjava;
import com.hubspot.jinjava.lib.fn.ELFunctionDefinition;
import com.hubspot.jinjava.loader.ResourceLocator;
import lombok.NonNull;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class JinjavaTransformer implements IJsonTransformer {

protected static final ObjectMapper objectMapper = new ObjectMapper();
public static final String REGEX_REPLACEMENT_CONVERSION_PATTERNS = "regex_replacement_conversion_patterns";

protected final Jinjava jinjava;
protected final Function<Map<String, Object>, Map<String, Object>> createContextWithSourceFunction;
private final String templateStr;

public JinjavaTransformer(String templateString,
UnaryOperator<Map<String, Object>> contextProviderFromSource) {
this(templateString, contextProviderFromSource, new NameMappingClasspathResourceLocator());
this(templateString, contextProviderFromSource, new JinjavaConfig());
}

public JinjavaTransformer(String templateString,
UnaryOperator<Map<String, Object>> contextProviderFromSource,
@NonNull JinjavaConfig jinjavaConfig) {
this(templateString,
contextProviderFromSource,
new NameMappingClasspathResourceLocator(jinjavaConfig.getNamedScripts()),
jinjavaConfig.getRegexReplacementConversionPatterns());
}

public JinjavaTransformer(String templateString,
UnaryOperator<Map<String, Object>> createContextWithSource,
ResourceLocator resourceLocator)
ResourceLocator resourceLocator,
List<Map.Entry<String, String>> regexReplacementConversionPatterns)
{
jinjava = new Jinjava();
this.createContextWithSourceFunction = createContextWithSource;
jinjava.setResourceLocator(resourceLocator);
jinjava.getGlobalContext().registerFilter(new JavaRegexCaptureFilter());
jinjava.getGlobalContext().registerFilter(new JavaRegexReplaceFilter());

jinjava.getGlobalContext().registerFunction(new ELFunctionDefinition(
"",
"invoke_macro",
DynamicMacroFunction.class,
"invokeMacro",
String.class,
Object[].class
));
jinjava.getGlobalContext().registerFunction(
new ELFunctionDefinition("", "invoke_macro", DynamicMacroFunction.class, "invokeMacro",
String.class, Object[].class));
jinjava.getGlobalContext().registerFunction(
new ELFunctionDefinition("", "log_value_and_return", LogFunction.class, "logValueAndReturn",
String.class, Object.class, Object.class));
jinjava.getGlobalContext().registerFunction(
new ELFunctionDefinition("", "log_value", LogFunction.class, "logValue",
String.class, Object.class));

jinjava.getGlobalContext().registerTag(new ThrowTag());
jinjava.getGlobalContext().put(REGEX_REPLACEMENT_CONVERSION_PATTERNS,
Optional.ofNullable(regexReplacementConversionPatterns)
.orElse(JavaRegexReplaceFilter.DEFAULT_REGEX_REPLACE_FILTER));
this.templateStr = templateString;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,60 @@
package org.opensearch.migrations.transform.jinjava;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.opensearch.migrations.transform.JinjavaTransformer;

import com.google.common.base.Function;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.hubspot.jinjava.interpret.JinjavaInterpreter;
import com.hubspot.jinjava.lib.filter.Filter;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class JavaRegexReplaceFilter implements Filter {

private static LoadingCache<String, Pattern> regexCache =
public static final List<Map.Entry<String, String>> JAVA_REGEX_REPLACE_FILTER = List.of();
public static final List<Map.Entry<String, String>> PYTHONESQUE_REGEX_REPLACE_FILTER = List.of(
Map.entry("(\\$)", "\\\\\\$"),
Map.entry("((?:\\\\\\\\)*)(\\\\)(?=\\d)", "\\$"));
public static final List<Map.Entry<String, String>> DEFAULT_REGEX_REPLACE_FILTER = PYTHONESQUE_REGEX_REPLACE_FILTER;

private static final LoadingCache<String, Pattern> regexCache =
CacheBuilder.newBuilder().build(CacheLoader.from((Function<String, Pattern>)Pattern::compile));

@SneakyThrows
private static Pattern getCompiledPattern(String pattern) {
return regexCache.get(pattern);
}

@AllArgsConstructor
@EqualsAndHashCode
private static class ReplacementAndTransform {
String replacement;
List<Map.Entry<String, String>> substitutions;
}

private static final LoadingCache<ReplacementAndTransform, String> replacementCache =
CacheBuilder.newBuilder().build(CacheLoader.from(rat -> {
var r = rat.replacement;
if (rat.substitutions != null) {
for (var kvp : rat.substitutions) {
r = r.replaceAll(kvp.getKey(), kvp.getValue());
}
}
return r;
}));


@Override
public String getName() {
return "regex_replace";
Expand All @@ -38,13 +70,21 @@ public Object filter(Object inputObject, JinjavaInterpreter interpreter, String.
String pattern = args[0];
String replacement = args[1];

String rewritten = null;
try {
Matcher matcher = getCompiledPattern(pattern).matcher(input);
var rval = matcher.replaceAll(replacement);
rewritten = replacementCache.get(
new ReplacementAndTransform(replacement,
Optional.ofNullable(interpreter)
.flatMap(ji->Optional.ofNullable(ji.getContext()))
.flatMap(c-> Optional.ofNullable((List<Map.Entry<String,String>>)
c.get(JinjavaTransformer.REGEX_REPLACEMENT_CONVERSION_PATTERNS)))
.orElse(DEFAULT_REGEX_REPLACE_FILTER)));
var rval = matcher.replaceAll(rewritten);
log.atError().setMessage("replaced value {} with {}").addArgument(input).addArgument(rval).log();
return rval;
} catch (Exception e) {
return null;
throw new RegexReplaceException(e, input, pattern, replacement, rewritten);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.opensearch.migrations.transform.jinjava;

import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class JinjavaConfig {
@JsonProperty("regexReplacementConversionPatterns")
private List<Map.Entry<String, String>> regexReplacementConversionPatterns;

@JsonProperty("regexReplacementConversionPatterns")
Map<String, String> namedScripts;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.opensearch.migrations.transform.jinjava;

import lombok.extern.slf4j.Slf4j;
import org.slf4j.event.Level;

@Slf4j
public class LogFunction {

/**
* Called from templates through the registration in the JinjavaTransformer class
*/
public static Object logValueAndReturn(String levelStr, Object valueToLog, Object valueToReturn) {
Level level;
try {
level = Level.valueOf(levelStr);
} catch (IllegalArgumentException e) {
log.atError().setMessage("Could not parse the level as it was passed in, so using ERROR. Level={}")
.addArgument(levelStr).log();
level = Level.ERROR;
}
log.atLevel(level).setMessage("{}").addArgument(valueToLog).log();
return valueToReturn;
}

public static void logValue(String level, Object valueToLog) {
logValueAndReturn(level, valueToLog, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

Expand All @@ -20,6 +22,8 @@

@Slf4j
public class NameMappingClasspathResourceLocator extends ClasspathResourceLocator {
final Map<String, String> overrideResourceMap;

@AllArgsConstructor
@Getter
@EqualsAndHashCode
Expand All @@ -41,7 +45,11 @@ public String load(ResourceCacheKey key) throws IOException {
}
});

private String getDefaultVersion(final String fullName) throws IOException {
public NameMappingClasspathResourceLocator(Map<String, String> overrideResourceMap) {
this.overrideResourceMap = Optional.ofNullable(overrideResourceMap).orElse(Map.of());
}

private static String getDefaultVersion(final String fullName) throws IOException {
try {
var versionFile = fullName + "/defaultVersion";
var versionLines = Resources.readLines(Resources.getResource(versionFile), StandardCharsets.UTF_8).stream()
Expand All @@ -59,6 +67,10 @@ private String getDefaultVersion(final String fullName) throws IOException {

@Override
public String getString(String fullName, Charset encoding, JinjavaInterpreter interpreter) throws IOException {
var overrideResource = overrideResourceMap.get(fullName);
if (overrideResource != null) {
return overrideResource;
}
try {
return resourceCache.get(new ResourceCacheKey(fullName, encoding));
} catch (ExecutionException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.opensearch.migrations.transform.jinjava;

import java.util.StringJoiner;

import lombok.Getter;

@Getter
public class RegexReplaceException extends RuntimeException {
final String input;
final String pattern;
final String replacement;
final String rewrittenReplacement;

public RegexReplaceException(Throwable cause, String input, String pattern, String replacement, String rewrittenReplacement) {
super(cause);
this.input = input;
this.pattern = pattern;
this.replacement = replacement;
this.rewrittenReplacement = rewrittenReplacement;
}

@Override
public String getMessage() {
return super.getMessage() +
new StringJoiner(", ", "{", "}")
.add("input='" + input + "'")
.add("pattern='" + pattern + "'")
.add("replacement='" + replacement + "'")
.add("rewrittenReplacement='" + rewrittenReplacement + "'");
}
}
Loading

0 comments on commit 73c8e13

Please sign in to comment.