From 1dccd9f338f2b2a044c72558cab897b904451903 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 7 Oct 2024 10:49:29 +0300 Subject: [PATCH 1/3] Update Pulsar to 3.3.2 --- gradle/libs.versions.toml | 2 +- .../reactive/client/adapter/SingletonPulsarContainer.java | 2 +- scripts/validate_staging_repo.sh | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 7f06635..a7935e8 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -29,7 +29,7 @@ junit-jupiter = "5.11.0-RC1" licenser = "0.6.1" log4j = "2.23.1" mockito = "5.12.0" -pulsar = "3.3.1" +pulsar = "3.3.2" rat-gradle = "0.8.0" reactor = "3.6.9" slf4j = "2.0.16" diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/SingletonPulsarContainer.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/SingletonPulsarContainer.java index 8337a82..ca27174 100644 --- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/SingletonPulsarContainer.java +++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/SingletonPulsarContainer.java @@ -45,7 +45,7 @@ static PulsarClient createPulsarClient() throws PulsarClientException { } static DockerImageName getPulsarImage() { - return DockerImageName.parse("apachepulsar/pulsar:3.3.1"); + return DockerImageName.parse("apachepulsar/pulsar:3.3.2"); } } diff --git a/scripts/validate_staging_repo.sh b/scripts/validate_staging_repo.sh index b828928..b5b7753 100755 --- a/scripts/validate_staging_repo.sh +++ b/scripts/validate_staging_repo.sh @@ -40,7 +40,7 @@ if ! command -v gradle &>/dev/null; then fi DOCKER_CONTAINER_NAME=pulsar-standalone-$$ -: ${DOCKER_IMAGE_NAME:=apachepulsar/pulsar:3.3.1} +: ${DOCKER_IMAGE_NAME:=apachepulsar/pulsar:3.3.2} mkdir test-app-reactive-$$ cd test-app-reactive-$$ @@ -90,7 +90,7 @@ public class HelloPulsarClientReactive { public static void main(String[] args) throws PulsarClientException, InterruptedException { // Before running this, start Pulsar within docker with this command: - // docker run -it -p 8080:8080 -p 6650:6650 apachepulsar/pulsar:3.3.1 /pulsar/bin/pulsar standalone -nss -nfw + // docker run -it -p 8080:8080 -p 6650:6650 apachepulsar/pulsar:3.3.2 /pulsar/bin/pulsar standalone -nss -nfw try (PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build()) { From fb53ad93fcc793ba3bf8dce19f3450b3446baf6f Mon Sep 17 00:00:00 2001 From: onobc Date: Tue, 15 Oct 2024 14:36:54 -0500 Subject: [PATCH 2/3] Fix compilation in test due to Pulsar 3.3.2 update --- .../internal/adapter/AdaptedReactiveMessageConsumerTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerTests.java b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerTests.java index 3545d1d..6a4e95d 100644 --- a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerTests.java +++ b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerTests.java @@ -78,7 +78,7 @@ void consumerProperties() throws Exception { PulsarClientImpl pulsarClient = spy( (PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build()); doReturn(CompletableFuture.completedFuture(new PartitionedTopicMetadata())).when(pulsarClient) - .getPartitionedTopicMetadata(anyString(), anyBoolean()); + .getPartitionedTopicMetadata(anyString(), anyBoolean(), anyBoolean()); Consumer consumer = mock(Consumer.class); doReturn(CompletableFuture.completedFuture(null)).when(consumer).closeAsync(); From 108d7ff89e947a3bcdb7673356ca281d5b5bd3f8 Mon Sep 17 00:00:00 2001 From: onobc Date: Tue, 15 Oct 2024 22:33:49 -0500 Subject: [PATCH 3/3] Replace order of `given/willAnswer` for spied objects The cause of the test hang was that the test was incorrectly setting up the spy on the type message builder impl. In previous Pulsar version of TypedMessageBuilderImpl, the fact that the method sendAsync was being called at mock setup time was not causing an issue. However, in the latest impl it did not like that and was throwing things off. Spied objects should always use the `doReturn|Answer|Throw()` family as described in https://javadoc.io/doc/org.mockito/mockito-core/latest/org/mockito/Mockito.html#important-gotcha-on-spying-real-objects--heading --- .../AdaptedReactiveMessageSenderTests.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTests.java b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTests.java index 7ccda26..22a4e22 100644 --- a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTests.java +++ b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTests.java @@ -77,6 +77,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.willAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -195,11 +196,11 @@ void sendOnePulsarException() throws Exception { given(producer.newMessage()).willAnswer((__) -> { TypedMessageBuilderImpl typedMessageBuilder = spy( new TypedMessageBuilderImpl<>(producer, Schema.STRING)); - given(typedMessageBuilder.sendAsync()).willAnswer((___) -> { + willAnswer((___) -> { CompletableFuture failed = new CompletableFuture<>(); failed.completeExceptionally(new ProducerQueueIsFullError("Queue is full")); return failed; - }); + }).given(typedMessageBuilder).sendAsync(); return typedMessageBuilder; }); @@ -231,7 +232,7 @@ void sendManyStopOnError() throws Exception { given(producer.newMessage()).willAnswer((__) -> { TypedMessageBuilderImpl typedMessageBuilder = spy( new TypedMessageBuilderImpl<>(producer, Schema.STRING)); - given(typedMessageBuilder.sendAsync()).willAnswer((___) -> { + willAnswer((___) -> { if (entryId.get() == 1) { CompletableFuture failed = new CompletableFuture<>(); failed.completeExceptionally(new ProducerQueueIsFullError("Queue is full")); @@ -241,7 +242,7 @@ void sendManyStopOnError() throws Exception { .newMessageId(1, entryId.incrementAndGet(), 1); messageIds.add(messageId); return CompletableFuture.completedFuture(messageId); - }); + }).given(typedMessageBuilder).sendAsync(); return typedMessageBuilder; }); @@ -279,7 +280,7 @@ void sendMany() throws Exception { given(producer.newMessage()).willAnswer((__) -> { TypedMessageBuilderImpl typedMessageBuilder = spy( new TypedMessageBuilderImpl<>(producer, Schema.STRING)); - given(typedMessageBuilder.sendAsync()).willAnswer((___) -> { + willAnswer((___) -> { if (entryId.get() == 2) { CompletableFuture failed = new CompletableFuture<>(); failed.completeExceptionally(new ProducerQueueIsFullError("Queue is full")); @@ -289,7 +290,7 @@ void sendMany() throws Exception { .newMessageId(1, entryId.incrementAndGet(), 1); messageIds.add(messageId); return CompletableFuture.completedFuture(messageId); - }); + }).given(typedMessageBuilder).sendAsync(); return typedMessageBuilder; }); @@ -498,7 +499,7 @@ void doTestMaxInFlight(BiFunction, Flux, given(producer.newMessage()).willAnswer((__) -> { TypedMessageBuilderImpl typedMessageBuilder = spy( new TypedMessageBuilderImpl<>(producer, Schema.STRING)); - given(typedMessageBuilder.sendAsync()).willAnswer((___) -> { + willAnswer((___) -> { CompletableFuture messageSender = new CompletableFuture<>(); finalExecutorService.execute(() -> { long current = totalRequests.incrementAndGet(); @@ -512,7 +513,7 @@ void doTestMaxInFlight(BiFunction, Flux, DefaultImplementation.getDefaultImplementation().newMessageId(1, encodedEntryId, 1)); }, 100, TimeUnit.MILLISECONDS); return messageSender; - }); + }).given(typedMessageBuilder).sendAsync(); return typedMessageBuilder; });