Skip to content

Commit

Permalink
Another round of changes eliminate a number of code smells.
Browse files Browse the repository at this point in the history
There were some actual bugs uncovered once stricter type matching was turned on in a couple places, though in unit tests.  All have been rectified.

Signed-off-by: Greg Schohn <greg.schohn@gmail.com>
  • Loading branch information
gregschohn committed Nov 11, 2023
1 parent 465df6c commit 336aa1e
Show file tree
Hide file tree
Showing 36 changed files with 126 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ public static Instant instantFromProtoTimestamp(Timestamp timestampProto) {

public static Optional<Instant> getFirstTimestamp(TrafficStream ts) {
var substream = ts.getSubStreamList();
return substream != null && substream.size() > 0 ?
return substream != null && !substream.isEmpty() ?
Optional.of(instantFromProtoTimestamp(substream.get(0).getTs())) :
Optional.empty();
}

public static Optional<Instant> getLastTimestamp(TrafficStream ts) {
var substream = ts.getSubStreamList();
return substream != null && substream.size() > 0 ?
return substream != null && !substream.isEmpty() ?
Optional.of(instantFromProtoTimestamp(substream.get(substream.size()-1).getTs())) :
Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ private static DecoderResult getDecoderResult(Object obj) {
}
}

public static HttpProcessedState addRelevantHttpMessageIndicatorEvents(
IChannelConnectionCaptureSerializer trafficOffloader,
public static <T> HttpProcessedState addRelevantHttpMessageIndicatorEvents(
IChannelConnectionCaptureSerializer<T> trafficOffloader,
List<Object> parsedMsgs) throws IOException {
Instant timestamp = Instant.now();
for (var obj : parsedMsgs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public LoggingHttpRequestHandler(IChannelConnectionCaptureSerializer<T> trafficO
);
}

private HttpCaptureSerializerUtil.HttpProcessedState parseHttpMessageParts(ByteBuf msg) throws Exception {
private HttpCaptureSerializerUtil.HttpProcessedState parseHttpMessageParts(ByteBuf msg) {
httpDecoderChannel.writeInbound(msg); // Consume this outright, up to the caller to know what else to do
return getHandlerThatHoldsParsedHttpRequest().isDone ?
HttpCaptureSerializerUtil.HttpProcessedState.FULL_MESSAGE :
Expand All @@ -93,11 +93,6 @@ private SimpleDecodedHttpRequestHandler getHandlerThatHoldsParsedHttpRequest() {
return (SimpleDecodedHttpRequestHandler) httpDecoderChannel.pipeline().last();
}

@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
trafficOffloader.addCloseEvent(Instant.now());
Expand Down Expand Up @@ -130,16 +125,6 @@ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
}

protected void channelFinishedReadingAnHttpMessage(ChannelHandlerContext ctx, Object msg, HttpRequest httpRequest) throws Exception {
super.channelRead(ctx, msg);
metricsLogger.atSuccess()
Expand Down Expand Up @@ -177,21 +162,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
super.channelReadComplete(ctx);
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
super.userEventTriggered(ctx, evt);
}

@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
super.channelWritabilityChanged(ctx);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
trafficOffloader.addExceptionCaughtEvent(Instant.now(), cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
import java.util.List;

@Slf4j
public class LoggingHttpResponseHandler extends ChannelOutboundHandlerAdapter {
public class LoggingHttpResponseHandler<T> extends ChannelOutboundHandlerAdapter {

private final IChannelConnectionCaptureSerializer trafficOffloader;
private final IChannelConnectionCaptureSerializer<T> trafficOffloader;
private static final MetricsLogger metricsLogger = new MetricsLogger("LoggingHttpResponseHandler");


public LoggingHttpResponseHandler(IChannelConnectionCaptureSerializer trafficOffloader) {
public LoggingHttpResponseHandler(IChannelConnectionCaptureSerializer<T> trafficOffloader) {

Check warning on line 23 in TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/LoggingHttpResponseHandler.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/LoggingHttpResponseHandler.java#L23

Added line #L23 was not covered by tests
this.trafficOffloader = trafficOffloader;
}

Expand Down Expand Up @@ -73,11 +73,6 @@ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
}

@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
super.flush(ctx);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
trafficOffloader.addExceptionCaughtEvent(Instant.now(), cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

public class PassThruHttpHeaders extends DefaultHttpHeaders {

private static DefaultHttpHeaders HEADERS_TO_PRESERVE = makeHeadersToPreserve();
private static final DefaultHttpHeaders HEADERS_TO_PRESERVE = makeHeadersToPreserve();

private static DefaultHttpHeaders makeHeadersToPreserve() {
var h = new DefaultHttpHeaders(false);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package org.opensearch.migrations.transform;

import java.util.Optional;

public interface IJsonTransformerProvider {
/**
* Create a new transformer from the given configuration. This transformer
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package org.opensearch.migrations.transform;

public class JsonKeysForHttpMessage {

private JsonKeysForHttpMessage() {}

public static final String HTTP_MESSAGE_SCHEMA_VERSION_KEY = "transformerMessageVersion";
public static final String METHOD_KEY = "method";
public static final String URI_KEY = "URI";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package org.opensearch.migrations.transform;

import java.util.Optional;

public class JsonTransformerForOpenSearch23PlusTargetTransformerProvider implements IJsonTransformerProvider {
@Override
public IJsonTransformer createTransformer(Object jsonConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import lombok.Lombok;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -113,16 +114,14 @@ public static void main(String[] args) {
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(properties);
try {
try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(properties)) {

Check warning on line 117 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/KafkaPrinter.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/KafkaPrinter.java#L117

Added line #L117 was not covered by tests
consumer.subscribe(Collections.singleton(topic));
pipeRecordsToProtoBufDelimited(consumer, getDelimitedProtoBufOutputter(System.out));
} catch (WakeupException e) {
log.info("Wake up exception!");
} catch (Exception e) {
log.error("Unexpected exception", e);
} finally {
consumer.close();
log.info("This consumer close successfully.");
}
}
Expand All @@ -138,7 +137,7 @@ static void pipeRecordsToProtoBufDelimited(
static void processNextChunkOfKafkaEvents(Consumer<String, byte[]> kafkaConsumer, java.util.function.Consumer<Stream<byte[]>> binaryReceiver) {
var records = kafkaConsumer.poll(CONSUMER_POLL_TIMEOUT);
binaryReceiver.accept(StreamSupport.stream(records.spliterator(), false)
.map(cr->cr.value()));
.map(ConsumerRecord::value));

Check warning on line 140 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/KafkaPrinter.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/KafkaPrinter.java#L140

Added line #L140 was not covered by tests
}

static java.util.function.Consumer<Stream<byte[]>> getDelimitedProtoBufOutputter(OutputStream outputStream) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@

@Slf4j
public class SigV4Signer extends IAuthTransformer.StreamingFullMessageTransformer {
private final static HashSet<String> AUTH_HEADERS_TO_PULL_WITH_PAYLOAD;
private final static HashSet<String> AUTH_HEADERS_TO_PULL_NO_PAYLOAD;
private static final HashSet<String> AUTH_HEADERS_TO_PULL_WITH_PAYLOAD;
private static final HashSet<String> AUTH_HEADERS_TO_PULL_NO_PAYLOAD;

public static final String AMZ_CONTENT_SHA_256 = "x-amz-content-sha256";

Expand Down Expand Up @@ -88,6 +88,7 @@ public void finalizeSignature(HttpJsonMessageWithFaultingPayload msg) {
}

private static class AwsSignerWithPrecomputedContentHash extends BaseAws4Signer {
@Override
protected String calculateContentHash(SdkHttpFullRequest.Builder mutableRequest,
Aws4SignerParams signerParams,
SdkChecksum contentFlexibleChecksum) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ private JSONObject toJSONObject(SourceTargetCaptureTuple tuple) {
.ifPresent(d -> meta.put("sourceRequest", jsonFromHttpData(d)));
Optional.ofNullable(p.responseData).flatMap(d -> Optional.ofNullable(d.packetBytes))
.ifPresent(d -> meta.put("sourceResponse", jsonFromHttpData(d,
//log.warn("TODO: These durations are not measuring the same values!");
// TODO: These durations are not measuring the same values!
Duration.between(tuple.sourcePair.requestData.getLastPacketTimestamp(),
tuple.sourcePair.responseData.getLastPacketTimestamp()))));
});
Expand Down Expand Up @@ -176,7 +176,7 @@ private JSONObject toJSONObject(SourceTargetCaptureTuple tuple) {
public void accept(SourceTargetCaptureTuple triple) {
JSONObject jsonObject = toJSONObject(triple);

tupleLogger.info(jsonObject.toString());
tupleLogger.info(()->jsonObject.toString());
outputStream.write((jsonObject.toString()+"\n").getBytes(StandardCharsets.UTF_8));
outputStream.flush();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,22 +87,21 @@ public IJsonTransformer getTransformerFactoryLoader(String newHostName, String f
var loadedTransformers = getTransformerFactoryFromServiceLoader(fullConfig);
return new JsonCompositeTransformer(Stream.concat(
loadedTransformers,
Optional.ofNullable(newHostName).map(h->Stream.of(new HostTransformer(h))).orElse(Stream.of())
Optional.ofNullable(newHostName).stream().map(HostTransformer::new)
).toArray(IJsonTransformer[]::new));
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Could not parse the transformer configuration as a json list", e);
}
}

private class HostTransformer implements IJsonTransformer {
private static class HostTransformer implements IJsonTransformer {
private final String newHostName;

@Override
public Map<String,Object> transformJson(Map<String,Object> incomingJson) {
var asMap = (Map<String, Object>) incomingJson;
var headers = (Map<String, Object>) asMap.get(JsonKeysForHttpMessage.HEADERS_KEY);
var headers = (Map<String, Object>) incomingJson.get(JsonKeysForHttpMessage.HEADERS_KEY);
headers.replace("host", newHostName);
return asMap;
return incomingJson;
}

public HostTransformer(String newHostName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public static String packetsToCompressedTrafficStream(Stream<byte[]> byteArrStre
}
}

public TrafficStream trafficStreamFromCompressedString(String encodedAndZippedStr) throws Exception {
public TrafficStream trafficStreamFromCompressedString(String encodedAndZippedStr) throws IOException {
try (var bais = new ByteArrayInputStream(Base64.getDecoder().decode(encodedAndZippedStr))) {
try (var gzis = new GZIPInputStream(bais)) {
return TrafficStream.parseDelimitedFrom(gzis);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,10 @@ public ByteBuf recycleByteBufRetained() {
}
}

private JsonGenerator jsonGenerator;
private ChunkingByteBufOutputStream outputStream;
private ObjectMapper objectMapper;
private Deque<LevelContext> cursorStack;
private final JsonGenerator jsonGenerator;
private final ChunkingByteBufOutputStream outputStream;
private final ObjectMapper objectMapper;
private final Deque<LevelContext<? extends Object>> cursorStack;

@SneakyThrows
public JsonEmitter(ByteBufAllocator byteBufAllocator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
* the paylaod (unzip, parse, etc). If a transform DOES require the payload to be present, get()
*
*/
@EqualsAndHashCode
@EqualsAndHashCode(callSuper = false)
@Slf4j
public class PayloadAccessFaultingMap extends AbstractMap<String, Object> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* This is a kludge to provide that. Note that this code doesn't do conversions such as joining
* or splitting. If more control is required, callers should use the multimap interfaces.
*/
@EqualsAndHashCode
@EqualsAndHashCode(callSuper = false)
public class ListKeyAdaptingCaseInsensitiveHeadersMap extends AbstractMap<String,Object> {
protected final StrictCaseInsensitiveHttpHeadersMap strictHeadersMap;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ private static HttpJsonMessageWithFaultingPayload transform(IJsonTransformer tra
var returnedObject = transformer.transformJson(httpJsonMessage);
if (returnedObject != httpJsonMessage) {
httpJsonMessage.clear();
httpJsonMessage = new HttpJsonMessageWithFaultingPayload((Map<String,Object>)returnedObject);
httpJsonMessage = new HttpJsonMessageWithFaultingPayload(returnedObject);

Check warning on line 93 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyDecodedHttpRequestPreliminaryConvertHandler.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyDecodedHttpRequestPreliminaryConvertHandler.java#L93

Added line #L93 was not covered by tests
}
return httpJsonMessage;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.opensearch.migrations.transform.IJsonTransformer;

import java.util.Map;

public class NettyJsonBodyConvertHandler extends ChannelInboundHandlerAdapter {
private final IJsonTransformer transformer;

Expand All @@ -17,7 +15,7 @@ public NettyJsonBodyConvertHandler(IJsonTransformer transformer) {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpJsonMessageWithFaultingPayload) {
var output = transformer.transformJson((HttpJsonMessageWithFaultingPayload)msg);
var newHttpJson = new HttpJsonMessageWithFaultingPayload(((Map<String,Object>)output));
var newHttpJson = new HttpJsonMessageWithFaultingPayload(output);

Check warning on line 18 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonBodyConvertHandler.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonBodyConvertHandler.java#L18

Added line #L18 was not covered by tests
ctx.fireChannelRead(newHttpJson);
} else {
super.channelRead(ctx, msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,22 +66,9 @@ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
"expected in progress Boolean to be not null since null should signal that work was never started";
var transformationStatus = v1.booleanValue() ?
HttpRequestTransformationStatus.COMPLETED : HttpRequestTransformationStatus.ERROR;
return packetReceiver.finalizeRequest().getDeferredFutureThroughHandle((v2, t2) -> {
if (t1 != null) {
return StringTrackableCompletableFuture.<TransformedOutputAndResult<R>>failedFuture(t1,
()->"fixed failure from currentFuture.getDeferredFutureThroughHandle()");
} else if (t2 != null) {
return StringTrackableCompletableFuture.<TransformedOutputAndResult<R>>failedFuture(t2,
()->"fixed failure from packetReceiver.finalizeRequest()");
} else {
return StringTrackableCompletableFuture.completedFuture(Optional.ofNullable(v2)
.map(r-> new TransformedOutputAndResult<R>(r, transformationStatus,
null))
.orElse(null),
()->"fixed value from packetReceiver.finalizeRequest()"
);
}
},
return packetReceiver.finalizeRequest().getDeferredFutureThroughHandle((v2, t2) ->
wrapFinalizedResultWithExceptionHandling(t1, v2, t2,
transformationStatus),
()->"handlerRemoved: NettySendByteBufsToPacketHandlerHandler is setting the completed value for its " +
"packetReceiverCompletionFuture, after the packets have been finalized " +
"to the packetReceiver");
Expand All @@ -96,6 +83,25 @@ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
}

private static <R> StringTrackableCompletableFuture<TransformedOutputAndResult<R>>
wrapFinalizedResultWithExceptionHandling(Throwable t1, R v2, Throwable t2,
HttpRequestTransformationStatus transformationStatus) {
if (t1 != null) {
return StringTrackableCompletableFuture.<TransformedOutputAndResult<R>>failedFuture(t1,
() -> "fixed failure from currentFuture.getDeferredFutureThroughHandle()");

Check warning on line 91 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettySendByteBufsToPacketHandlerHandler.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettySendByteBufsToPacketHandlerHandler.java#L90-L91

Added lines #L90 - L91 were not covered by tests
} else if (t2 != null) {
return StringTrackableCompletableFuture.<TransformedOutputAndResult<R>>failedFuture(t2,
() -> "fixed failure from packetReceiver.finalizeRequest()");

Check warning on line 94 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettySendByteBufsToPacketHandlerHandler.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettySendByteBufsToPacketHandlerHandler.java#L93-L94

Added lines #L93 - L94 were not covered by tests
} else {
return StringTrackableCompletableFuture.completedFuture(Optional.ofNullable(v2)
.map(r -> new TransformedOutputAndResult<R>(r, transformationStatus,
null))
.orElse(null),
() -> "fixed value from packetReceiver.finalizeRequest()"

Check warning on line 100 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettySendByteBufsToPacketHandlerHandler.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettySendByteBufsToPacketHandlerHandler.java#L100

Added line #L100 was not covered by tests
);
}
}

public DiagnosticTrackableCompletableFuture<String, TransformedOutputAndResult<R>>
getPacketReceiverCompletionFuture() {
assert packetReceiverCompletionFutureRef.get() != null :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* so that we can maintain that original order. However, we add an extra ability to keep key values
* (or header names) case insensitive.
*/
@EqualsAndHashCode
@EqualsAndHashCode(callSuper = false)
public class StrictCaseInsensitiveHttpHeadersMap extends AbstractMap<String,List<String>> {
protected LinkedHashMap<String, SimpleEntry<String, List<String>>> lowerCaseToUpperCaseAndValueMap;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
import org.opensearch.migrations.trafficcapture.protos.TrafficStream;
import org.opensearch.migrations.trafficcapture.protos.TrafficStreamUtils;

import java.util.StringJoiner;

@ToString
public class PojoTrafficStreamKey implements ITrafficStreamKey {
private final String nodeId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@

import java.util.ArrayList;

@EqualsAndHashCode
@EqualsAndHashCode(callSuper = true)
public class RawPackets extends ArrayList<byte[]> {
}
Loading

0 comments on commit 336aa1e

Please sign in to comment.