Skip to content

Commit

Permalink
Use unchecked exception on reader factory
Browse files Browse the repository at this point in the history
This commit replaces the checked `PulsarClientException` with the
unchecked `PulsarException` on the `PulsarReaderFactory#createReader`
API.
  • Loading branch information
onobc committed Sep 12, 2024
1 parent cfe597d commit 80fa21d
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ The `PulsarTopic` constructor now requires a fully qualified topic name (`domain
If you are invoking the constructor you will need to be sure the topic you pass in is fully-qualified.
A better alternative is to instead use the `PulsarTopicBuilder` as it does not require fully qualified names and will add default values for the missing components in the specified name.

==== PulsarReaderFactory#createReader
The `PulsarReaderFactory#createReader` API now throws an unchecked `PulsarException` rather than a checked `PulsarClientException`.
Replace any `try/catch` blocks on this API accordingly.

[[what-s-new-in-1-1-since-1-0]]
== What's New in 1.1 Since 1.0
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 the original author or authors.
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,6 +30,7 @@
import org.apache.pulsar.client.impl.ReaderBuilderImpl;

import org.springframework.lang.Nullable;
import org.springframework.pulsar.PulsarException;
import org.springframework.util.CollectionUtils;

/**
Expand Down Expand Up @@ -82,7 +83,7 @@ public void setTopicBuilder(@Nullable PulsarTopicBuilder topicBuilder) {

@Override
public Reader<T> createReader(@Nullable List<String> topics, @Nullable MessageId messageId, Schema<T> schema,
@Nullable List<ReaderBuilderCustomizer<T>> customizers) throws PulsarClientException {
@Nullable List<ReaderBuilderCustomizer<T>> customizers) {
Objects.requireNonNull(schema, "Schema must be specified");
ReaderBuilder<T> readerBuilder = this.pulsarClient.newReader(schema);

Expand All @@ -103,7 +104,12 @@ public Reader<T> createReader(@Nullable List<String> topics, @Nullable MessageId
customizers.forEach(customizer -> customizer.customize(readerBuilder));
}

return readerBuilder.create();
try {
return readerBuilder.create();
}
catch (PulsarClientException ex) {
throw new PulsarException(ex);
}
}

private void replaceTopicsOnBuilder(ReaderBuilder<T> builder, Collection<String> topics) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 the original author or authors.
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,12 +24,14 @@
import org.apache.pulsar.client.api.Schema;

import org.springframework.lang.Nullable;
import org.springframework.pulsar.PulsarException;

/**
* Pulsar {@link Reader} factory interface.
*
* @param <T> Underlying message type handled by this reader.
* @author Soby Chacko
* @author Chris Bono
*/
public interface PulsarReaderFactory<T> {

Expand All @@ -42,10 +44,11 @@ public interface PulsarReaderFactory<T> {
* @param customizers the optional list of customizers to apply to the reader builder.
* Note that the customizers are applied last and have the potential for overriding
* any specified parameters or default properties.
* @return Pulsar {@link Reader}
* @throws PulsarClientException if there are issues when creating the reader
* @return the created reader
* @throws PulsarException if any {@link PulsarClientException} occurs communicating
* with Pulsar
*/
Reader<T> createReader(@Nullable List<String> topics, @Nullable MessageId messageId, Schema<T> schema,
@Nullable List<ReaderBuilderCustomizer<T>> customizers) throws PulsarClientException;
@Nullable List<ReaderBuilderCustomizer<T>> customizers);

}
Original file line number Diff line number Diff line change
Expand Up @@ -192,15 +192,9 @@ private final class InternalAsyncReader implements SchedulingAwareRunnable {
this.readerBuilderCustomizer = getReaderBuilderCustomizer();
List<ReaderBuilderCustomizer<T>> customizers = this.readerBuilderCustomizer != null
? List.of(this.readerBuilderCustomizer) : Collections.emptyList();
try {
this.reader = getPulsarReaderFactory().createReader(readerContainerProperties.getTopics(),
readerContainerProperties.getStartMessageId(), (Schema) readerContainerProperties.getSchema(),
customizers);
}
catch (PulsarClientException ex) {
// TODO remove when PRF.createReader replaces PCEX w PEX
throw new PulsarException(ex);
}
this.reader = getPulsarReaderFactory().createReader(readerContainerProperties.getTopics(),
readerContainerProperties.getStartMessageId(), (Schema) readerContainerProperties.getSchema(),
customizers);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;

import org.springframework.pulsar.PulsarException;
import org.springframework.pulsar.test.support.PulsarTestContainerSupport;

/**
Expand Down Expand Up @@ -272,15 +273,15 @@ void missingTopic() {
// topic name is not set in the API call or in the reader config.
assertThatThrownBy(() -> pulsarReaderFactory.createReader(Collections.emptyList(), MessageId.earliest,
Schema.STRING, Collections.emptyList()))
.isInstanceOf(PulsarClientException.class)
.isInstanceOf(PulsarException.class)
.hasMessageContaining("Topic name must be set on the reader builder");
}

@Test
void missingStartingMessageId() {
assertThatThrownBy(() -> pulsarReaderFactory.createReader(List.of("my-reader-topic"), null, Schema.STRING,
Collections.emptyList()))
.isInstanceOf(PulsarClientException.class)
.isInstanceOf(PulsarException.class)
.hasMessageContaining(
"Start message id or start message from roll back must be specified but they cannot be specified at the same time");
}
Expand Down

0 comments on commit 80fa21d

Please sign in to comment.