Skip to content

Commit

Permalink
Enable type mappings transform for the replayer in the docker-compose…
Browse files Browse the repository at this point in the history
… file + logging improvements & cleanup.

Most useful logging improvement - add the URI of the source request to the end of the progess log's lines.  E.g

2024-12-09 14:19:34,024: 921, 0242acfffe120008-00000007-00000065-c88ca66f1bf2aa5a-a978504d.16, 2024-12-09T14:19:33.757503Z, 2872/2890, 200/500,500,500,500, 2866/486,486,486,486, 9/3,2,2,3, /_search/scroll
2024-12-09 14:19:41,602: 950, 0242acfffe120008-00000007-00000065-c88ca66f1bf2aa5a-a978504d.17, 2024-12-09T14:19:33.767614Z, 2862/2880, 200/404,404,404,404, 119/125,125,125,125, 10/1,1,1,1, /_search/scroll

Signed-off-by: Greg Schohn <greg.schohn@gmail.com>
  • Loading branch information
gregschohn committed Dec 9, 2024
1 parent 2b1f3e0 commit 4e59965
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ services:
condition: service_started
opensearchtarget:
condition: service_started
# command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer --speedup-factor 2 https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id logging-group-default --otelCollectorEndpoint http://otel-collector:4317 --transformer-config "
command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer --speedup-factor 2 https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id logging-group-default --otelCollectorEndpoint http://otel-collector:4317 --transformer-config '[{\"TypeMappingSanitizationTransformerProvider\":\"\"}]'"
# command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer --speedup-factor 2 https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id logging-group-default --otelCollectorEndpoint http://otel-collector:4317 "
command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer --speedup-factor 2 https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id logging-group-default --otelCollectorEndpoint http://otel-collector:4317 --transformer-config '[{\"TypeMappingSanitizationTransformerProvider\":{\"sourceProperties\":{\"version\":{\"major\":7,\"minor\":10}}}}]'"
opensearchtarget:
image: 'opensearchproject/opensearch:2.15.0'
environment:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ public class ParsedHttpMessagesAsDicts {
public static final String STATUS_CODE_KEY = "Status-Code";
public static final String RESPONSE_TIME_MS_KEY = "response_time_ms";
public static final String EXCEPTION_KEY_STRING = "Exception";
public static final String REQUEST_URI_KEY = "Request-URI";
public static final String METHOD_KEY = "Method";
public static final String HTTP_VERSION_KEY = "HTTP-Version";
public static final String PAYLOAD_KEY = "payload";

public final Optional<Map<String, Object>> sourceRequestOp;
public final Optional<Map<String, Object>> sourceResponseOp;
Expand Down Expand Up @@ -183,15 +187,15 @@ private static Map<String, Object> convertRequest(
var message = (HttpJsonRequestWithFaultingPayload) messageHolder.get();
if (message != null) {
var map = new LinkedHashMap<>(message.headers());
map.put("Request-URI", message.path());
map.put("Method", message.method());
map.put("HTTP-Version", message.protocol());
map.put(REQUEST_URI_KEY, message.path());
map.put(METHOD_KEY, message.method());
map.put(HTTP_VERSION_KEY, message.protocol());
context.setMethod(message.method());
context.setEndpoint(message.path());
context.setHttpVersion(message.protocol());
encodeBinaryPayloadIfExists(message);
if (!message.payload().isEmpty()) {
map.put("payload", message.payload());
map.put(PAYLOAD_KEY, message.payload());
}
return map;
} else {
Expand Down Expand Up @@ -223,14 +227,14 @@ private static Map<String, Object> convertResponse(
var message = (HttpJsonResponseWithFaultingPayload) messageHolder.get();
if (message != null) {
var map = new LinkedHashMap<>(message.headers());
map.put("HTTP-Version", message.protocol());
map.put(HTTP_VERSION_KEY, message.protocol());
map.put(STATUS_CODE_KEY, Integer.parseInt(message.code()));
map.put("Reason-Phrase", message.reason());
map.put(RESPONSE_TIME_MS_KEY, latency.toMillis());
context.setHttpVersion(message.protocol());
encodeBinaryPayloadIfExists(message);
if (!message.payload().isEmpty()) {
map.put("payload", message.payload());
map.put(PAYLOAD_KEY, message.payload());
}
return map;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ public static String getTransactionSummaryStringPreamble() {
.add("SOURCE_STATUS_CODE/TARGET_STATUS_CODE...")
.add("SOURCE_RESPONSE_SIZE_BYTES/TARGET_RESPONSE_SIZE_BYTES...")
.add("SOURCE_LATENCY_MS/TARGET_LATENCY_MS...")
.add("URI...")
.toString();
}

Expand Down Expand Up @@ -218,6 +219,11 @@ public static String toTransactionSummaryString(
transformStreamToString(parsed.targetResponseList.stream(),
r -> "" + r.get(ParsedHttpMessagesAsDicts.RESPONSE_TIME_MS_KEY))
)
// uri
.add(
parsed.sourceRequestOp
.map(r -> (String) r.get(ParsedHttpMessagesAsDicts.REQUEST_URI_KEY))
.orElse(MISSING_STR))
.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public JinjavaTransformer(String templateString,
@Override
public Map<String, Object> transformJson(Map<String, Object> incomingJson) {
var resultStr = jinjava.render(templateStr, createContextWithSourceFunction.apply(incomingJson));
log.atInfo().setMessage("output from jinjava... {}").addArgument(resultStr).log();
log.atDebug().setMessage("output from jinjava... {}").addArgument(resultStr).log();
var parsedObj = (Map<String,Object>) objectMapper.readValue(resultStr, LinkedHashMap.class);
return PreservesProcessor.doFinalSubstitutions(incomingJson, parsedObj);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public Object filter(Object inputObject, JinjavaInterpreter interpreter, String.
c.get(JinjavaTransformer.REGEX_REPLACEMENT_CONVERSION_PATTERNS)))
.orElse(DEFAULT_REGEX_REPLACE_FILTER)));
var rval = matcher.replaceAll(rewritten);
log.atError().setMessage("replaced value {} with {}").addArgument(input).addArgument(rval).log();
log.atTrace().setMessage("replaced value {} with {}").addArgument(input).addArgument(rval).log();
return rval;
} catch (Exception e) {
throw new RegexReplaceException(e, input, pattern, replacement, rewritten);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ public IJsonTransformer createTransformer(Object jsonConfig) {
return new TypeMappingsSanitizationTransformer(
(Map<String, Map<String, String>>) config.get(STATIC_MAPPINGS),
(List<List<String>>) config.get(REGEX_MAPPINGS),
Optional.ofNullable(config.get(SOURCE_PROPERTIES_KEY)).map(jinjavaConfig ->
mapper.convertValue(jinjavaConfig, SourceProperties.class)).orElse(null),
Optional.ofNullable(config.get(SOURCE_PROPERTIES_KEY)).map(m ->
mapper.convertValue(m, SourceProperties.class)).orElse(null),
(Map<String, Object>) config.get(FEATURE_FLAGS),
Optional.ofNullable(config.get(JINJAVA_CONFIG_KEY)).map(jinjavaConfig ->
mapper.convertValue(jinjavaConfig, JinjavaConfig.class)).orElse(null));
Optional.ofNullable(config.get(JINJAVA_CONFIG_KEY)).map(m ->
mapper.convertValue(m, JinjavaConfig.class)).orElse(null));
} catch (ClassCastException e) {
throw new IllegalArgumentException(getConfigUsageStr(), e);
}
Expand Down

0 comments on commit 4e59965

Please sign in to comment.