Skip to content

Commit

Permalink
Fall back to metadata command for RabbitMQ < 3.11
Browse files Browse the repository at this point in the history
For Environment#streamExists(String).

References #370
  • Loading branch information
acogoluegnes committed Jun 29, 2023
1 parent a40bd43 commit 59916b8
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 45 deletions.
5 changes: 1 addition & 4 deletions src/main/java/com/rabbitmq/stream/impl/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -398,10 +398,7 @@ public void initChannel(SocketChannel ch) {
tuneState.getHeartbeat());
this.connectionProperties = open(parameters.virtualHost);
Set<FrameHandlerInfo> supportedCommands = maybeExchangeCommandVersions();
if (supportedCommands.stream()
.filter(i -> i.getKey() == COMMAND_STREAM_STATS)
.findAny()
.isPresent()) {
if (supportedCommands.stream().anyMatch(i -> i.getKey() == COMMAND_STREAM_STATS)) {
this.exchangeCommandVersionsCheck = () -> {};
} else {
this.exchangeCommandVersionsCheck =
Expand Down
52 changes: 15 additions & 37 deletions src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,37 +89,6 @@ class StreamEnvironment implements Environment {
private final List<Locator> locators = new CopyOnWriteArrayList<>();
private final ExecutorServiceFactory executorServiceFactory;

StreamEnvironment(
ScheduledExecutorService scheduledExecutorService,
Client.ClientParameters clientParametersPrototype,
List<URI> uris,
BackOffDelayPolicy recoveryBackOffDelayPolicy,
BackOffDelayPolicy topologyBackOffDelayPolicy,
AddressResolver addressResolver,
int maxProducersByConnection,
int maxTrackingConsumersByConnection,
int maxConsumersByConnection,
DefaultTlsConfiguration tlsConfiguration,
ByteBufAllocator byteBufAllocator,
boolean lazyInit,
Function<ClientConnectionType, String> connectionNamingStrategy) {
this(
scheduledExecutorService,
clientParametersPrototype,
uris,
recoveryBackOffDelayPolicy,
topologyBackOffDelayPolicy,
addressResolver,
maxProducersByConnection,
maxTrackingConsumersByConnection,
maxConsumersByConnection,
tlsConfiguration,
byteBufAllocator,
lazyInit,
connectionNamingStrategy,
cp -> new Client(cp));
}

StreamEnvironment(
ScheduledExecutorService scheduledExecutorService,
Client.ClientParameters clientParametersPrototype,
Expand Down Expand Up @@ -513,22 +482,31 @@ public StreamStats queryStreamStats(String stream) {
public boolean streamExists(String stream) {
checkNotClosed();
this.maybeInitializeLocator();
StreamStatsResponse response =
short responseCode =
locatorOperation(
Utils.namedFunction(
client -> client.streamStats(stream), "Query stream stats on stream '%s'", stream));
if (response.isOk()) {
client -> {
try {
return client.streamStats(stream).getResponseCode();
} catch (UnsupportedOperationException e) {
Map<String, Client.StreamMetadata> metadata = client.metadata(stream);
return metadata.get(stream).getResponseCode();
}
},
"Stream exists for stream '%s'",
stream));
if (responseCode == Constants.RESPONSE_CODE_OK) {
return true;
} else if (response.getResponseCode() == Constants.RESPONSE_CODE_STREAM_DOES_NOT_EXIST) {
} else if (responseCode == Constants.RESPONSE_CODE_STREAM_DOES_NOT_EXIST) {
return false;
} else {
throw convertCodeToException(
response.getResponseCode(),
responseCode,
stream,
() ->
format(
"Unexpected result when checking if stream '%s' exists: %s.",
stream, formatConstant(response.getResponseCode())));
stream, formatConstant(responseCode)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class StreamEnvironmentBuilder implements EnvironmentBuilder {
private int maxConsumersByConnection = ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT;
private CompressionCodecFactory compressionCodecFactory;
private boolean lazyInit = false;
private Function<ClientConnectionType, String> connectionNamingStrategy;
private Function<Client.ClientParameters, Client> clientFactory = Client::new;

public StreamEnvironmentBuilder() {}

Expand Down Expand Up @@ -302,6 +302,11 @@ public NettyConfiguration netty() {
return this.netty;
}

StreamEnvironmentBuilder clientFactory(Function<Client.ClientParameters, Client> clientFactory) {
this.clientFactory = clientFactory;
return this;
}

@Override
public Environment build() {
if (this.compressionCodecFactory == null) {
Expand All @@ -310,7 +315,8 @@ public Environment build() {
this.clientParameters.compressionCodecFactory(this.compressionCodecFactory);
}
this.id = this.id == null ? "rabbitmq-stream" : this.id;
this.connectionNamingStrategy = Utils.defaultConnectionNamingStrategy(this.id + "-");
Function<ClientConnectionType, String> connectionNamingStrategy =
Utils.defaultConnectionNamingStrategy(this.id + "-");
this.clientParameters.eventLoopGroup(this.netty.eventLoopGroup);
this.clientParameters.byteBufAllocator(this.netty.byteBufAllocator);
this.clientParameters.channelCustomizer(this.netty.channelCustomizer);
Expand All @@ -328,7 +334,8 @@ public Environment build() {
tls,
netty.byteBufAllocator,
lazyInit,
connectionNamingStrategy);
connectionNamingStrategy,
this.clientFactory);
}

static final class DefaultTlsConfiguration implements TlsConfiguration {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -599,9 +601,52 @@ void queryStreamStatsShouldThrowExceptionWhenStreamDoesNotExist() {
@ValueSource(booleans = {true, false})
@BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11)
void streamExists(boolean lazyInit) {
try (Environment env = environmentBuilder.lazyInitialization(lazyInit).build()) {
AtomicBoolean metadataCalled = new AtomicBoolean(false);
Function<Client.ClientParameters, Client> clientFactory =
cp ->
new Client(cp) {
@Override
public Map<String, StreamMetadata> metadata(String... streams) {
metadataCalled.set(true);
return super.metadata(streams);
}
};
try (Environment env =
((StreamEnvironmentBuilder) environmentBuilder.lazyInitialization(lazyInit))
.clientFactory(clientFactory)
.build()) {
assertThat(env.streamExists(stream)).isTrue();
assertThat(env.streamExists(UUID.randomUUID().toString())).isFalse();
assertThat(metadataCalled).isFalse();
}
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
@BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11)
void streamExistsMetadataDataFallback(boolean lazyInit) {
AtomicInteger metadataCallCount = new AtomicInteger(0);
Function<Client.ClientParameters, Client> clientFactory =
cp ->
new Client(cp) {
@Override
StreamStatsResponse streamStats(String stream) {
throw new UnsupportedOperationException();
}

@Override
public Map<String, StreamMetadata> metadata(String... streams) {
metadataCallCount.incrementAndGet();
return super.metadata(streams);
}
};
try (Environment env =
((StreamEnvironmentBuilder) environmentBuilder.lazyInitialization(lazyInit))
.clientFactory(clientFactory)
.build()) {
assertThat(env.streamExists(stream)).isTrue();
assertThat(env.streamExists(UUID.randomUUID().toString())).isFalse();
assertThat(metadataCallCount).hasValue(2);
}
}

Expand Down

0 comments on commit 59916b8

Please sign in to comment.