Skip to content

Commit

Permalink
Flip the default should throw behavior for HttpJsonMessageWithFaultin…
Browse files Browse the repository at this point in the history
…gPayload to opt-in rather than opt-out.

HttpJsonMessageWithFaultingPayload throws by default - so it could be run even if it wasn't within a transform - like from within a LoggingHandler, which is what I was observing when I added some more logging.  The worst part was that it created other errors which then caused the message to be processed in a very different way.

Signed-off-by: Greg Schohn <greg.schohn@gmail.com>
  • Loading branch information
gregschohn committed Dec 4, 2024
1 parent 41e0b5d commit 5c3cb8a
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class PayloadAccessFaultingMap extends AbstractMap<String, Object> {
private boolean disableThrowingPayloadNotLoaded;

public PayloadAccessFaultingMap(StrictCaseInsensitiveHttpHeadersMap headers) {
disableThrowingPayloadNotLoaded = true;
underlyingMap = new TreeMap<>();
isJson = Optional.ofNullable(headers.get("content-type"))
.map(list -> list.stream().anyMatch(s -> s.startsWith("application/json")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg)
IAuthTransformer authTransformer = requestPipelineOrchestrator.authTransfomerFactory.getAuthTransformer(
httpJsonMessage
);
final var payloadMap = (PayloadAccessFaultingMap) httpJsonMessage.payload();
try {
payloadMap.setDisableThrowingPayloadNotLoaded(false);
handlePayloadNeutralTransformationOrThrow(
ctx,
originalHttpJsonMessage,
Expand All @@ -86,6 +88,8 @@ public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg)
getAuthTransformerAsStreamingTransformer(authTransformer)
);
ctx.fireChannelRead(handleAuthHeaders(httpJsonMessage, authTransformer));
} finally {
payloadMap.setDisableThrowingPayloadNotLoaded(true);
}
} else if (msg instanceof HttpContent) {
ctx.fireChannelRead(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.opensearch.migrations.replay.datahandlers.JsonAccumulator;
import org.opensearch.migrations.replay.tracing.IReplayContexts;
Expand All @@ -24,6 +27,7 @@
import io.netty.util.ReferenceCountUtil;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.event.Level;

/**
* This accumulates HttpContent messages through a JsonAccumulator and eventually fires off a
Expand Down Expand Up @@ -93,7 +97,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
}
} catch (JacksonException e) {
log.atInfo().setCause(e).setMessage("Error parsing json body. " +
log.atLevel(hasRequestContentTypeMatching(capturedHttpJsonMessage, v -> v.startsWith("application/json"))
? Level.INFO : Level.TRACE)
.setCause(e).setMessage("Error parsing json body. " +
"Will pass all payload bytes directly as a ByteBuf within the payload map").log();
jsonWasInvalid = true;
parsedJsonObjects.clear();
Expand Down Expand Up @@ -123,7 +129,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception

var leftoverBody = accumulatedBody.slice(jsonBodyByteLength,
accumulatedBody.readableBytes() - jsonBodyByteLength);
if (jsonBodyByteLength == 0 && isRequestContentTypeNotText(capturedHttpJsonMessage)) {
if (jsonBodyByteLength == 0 &&
hasRequestContentTypeMatching(capturedHttpJsonMessage, v -> !v.startsWith("text/")))
{
context.onPayloadSetBinary();
capturedHttpJsonMessage.payload()
.put(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY,
Expand Down Expand Up @@ -157,12 +165,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
}

private boolean isRequestContentTypeNotText(HttpJsonMessageWithFaultingPayload message) {
private boolean hasRequestContentTypeMatching(HttpJsonMessageWithFaultingPayload message,
Predicate<String> cnotentTypeFilter) {
// ContentType not text if specified and has a value with / and that value does not start with text/
return Optional.ofNullable(capturedHttpJsonMessage.headers().insensitiveGet(HttpHeaderNames.CONTENT_TYPE.toString()))
.map(s -> s.stream()
.filter(v -> v.contains("/"))
.filter(v -> !v.startsWith("text/"))
.filter(cnotentTypeFilter)
.count() > 1
)
.orElse(false);
Expand Down

0 comments on commit 5c3cb8a

Please sign in to comment.