diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 7f06635..a7935e8 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -29,7 +29,7 @@ junit-jupiter = "5.11.0-RC1" licenser = "0.6.1" log4j = "2.23.1" mockito = "5.12.0" -pulsar = "3.3.1" +pulsar = "3.3.2" rat-gradle = "0.8.0" reactor = "3.6.9" slf4j = "2.0.16" diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/SingletonPulsarContainer.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/SingletonPulsarContainer.java index 8337a82..ca27174 100644 --- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/SingletonPulsarContainer.java +++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/SingletonPulsarContainer.java @@ -45,7 +45,7 @@ static PulsarClient createPulsarClient() throws PulsarClientException { } static DockerImageName getPulsarImage() { - return DockerImageName.parse("apachepulsar/pulsar:3.3.1"); + return DockerImageName.parse("apachepulsar/pulsar:3.3.2"); } } diff --git a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerTests.java b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerTests.java index 3545d1d..6a4e95d 100644 --- a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerTests.java +++ b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerTests.java @@ -78,7 +78,7 @@ void consumerProperties() throws Exception { PulsarClientImpl pulsarClient = spy( (PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build()); doReturn(CompletableFuture.completedFuture(new PartitionedTopicMetadata())).when(pulsarClient) - .getPartitionedTopicMetadata(anyString(), anyBoolean()); + .getPartitionedTopicMetadata(anyString(), anyBoolean(), anyBoolean()); Consumer consumer = mock(Consumer.class); doReturn(CompletableFuture.completedFuture(null)).when(consumer).closeAsync(); diff --git a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTests.java b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTests.java index 7ccda26..22a4e22 100644 --- a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTests.java +++ b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTests.java @@ -77,6 +77,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.willAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -195,11 +196,11 @@ void sendOnePulsarException() throws Exception { given(producer.newMessage()).willAnswer((__) -> { TypedMessageBuilderImpl typedMessageBuilder = spy( new TypedMessageBuilderImpl<>(producer, Schema.STRING)); - given(typedMessageBuilder.sendAsync()).willAnswer((___) -> { + willAnswer((___) -> { CompletableFuture failed = new CompletableFuture<>(); failed.completeExceptionally(new ProducerQueueIsFullError("Queue is full")); return failed; - }); + }).given(typedMessageBuilder).sendAsync(); return typedMessageBuilder; }); @@ -231,7 +232,7 @@ void sendManyStopOnError() throws Exception { given(producer.newMessage()).willAnswer((__) -> { TypedMessageBuilderImpl typedMessageBuilder = spy( new TypedMessageBuilderImpl<>(producer, Schema.STRING)); - given(typedMessageBuilder.sendAsync()).willAnswer((___) -> { + willAnswer((___) -> { if (entryId.get() == 1) { CompletableFuture failed = new CompletableFuture<>(); failed.completeExceptionally(new ProducerQueueIsFullError("Queue is full")); @@ -241,7 +242,7 @@ void sendManyStopOnError() throws Exception { .newMessageId(1, entryId.incrementAndGet(), 1); messageIds.add(messageId); return CompletableFuture.completedFuture(messageId); - }); + }).given(typedMessageBuilder).sendAsync(); return typedMessageBuilder; }); @@ -279,7 +280,7 @@ void sendMany() throws Exception { given(producer.newMessage()).willAnswer((__) -> { TypedMessageBuilderImpl typedMessageBuilder = spy( new TypedMessageBuilderImpl<>(producer, Schema.STRING)); - given(typedMessageBuilder.sendAsync()).willAnswer((___) -> { + willAnswer((___) -> { if (entryId.get() == 2) { CompletableFuture failed = new CompletableFuture<>(); failed.completeExceptionally(new ProducerQueueIsFullError("Queue is full")); @@ -289,7 +290,7 @@ void sendMany() throws Exception { .newMessageId(1, entryId.incrementAndGet(), 1); messageIds.add(messageId); return CompletableFuture.completedFuture(messageId); - }); + }).given(typedMessageBuilder).sendAsync(); return typedMessageBuilder; }); @@ -498,7 +499,7 @@ void doTestMaxInFlight(BiFunction, Flux, given(producer.newMessage()).willAnswer((__) -> { TypedMessageBuilderImpl typedMessageBuilder = spy( new TypedMessageBuilderImpl<>(producer, Schema.STRING)); - given(typedMessageBuilder.sendAsync()).willAnswer((___) -> { + willAnswer((___) -> { CompletableFuture messageSender = new CompletableFuture<>(); finalExecutorService.execute(() -> { long current = totalRequests.incrementAndGet(); @@ -512,7 +513,7 @@ void doTestMaxInFlight(BiFunction, Flux, DefaultImplementation.getDefaultImplementation().newMessageId(1, encodedEntryId, 1)); }, 100, TimeUnit.MILLISECONDS); return messageSender; - }); + }).given(typedMessageBuilder).sendAsync(); return typedMessageBuilder; }); diff --git a/scripts/validate_staging_repo.sh b/scripts/validate_staging_repo.sh index b828928..b5b7753 100755 --- a/scripts/validate_staging_repo.sh +++ b/scripts/validate_staging_repo.sh @@ -40,7 +40,7 @@ if ! command -v gradle &>/dev/null; then fi DOCKER_CONTAINER_NAME=pulsar-standalone-$$ -: ${DOCKER_IMAGE_NAME:=apachepulsar/pulsar:3.3.1} +: ${DOCKER_IMAGE_NAME:=apachepulsar/pulsar:3.3.2} mkdir test-app-reactive-$$ cd test-app-reactive-$$ @@ -90,7 +90,7 @@ public class HelloPulsarClientReactive { public static void main(String[] args) throws PulsarClientException, InterruptedException { // Before running this, start Pulsar within docker with this command: - // docker run -it -p 8080:8080 -p 6650:6650 apachepulsar/pulsar:3.3.1 /pulsar/bin/pulsar standalone -nss -nfw + // docker run -it -p 8080:8080 -p 6650:6650 apachepulsar/pulsar:3.3.2 /pulsar/bin/pulsar standalone -nss -nfw try (PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build()) {