Skip to content

Commit

Permalink
Merge pull request opensearch-project#1154 from gregschohn/TypeMappin…
Browse files Browse the repository at this point in the history
…gsTransformation
  • Loading branch information
gregschohn authored Dec 10, 2024
2 parents 99bd50b + 00968cd commit 6d3b225
Show file tree
Hide file tree
Showing 85 changed files with 2,949 additions and 340 deletions.
2 changes: 1 addition & 1 deletion DocumentsFromSnapshotMigration/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ dependencies {
implementation project(":RFS")
implementation project(":transformation")
implementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerLoaders')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:openSearch23PlusTargetTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonTypeMappingsSanitizationTransformerProvider')

implementation group: 'org.apache.logging.log4j', name: 'log4j-api'
implementation group: 'org.apache.logging.log4j', name: 'log4j-core'
Expand Down
4 changes: 3 additions & 1 deletion RFS/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ dependencies {

implementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerInterface')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJMESPathMessageTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJinjavaTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJoltMessageTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonTypeMappingsSanitizationTransformerProvider')

implementation group: 'org.jcommander', name: 'jcommander'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind'
Expand Down Expand Up @@ -62,7 +64,7 @@ dependencies {
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter'

testImplementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerLoaders')
testRuntimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:openSearch23PlusTargetTransformerProvider')
testRuntimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonTypeMappingsSanitizationTransformerProvider')

testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ services:
condition: service_started
opensearchtarget:
condition: service_started
command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer --speedup-factor 2 https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id logging-group-default --otelCollectorEndpoint http://otel-collector:4317" #--transformer-config-base64 W3sgIkpzb25Kb2x0VHJhbnNmb3JtZXJQcm92aWRlciI6ClsKICB7CiAgICAic2NyaXB0IjogewogICAgICAib3BlcmF0aW9uIjogInNoaWZ0IiwKICAgICAgInNwZWMiOiB7CiAgICAgICAgIm1ldGhvZCI6ICJtZXRob2QiLAogICAgICAgICJVUkkiOiAiVVJJIiwKICAgICAgICAiaGVhZGVycyI6ICJoZWFkZXJzIiwKICAgICAgICAicGF5bG9hZCI6IHsKICAgICAgICAgICJpbmxpbmVkSnNvbkJvZHkiOiB7CiAgICAgICAgICAgICJ0b3AiOiB7CiAgICAgICAgICAgICAgInRhZ1RvRXhjaXNlIjogewogICAgICAgICAgICAgICAgIioiOiAicGF5bG9hZC5pbmxpbmVkSnNvbkJvZHkudG9wLiYiIAogICAgICAgICAgICAgIH0sCiAgICAgICAgICAgICAgIioiOiAicGF5bG9hZC5pbmxpbmVkSnNvbkJvZHkudG9wLiYiCiAgICAgICAgICAgIH0sCiAgICAgICAgICAiKiI6ICJwYXlsb2FkLmlubGluZWRKc29uQm9keS4mIgogICAgICAgICAgfQogICAgICAgIH0KICAgICAgfQogICAgfQogIH0sIAogewogICAic2NyaXB0IjogewogICAgICJvcGVyYXRpb24iOiAibW9kaWZ5LW92ZXJ3cml0ZS1iZXRhIiwKICAgICAic3BlYyI6IHsKICAgICAgICJVUkkiOiAiPXNwbGl0KCcvZXh0cmFUaGluZ1RvUmVtb3ZlJyxAKDEsJikpIgogICAgIH0KICB9CiB9LAogewogICAic2NyaXB0IjogewogICAgICJvcGVyYXRpb24iOiAibW9kaWZ5LW92ZXJ3cml0ZS1iZXRhIiwKICAgICAic3BlYyI6IHsKICAgICAgICJVUkkiOiAiPWpvaW4oJycsQCgxLCYpKSIKICAgICB9CiAgfQogfQpdCn1dCg=="

# command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer --speedup-factor 2 https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id logging-group-default --otelCollectorEndpoint http://otel-collector:4317 "
command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer --speedup-factor 2 https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id logging-group-default --otelCollectorEndpoint http://otel-collector:4317 --transformer-config '[{\"TypeMappingSanitizationTransformerProvider\":{\"sourceProperties\":{\"version\":{\"major\":7,\"minor\":10}}}}]'"
opensearchtarget:
image: 'opensearchproject/opensearch:2.15.0'
environment:
Expand Down
4 changes: 2 additions & 2 deletions TrafficCapture/trafficReplayer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,15 @@ transform to add GZIP encoding and another to apply a new header would be config
```

To run only one transformer without any configuration, the `--transformer-config` argument can simply
be set to the name of the transformer (e.g. 'JsonTransformerForOpenSearch23PlusTargetTransformerProvider',
be set to the name of the transformer (e.g. 'TypeMappingSanitizationTransformerProvider',
without quotes or any json surrounding it).

The user can also specify a file to read the transformations from using the `--transformer-config-file`. Users can
also pass the script as an argument via `--transformer-config-base64`. Each of the `transformer-config` options
is mutually exclusive.

Some simple transformations are included to change headers to add compression or to force an HTTP message payload to
be chunked. Another transformer, [JsonTypeMappingTransformer.java](../../transformation/transformationPlugins/jsonMessageTransformers/openSearch23PlusTargetTransformerProvider/src/main/java/org/opensearch/migrations/transform/JsonTypeMappingTransformer.java),
be chunked. Another transformer, [TypeMappingSanitizationTransformer.java](../../transformation/transformationPlugins/jsonMessageTransformers/jsonTypeMappingsSanitizationTransformer/src/main/java/org/opensearch/migrations/transform/TypeMappingsSanitizationTransformer.java),
is a work-in-progress to excise type mapping references from URIs and message payloads since versions of OpenSource
greater than 2.3 do not support them.

Expand Down
5 changes: 3 additions & 2 deletions TrafficCapture/trafficReplayer/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ dependencies {
implementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerLoaders')
implementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerInterface')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJMESPathMessageTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJinjavaTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJoltMessageTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:openSearch23PlusTargetTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonTypeMappingsSanitizationTransformerProvider')

implementation group: 'org.jcommander', name: 'jcommander'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind'
Expand Down Expand Up @@ -59,7 +60,7 @@ dependencies {
testImplementation testFixtures(project(path: ':coreUtilities'))
testImplementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJMESPathMessageTransformerProvider')
testImplementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJoltMessageTransformerProvider')
testImplementation project(':transformation:transformationPlugins:jsonMessageTransformers:openSearch23PlusTargetTransformerProvider')
testImplementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonTypeMappingsSanitizationTransformerProvider')

testImplementation group: 'org.apache.httpcomponents.client5', name: 'httpclient5'
testImplementation group: 'org.junit.jupiter', name:'junit-jupiter-api'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
} else {
content.release();
}
} else if (msg instanceof HttpMessage) {
}
if (msg instanceof HttpMessage) { // this & HttpContent are interfaces & 'Full' messages implement both
message = (HttpMessage) msg;
}
if (msg instanceof LastHttpContent) {
Expand All @@ -206,16 +207,16 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
var finalMsg = (message instanceof HttpRequest)
? new DefaultFullHttpRequest(message.protocolVersion(),
((HttpRequest) message).method(),
((HttpRequest) message).uri(),
aggregatedContents,
message.headers(),
((LastHttpContent) msg).trailingHeaders())
((HttpRequest) message).method(),
((HttpRequest) message).uri(),
aggregatedContents,
message.headers(),
((LastHttpContent) msg).trailingHeaders())
: new DefaultFullHttpResponse(message.protocolVersion(),
((HttpResponse)message).status(),
aggregatedContents,
message.headers(),
((LastHttpContent) msg).trailingHeaders());
((HttpResponse)message).status(),
aggregatedContents,
message.headers(),
((LastHttpContent) msg).trailingHeaders());
super.channelRead(ctx, finalMsg);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ public class ParsedHttpMessagesAsDicts {
public static final String STATUS_CODE_KEY = "Status-Code";
public static final String RESPONSE_TIME_MS_KEY = "response_time_ms";
public static final String EXCEPTION_KEY_STRING = "Exception";
public static final String REQUEST_URI_KEY = "Request-URI";
public static final String METHOD_KEY = "Method";
public static final String HTTP_VERSION_KEY = "HTTP-Version";
public static final String PAYLOAD_KEY = "payload";

public final Optional<Map<String, Object>> sourceRequestOp;
public final Optional<Map<String, Object>> sourceResponseOp;
Expand Down Expand Up @@ -183,15 +187,15 @@ private static Map<String, Object> convertRequest(
var message = (HttpJsonRequestWithFaultingPayload) messageHolder.get();
if (message != null) {
var map = new LinkedHashMap<>(message.headers());
map.put("Request-URI", message.path());
map.put("Method", message.method());
map.put("HTTP-Version", message.protocol());
map.put(REQUEST_URI_KEY, message.path());
map.put(METHOD_KEY, message.method());
map.put(HTTP_VERSION_KEY, message.protocol());
context.setMethod(message.method());
context.setEndpoint(message.path());
context.setHttpVersion(message.protocol());
encodeBinaryPayloadIfExists(message);
if (!message.payload().isEmpty()) {
map.put("payload", message.payload());
map.put(PAYLOAD_KEY, message.payload());
}
return map;
} else {
Expand Down Expand Up @@ -223,14 +227,14 @@ private static Map<String, Object> convertResponse(
var message = (HttpJsonResponseWithFaultingPayload) messageHolder.get();
if (message != null) {
var map = new LinkedHashMap<>(message.headers());
map.put("HTTP-Version", message.protocol());
map.put(HTTP_VERSION_KEY, message.protocol());
map.put(STATUS_CODE_KEY, Integer.parseInt(message.code()));
map.put("Reason-Phrase", message.reason());
map.put(RESPONSE_TIME_MS_KEY, latency.toMillis());
context.setHttpVersion(message.protocol());
encodeBinaryPayloadIfExists(message);
if (!message.payload().isEmpty()) {
map.put("payload", message.payload());
map.put(PAYLOAD_KEY, message.payload());
}
return map;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ public static String getTransactionSummaryStringPreamble() {
.add("SOURCE_STATUS_CODE/TARGET_STATUS_CODE...")
.add("SOURCE_RESPONSE_SIZE_BYTES/TARGET_RESPONSE_SIZE_BYTES...")
.add("SOURCE_LATENCY_MS/TARGET_LATENCY_MS...")
.add("METHOD...")
.add("URI...")
.toString();
}

Expand Down Expand Up @@ -218,6 +220,16 @@ public static String toTransactionSummaryString(
transformStreamToString(parsed.targetResponseList.stream(),
r -> "" + r.get(ParsedHttpMessagesAsDicts.RESPONSE_TIME_MS_KEY))
)
// method
.add(
parsed.sourceRequestOp
.map(r -> (String) r.get(ParsedHttpMessagesAsDicts.METHOD_KEY))
.orElse(MISSING_STR))
// uri
.add(
parsed.sourceRequestOp
.map(r -> (String) r.get(ParsedHttpMessagesAsDicts.REQUEST_URI_KEY))
.orElse(MISSING_STR))
.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,10 @@ public Object getNextTopLevelObject() throws IOException {
pushCompletedValue(parser.getText());
break;
case VALUE_NUMBER_INT:
pushCompletedValue(parser.getIntValue());
pushCompletedValue(parser.getLongValue());
break;
case VALUE_NUMBER_FLOAT:
pushCompletedValue(parser.getFloatValue());
pushCompletedValue(parser.getDoubleValue());
break;
case NOT_AVAILABLE:
// pipeline stall - need more data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ public class PayloadAccessFaultingMap extends AbstractMap<String, Object> {
@Getter
@Setter
private boolean disableThrowingPayloadNotLoaded;
private boolean payloadWasAccessed;

public PayloadAccessFaultingMap(StrictCaseInsensitiveHttpHeadersMap headers) {
disableThrowingPayloadNotLoaded = true;
underlyingMap = new TreeMap<>();
isJson = Optional.ofNullable(headers.get("content-type"))
.map(list -> list.stream().anyMatch(s -> s.startsWith("application/json")))
Expand All @@ -51,19 +53,19 @@ public Iterator<Map.Entry<String, Object>> iterator() {
return new Iterator<>() {
@Override
public boolean hasNext() {
throw PayloadNotLoadedException.getInstance();
throw makeFault();
}

@Override
public Map.Entry<String, Object> next() {
throw PayloadNotLoadedException.getInstance();
throw makeFault();
}
};
}

@Override
public int size() {
throw PayloadNotLoadedException.getInstance();
throw makeFault();
}
};
} else {
Expand All @@ -80,8 +82,21 @@ public Object put(String key, Object value) {
public Object get(Object key) {
var value = super.get(key);
if (value == null && !disableThrowingPayloadNotLoaded) {
throw PayloadNotLoadedException.getInstance();
throw makeFault();
}
return value;
}

public boolean missingPayloadWasAccessed() {
return payloadWasAccessed;
}

public void resetMissingPayloadWasAccessed() {
payloadWasAccessed = false;
}

private PayloadNotLoadedException makeFault() throws PayloadNotLoadedException {
payloadWasAccessed = true;
return PayloadNotLoadedException.getInstance();
}
}
Loading

0 comments on commit 6d3b225

Please sign in to comment.