Skip to content

Commit

Permalink
Replacing InboundMessage with NativeInboundMessage for deprecation (#…
Browse files Browse the repository at this point in the history
…13126)

* Replacing InboundMessage with NativeInboundMessage for deprecation

Signed-off-by: Vacha Shah <vachshah@amazon.com>

* Removing InboundMessage class

Signed-off-by: Vacha Shah <vachshah@amazon.com>

---------

Signed-off-by: Vacha Shah <vachshah@amazon.com>
  • Loading branch information
VachaShah authored Apr 18, 2024
1 parent ba25c23 commit f5c3ef9
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 158 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.bytes.CompositeBytesReference;
import org.opensearch.transport.nativeprotocol.NativeInboundMessage;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -113,7 +114,7 @@ public void aggregate(ReleasableBytesReference content) {
}
}

public InboundMessage finishAggregation() throws IOException {
public NativeInboundMessage finishAggregation() throws IOException {
ensureOpen();
final ReleasableBytesReference releasableContent;
if (isFirstContent()) {
Expand All @@ -127,7 +128,7 @@ public InboundMessage finishAggregation() throws IOException {
}

final BreakerControl breakerControl = new BreakerControl(circuitBreaker);
final InboundMessage aggregated = new InboundMessage(currentHeader, releasableContent, breakerControl);
final NativeInboundMessage aggregated = new NativeInboundMessage(currentHeader, releasableContent, breakerControl);
boolean success = false;
try {
if (aggregated.getHeader().needsToReadVariableHeader()) {
Expand All @@ -142,7 +143,7 @@ public InboundMessage finishAggregation() throws IOException {
if (isShortCircuited()) {
aggregated.close();
success = true;
return new InboundMessage(aggregated.getHeader(), aggregationException);
return new NativeInboundMessage(aggregated.getHeader(), aggregationException);
} else {
success = true;
return aggregated;
Expand Down
108 changes: 0 additions & 108 deletions server/src/main/java/org/opensearch/transport/InboundMessage.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.channels.TraceableTcpTransportChannel;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.nativeprotocol.NativeInboundMessage;

import java.io.EOFException;
import java.io.IOException;
Expand Down Expand Up @@ -111,7 +112,7 @@ public void messageReceived(
long slowLogThresholdMs,
TransportMessageListener messageListener
) throws IOException {
InboundMessage inboundMessage = (InboundMessage) message;
NativeInboundMessage inboundMessage = (NativeInboundMessage) message;
TransportLogger.logInboundMessage(channel, inboundMessage);
if (inboundMessage.isPing()) {
keepAlive.receiveKeepAlive(channel);
Expand All @@ -122,7 +123,7 @@ public void messageReceived(

private void handleMessage(
TcpChannel channel,
InboundMessage message,
NativeInboundMessage message,
long startTime,
long slowLogThresholdMs,
TransportMessageListener messageListener
Expand Down Expand Up @@ -194,7 +195,7 @@ private Map<String, Collection<String>> extractHeaders(Map<String, String> heade
private <T extends TransportRequest> void handleRequest(
TcpChannel channel,
Header header,
InboundMessage message,
NativeInboundMessage message,
TransportMessageListener messageListener
) throws IOException {
final String action = header.getActionName();
Expand Down
12 changes: 0 additions & 12 deletions server/src/main/java/org/opensearch/transport/TcpTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -761,18 +761,6 @@ protected void serverAcceptedChannel(TcpChannel channel) {
*/
protected abstract void stopInternal();

/**
* @deprecated use {@link #inboundMessage(TcpChannel, ProtocolInboundMessage)}
* Handles inbound message that has been decoded.
*
* @param channel the channel the message is from
* @param message the message
*/
@Deprecated(since = "2.14.0", forRemoval = true)
public void inboundMessage(TcpChannel channel, InboundMessage message) {
inboundMessage(channel, (ProtocolInboundMessage) message);
}

/**
* Handles inbound message that has been decoded.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.core.common.io.stream.InputStreamStreamInput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.compress.CompressorRegistry;
import org.opensearch.transport.nativeprotocol.NativeInboundMessage;

import java.io.IOException;

Expand All @@ -64,7 +65,7 @@ static void logInboundMessage(TcpChannel channel, BytesReference message) {
}
}

static void logInboundMessage(TcpChannel channel, InboundMessage message) {
static void logInboundMessage(TcpChannel channel, NativeInboundMessage message) {
if (logger.isTraceEnabled()) {
try {
String logMessage = format(channel, message, "READ");
Expand Down Expand Up @@ -136,7 +137,7 @@ private static String format(TcpChannel channel, BytesReference message, String
return sb.toString();
}

private static String format(TcpChannel channel, InboundMessage message, String event) throws IOException {
private static String format(TcpChannel channel, NativeInboundMessage message, String event) throws IOException {
final StringBuilder sb = new StringBuilder();
sb.append(channel);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.opensearch.transport.InboundAggregator;
import org.opensearch.transport.InboundBytesHandler;
import org.opensearch.transport.InboundDecoder;
import org.opensearch.transport.InboundMessage;
import org.opensearch.transport.ProtocolInboundMessage;
import org.opensearch.transport.StatsTracker;
import org.opensearch.transport.TcpChannel;
Expand All @@ -32,7 +31,7 @@
public class NativeInboundBytesHandler implements InboundBytesHandler {

private static final ThreadLocal<ArrayList<Object>> fragmentList = ThreadLocal.withInitial(ArrayList::new);
private static final InboundMessage PING_MESSAGE = new InboundMessage(null, true);
private static final NativeInboundMessage PING_MESSAGE = new NativeInboundMessage(null, true);

private final ArrayDeque<ReleasableBytesReference> pending;
private final InboundDecoder decoder;
Expand Down Expand Up @@ -152,7 +151,7 @@ private void forwardFragments(
messageHandler.accept(channel, PING_MESSAGE);
} else if (fragment == InboundDecoder.END_CONTENT) {
assert aggregator.isAggregating();
try (InboundMessage aggregated = aggregator.finishAggregation()) {
try (NativeInboundMessage aggregated = aggregator.finishAggregation()) {
statsTracker.markMessageReceived();
messageHandler.accept(channel, aggregated);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.transport.nativeprotocol.NativeInboundMessage;
import org.junit.Before;

import java.io.IOException;
Expand Down Expand Up @@ -107,7 +108,7 @@ public void testInboundAggregation() throws IOException {
}

// Signal EOS
InboundMessage aggregated = aggregator.finishAggregation();
NativeInboundMessage aggregated = aggregator.finishAggregation();

assertThat(aggregated, notNullValue());
assertFalse(aggregated.isPing());
Expand Down Expand Up @@ -138,7 +139,7 @@ public void testInboundUnknownAction() throws IOException {
assertEquals(0, content.refCount());

// Signal EOS
InboundMessage aggregated = aggregator.finishAggregation();
NativeInboundMessage aggregated = aggregator.finishAggregation();

assertThat(aggregated, notNullValue());
assertTrue(aggregated.isShortCircuit());
Expand All @@ -161,7 +162,7 @@ public void testCircuitBreak() throws IOException {
content1.close();

// Signal EOS
InboundMessage aggregated1 = aggregator.finishAggregation();
NativeInboundMessage aggregated1 = aggregator.finishAggregation();

assertEquals(0, content1.refCount());
assertThat(aggregated1, notNullValue());
Expand All @@ -180,7 +181,7 @@ public void testCircuitBreak() throws IOException {
content2.close();

// Signal EOS
InboundMessage aggregated2 = aggregator.finishAggregation();
NativeInboundMessage aggregated2 = aggregator.finishAggregation();

assertEquals(1, content2.refCount());
assertThat(aggregated2, notNullValue());
Expand All @@ -199,7 +200,7 @@ public void testCircuitBreak() throws IOException {
content3.close();

// Signal EOS
InboundMessage aggregated3 = aggregator.finishAggregation();
NativeInboundMessage aggregated3 = aggregator.finishAggregation();

assertEquals(1, content3.refCount());
assertThat(aggregated3, notNullValue());
Expand Down Expand Up @@ -263,7 +264,7 @@ public void testFinishAggregationWillFinishHeader() throws IOException {
content.close();

// Signal EOS
InboundMessage aggregated = aggregator.finishAggregation();
NativeInboundMessage aggregated = aggregator.finishAggregation();

assertThat(aggregated, notNullValue());
assertFalse(header.needsToReadVariableHeader());
Expand Down
Loading

0 comments on commit f5c3ef9

Please sign in to comment.