Skip to content

Commit

Permalink
Modify transformationPlugins contract to return Object to support One…
Browse files Browse the repository at this point in the history
… to Many transformations (opensearch-project#1206)

Signed-off-by: Andre Kurait <akurait@amazon.com>
  • Loading branch information
AndreKurait authored Dec 20, 2024
1 parent d1cf844 commit 06ae3ca
Show file tree
Hide file tree
Showing 34 changed files with 130 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public static String convertToBulkRequestBody(Collection<BulkDocSection> bulkSec
}
}

public static BulkDocSection fromMap(Map<String, Object> map) {
public static BulkDocSection fromMap(Object map) {
BulkIndex bulkIndex = OBJECT_MAPPER.convertValue(map, BulkIndex.class);
return new BulkDocSection(bulkIndex);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.opensearch.migrations.bulkload.common;

import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Predicate;

Expand Down Expand Up @@ -57,7 +56,7 @@ Mono<Void> reindexDocsInParallelBatches(Flux<BulkDocSection> docs, String indexN
BulkDocSection transformDocument(RfsLuceneDocument doc, String indexName) {
var original = new BulkDocSection(doc.id, indexName, doc.type, doc.source, doc.routing);
if (transformer != null) {
final Map<String,Object> transformedDoc = transformer.transformJson(original.toMap());
final Object transformedDoc = transformer.transformJson(original.toMap());
return BulkDocSection.fromMap(transformedDoc);
}
return BulkDocSection.fromMap(original.toMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public TransformerToIJsonTransformerAdapter(IJsonTransformer transformer) {
this(transformer, LoggerFactory.getLogger(OUTPUT_TRANSFORMATION_JSON_LOGGER));
}

private void logTransformation(Map<String, Object> before, Map<String, Object> after) {
private void logTransformation(Map<String, Object> before, Object after) {
if (transformerLogger.isInfoEnabled()) {
try {
var transformationTuple = toTransformationMap(before, after);
Expand All @@ -55,7 +55,7 @@ private void logTransformation(Map<String, Object> before, Map<String, Object> a
}
}

private Map<String, Object> toTransformationMap(Map<String, Object> before, Map<String, Object> after) {
private Map<String, Object> toTransformationMap(Map<String, Object> before, Object after) {
var transformationMap = new LinkedHashMap<String, Object>();
transformationMap.put("before", before);
transformationMap.put("after", after);
Expand All @@ -69,15 +69,15 @@ private static Map<String, Object> objectNodeToMap(Object node) {
}

@SneakyThrows
private static String printMap(Map<String, Object> map) {
private static String printMap(Object map) {
return MAPPER.writeValueAsString(map);
}

@SuppressWarnings("unchecked")
private MigrationItem transformMigrationItem(MigrationItem migrationItem) {
// Keep untouched original for logging
final Map<String, Object> originalMap = MAPPER.convertValue(migrationItem, Map.class);
var transformedMigrationItem = transformer.transformJson(MAPPER.convertValue(migrationItem, Map.class));
Object transformedMigrationItem = transformer.transformJson(MAPPER.convertValue(migrationItem, Map.class));
logTransformation(originalMap, transformedMigrationItem);
return MAPPER.convertValue(transformedMigrationItem, MigrationItem.class);
}
Expand All @@ -100,7 +100,7 @@ void updateTemplates(Collection<? extends MigrationItem> transformedItems, Objec
public GlobalMetadata transformGlobalMetadata(GlobalMetadata globalData) {
var inputJson = objectNodeToMap(globalData.toObjectNode());
log.atInfo().setMessage("BeforeJsonGlobal: {}").addArgument(() -> printMap(inputJson)).log();
var afterJson = transformer.transformJson(inputJson);
Object afterJson = transformer.transformJson(inputJson);
log.atInfo().setMessage("AfterJsonGlobal: {}").addArgument(() -> printMap(afterJson)).log();


Expand Down Expand Up @@ -154,7 +154,7 @@ public GlobalMetadata transformGlobalMetadata(GlobalMetadata globalData) {
public IndexMetadata transformIndexMetadata(IndexMetadata indexData) {
final Map<String, Object> originalInput = MAPPER.convertValue(indexData, Map.class);
final Map<String, Object> inputJson = MAPPER.convertValue(indexData, Map.class);
var afterJson = transformer.transformJson(inputJson);
Object afterJson = transformer.transformJson(inputJson);
logTransformation(originalInput, afterJson);
return MAPPER.convertValue(inputJson, IndexMetadata.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ void reindex_shouldBufferByTransformedSize() throws JsonProcessingException {
// Set up the transformer that replaces the sourceDoc from the document
var repalcedSourceDoc = Map.of("simpleKey", "simpleValue");
IJsonTransformer transformer = originalJson -> {
originalJson.put("source", repalcedSourceDoc);
((Map) originalJson).put("source", repalcedSourceDoc);
return originalJson;
};
int numDocs = 5;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void accept(SourceTargetCaptureTuple tuple, ParsedHttpMessagesAsDicts par
if (tupleLogger.isInfoEnabled()) {
try {
var originalTuple = toJSONObject(tuple, parsedMessages);
var transformedTuple = tupleTransformer.transformJson(originalTuple);
Object transformedTuple = tupleTransformer.transformJson(originalTuple);
var tupleString = PLAIN_MAPPER.writeValueAsString(transformedTuple);
tupleLogger.atInfo().setMessage("{}").addArgument(tupleString).log();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,19 @@ public static HttpJsonRequestWithFaultingPayload transform(

assert httpJsonMessage.containsKey("payload");

var returnedObject = transformer.transformJson(httpJsonMessage);
Object returnedObject = transformer.transformJson(httpJsonMessage);
if(!(returnedObject instanceof Map)) {
throw new TransformationException(
new IllegalArgumentException("Returned object from transformation not map, instead was "
+ returnedObject.getClass().getName())
);
}
@SuppressWarnings("unchecked")
var returnedObjectMap = (Map<String, ?>) returnedObject;


if (returnedObject != httpJsonMessage) {
httpJsonMessage = new HttpJsonRequestWithFaultingPayload(returnedObject);
httpJsonMessage = new HttpJsonRequestWithFaultingPayload(returnedObjectMap);
}

if (originalHttpJsonMessage != httpJsonMessage) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.opensearch.migrations.replay.datahandlers.http;

import java.util.Map;

import org.opensearch.migrations.replay.datahandlers.PayloadAccessFaultingMap;
import org.opensearch.migrations.transform.IJsonTransformer;
import org.opensearch.migrations.transform.JsonKeysForHttpMessage;
Expand Down Expand Up @@ -49,10 +51,19 @@ public static HttpJsonRequestWithFaultingPayload transform(

var protectionArtifacts = protectByteBufInHttpMessage(httpJsonMessage);

var returnedObject = transformer.transformJson(httpJsonMessage);
Object returnedObject = transformer.transformJson(httpJsonMessage);
if(!(returnedObject instanceof Map)) {
throw new TransformationException(
new IllegalArgumentException("Returned object from transformation not map, instead was "
+ returnedObject.getClass().getName())
);
}
@SuppressWarnings("unchecked")
var returnedObjectMap = (Map<String, ?>) returnedObject;


if (returnedObject != httpJsonMessage) {
httpJsonMessage = new HttpJsonRequestWithFaultingPayload(returnedObject);
httpJsonMessage = new HttpJsonRequestWithFaultingPayload(returnedObjectMap);
}

unProtectByteBufInHttpMessage(httpJsonMessage, protectionArtifacts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,11 @@ public void testPartialBodyIsPassedThrough() throws Exception {
var testPacketCapture = new TestCapturePacketToHttpHandler(Duration.ofMillis(100), dummyAggregatedResponse);
var complexTransformer = new JsonCompositeTransformer(new IJsonTransformer() {
@Override
public Map<String, Object> transformJson(Map<String, Object> incomingJson) {
var payload = (Map) incomingJson.get("payload");
public Object transformJson(Object incomingJson) {
var payload = (Map) ((Map) incomingJson).get("payload");
Assertions.assertNull(payload.get(JsonKeysForHttpMessage.INLINED_NDJSON_BODIES_DOCUMENT_KEY));
Assertions.assertNull(payload.get(JsonKeysForHttpMessage.INLINED_JSON_BODY_DOCUMENT_KEY));
((Map) incomingJson.get("headers"))
((Map) ((Map) incomingJson).get("headers"))
.put("extraKey", "extraValue");
// just walk everything - that's enough to touch the payload and throw
walkMaps(incomingJson);
Expand Down Expand Up @@ -261,11 +261,11 @@ public void testNewlineDelimitedJsonBodyIsHandled() throws Exception {
final var dummyAggregatedResponse = new AggregatedRawResponse(null, 19, Duration.ZERO, List.of(), null);
var testPacketCapture = new TestCapturePacketToHttpHandler(Duration.ofMillis(100), dummyAggregatedResponse);
var sizeCalculatingTransformer = new JsonCompositeTransformer(incomingJson -> {
var payload = (Map) incomingJson.get("payload");
var payload = (Map) ((Map) incomingJson).get("payload");
Assertions.assertNull(payload.get(JsonKeysForHttpMessage.INLINED_JSON_BODY_DOCUMENT_KEY));
Assertions.assertNull(payload.get(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY));
var list = (List) payload.get(JsonKeysForHttpMessage.INLINED_NDJSON_BODIES_DOCUMENT_KEY);
((Map) incomingJson.get("headers"))
((Map) ((Map) incomingJson).get("headers"))
.put("listSize", ""+list.size());
return incomingJson;
});
Expand All @@ -289,13 +289,13 @@ public void testPartialNewlineDelimitedJsonBodyIsHandled() throws Exception {
final var dummyAggregatedResponse = new AggregatedRawResponse(null, 19, Duration.ZERO, List.of(), null);
var testPacketCapture = new TestCapturePacketToHttpHandler(Duration.ofMillis(100), dummyAggregatedResponse);
var sizeCalculatingTransformer = new JsonCompositeTransformer(incomingJson -> {
var payload = (Map) incomingJson.get("payload");
var payload = (Map) ((Map) incomingJson).get("payload");
Assertions.assertFalse(payload.containsKey(JsonKeysForHttpMessage.INLINED_JSON_BODY_DOCUMENT_KEY));
Assertions.assertFalse(payload.containsKey(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY));
Assertions.assertNotNull(payload.get(JsonKeysForHttpMessage.INLINED_TEXT_BODY_DOCUMENT_KEY));
var list = (List) payload.get(JsonKeysForHttpMessage.INLINED_NDJSON_BODIES_DOCUMENT_KEY);
var leftoverString = (String) payload.get(JsonKeysForHttpMessage.INLINED_TEXT_BODY_DOCUMENT_KEY);
var headers = (Map<String,Object>) incomingJson.get("headers");
var headers = (Map<String,Object>) ((Map<String,Object>) incomingJson).get("headers");
headers.put("listSize", "" + list.size());
headers.put("leftover", "" + leftoverString.getBytes(StandardCharsets.UTF_8).length);
return incomingJson;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ public IJsonTransformer createTransformer(Object jsonConfig) {
private static class Transformer implements IJsonTransformer {
@Override
@SuppressWarnings("unchecked")
public Map<String, Object> transformJson(Map<String, Object> incomingJson) {
if (incomingJson.containsKey("index")) {
((Map<String, Object>) incomingJson.get("index")).remove("_type");
public Object transformJson(Object incomingJson) {
@SuppressWarnings("unchecked")
var incomingJsonMap = ((Map<String, Object>) incomingJson);
if (incomingJsonMap.containsKey("index")) {
((Map<String, Object>) ((Map<String, Object>) incomingJson).get("index")).remove("_type");
}
return incomingJson;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.opensearch.migrations.transform;

import java.util.Map;

import io.burt.jmespath.BaseRuntime;
import io.burt.jmespath.Expression;
Expand All @@ -16,7 +15,7 @@ public JsonJMESPathPredicate(BaseRuntime<Object> runtime, String script) {
}

@Override
public boolean test(Map<String, Object> incomingJson) {
public boolean test(Object incomingJson) {
var output = expression.search(incomingJson);
log.atDebug().setMessage("output={}").addArgument(output).log();
return (Boolean) output;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public JsonJMESPathTransformer(BaseRuntime<Object> runtime, String script) {
}

@Override
public Map<String, Object> transformJson(Map<String, Object> incomingJson) {
public Object transformJson(Object incomingJson) {
var output = expression.search(incomingJson);
log.info("output=" + output);
return (Map<String, Object>) output;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void testSimpleTransform() throws JsonProcessingException {
var documentJson = parseStringAsJson(mapper, TEST_INPUT_REQUEST);
var transformer = new JsonJMESPathTransformerProvider().createTransformer(Map.of(
"script", EXCISE_TYPE_EXPRESSION_STRING));
var transformedDocument = transformer.transformJson(documentJson);
Object transformedDocument = transformer.transformJson(documentJson);
var outputStr = emitJson(mapper, transformedDocument);

final String TEST_OUTPUT_REQUEST = "{\n"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package org.opensearch.migrations.transform;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import org.opensearch.migrations.transform.jinjava.DynamicMacroFunction;
import org.opensearch.migrations.transform.jinjava.JavaRegexCaptureFilter;
Expand All @@ -30,16 +29,16 @@ public class JinjavaTransformer implements IJsonTransformer {
public static final String REGEX_REPLACEMENT_CONVERSION_PATTERNS = "regex_replacement_conversion_patterns";

protected final Jinjava jinjava;
protected final Function<Map<String, Object>, Map<String, Object>> createContextWithSourceFunction;
protected final Function<Object, Map<String, Object>> createContextWithSourceFunction;
private final String templateStr;

public JinjavaTransformer(String templateString,
UnaryOperator<Map<String, Object>> contextProviderFromSource) {
Function<Object, Map<String, Object>> contextProviderFromSource) {
this(templateString, contextProviderFromSource, new JinjavaConfig());
}

public JinjavaTransformer(String templateString,
UnaryOperator<Map<String, Object>> contextProviderFromSource,
Function<Object, Map<String, Object>> contextProviderFromSource,
@NonNull JinjavaConfig jinjavaConfig) {
this(templateString,
contextProviderFromSource,
Expand All @@ -48,7 +47,7 @@ public JinjavaTransformer(String templateString,
}

public JinjavaTransformer(String templateString,
UnaryOperator<Map<String, Object>> createContextWithSource,
Function<Object, Map<String, Object>> createContextWithSource,
ResourceLocator resourceLocator,
List<Map.Entry<String, String>> regexReplacementConversionPatterns)
{
Expand Down Expand Up @@ -77,10 +76,22 @@ public JinjavaTransformer(String templateString,

@SneakyThrows
@Override
public Map<String, Object> transformJson(Map<String, Object> incomingJson) {
@SuppressWarnings("unchecked")
public Object transformJson(Object incomingJson) {
var resultStr = jinjava.render(templateStr, createContextWithSourceFunction.apply(incomingJson));
log.atDebug().setMessage("output from jinjava... {}").addArgument(resultStr).log();
var parsedObj = (Map<String,Object>) objectMapper.readValue(resultStr, LinkedHashMap.class);
return PreservesProcessor.doFinalSubstitutions(incomingJson, parsedObj);
Object parsedObj = objectMapper.readValue(resultStr, Object.class);
if (parsedObj instanceof Map) {
return PreservesProcessor.doFinalSubstitutions((Map<String,Object>) incomingJson, (Map<String, Object>) parsedObj);
} else if (parsedObj instanceof List) {
log.atDebug().setMessage("Received List from jinjava, processing preserves for {} maps.")
.addArgument(((List<?>) parsedObj).size()).log();
List<Map<String, Object>> listOfMaps = (List<Map<String, Object>>) parsedObj;
return listOfMaps.stream().map( json ->
PreservesProcessor.doFinalSubstitutions((Map<String,Object>) incomingJson, json)
).collect(Collectors.toList());
} else {
throw new IllegalArgumentException("Unexpected data format: " + parsedObj.getClass().getName());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ private static void processPreserves(Map<String, Object> source, Map<String, Obj
}

private static void copyValues(Map<String, Object> source, Map<String, Object> target,
String directiveKey, boolean forced) {
String directiveKey, boolean forced) {
Object directive = target.remove(directiveKey);
if (directive == null) {
return;
Expand Down
Loading

0 comments on commit 06ae3ca

Please sign in to comment.