Skip to content

Commit

Permalink
[feat][client] Enable custom Encrypt Decrypt methods for Reader (#12599)
Browse files Browse the repository at this point in the history
Signed-off-by: tison <wander4096@gmail.com>
Co-authored-by: shrathore <shrathore@fanatics.com>
Co-authored-by: tison <wander4096@gmail.com>
  • Loading branch information
3 people authored Jun 30, 2023
1 parent 2bede01 commit e360379
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,15 @@ public interface ReaderBuilder<T> extends Cloneable {
*/
ReaderBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction action);

/**
* Sets a {@link MessageCrypto}.
*
* <p>Contains methods to encrypt/decrypt message for End to End Encryption.
*
* @param messageCrypto message Crypto Object
* @return ReaderBuilder instance
*/
ReaderBuilder<T> messageCrypto(MessageCrypto messageCrypto);
/**
* Sets the size of the consumer receive queue.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ public void reachedEndOfTopic(Consumer<T> consumer) {
if (readerConfiguration.getCryptoKeyReader() != null) {
consumerConfiguration.setCryptoKeyReader(readerConfiguration.getCryptoKeyReader());
}

if (readerConfiguration.getMessageCrypto() != null) {
consumerConfiguration.setMessageCrypto(readerConfiguration.getMessageCrypto());
}
if (readerConfiguration.getKeyHashRanges() != null) {
consumerConfiguration.setKeySharedPolicy(
KeySharedPolicy
Expand All @@ -126,7 +130,7 @@ public void reachedEndOfTopic(Consumer<T> consumer) {
ReaderInterceptorUtil.convertToConsumerInterceptors(
this, readerConfiguration.getReaderInterceptorList());
multiTopicsConsumer = new MultiTopicsConsumerImpl<>(client, consumerConfiguration, executorProvider,
consumerFuture, schema, consumerInterceptors, true,
consumerFuture, schema, consumerInterceptors, true,
readerConfiguration.getStartMessageId(),
readerConfiguration.getStartMessageFromRollbackDurationInSec());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
Expand Down Expand Up @@ -173,6 +174,12 @@ public ReaderBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction action)
return this;
}

@Override
public ReaderBuilder<T> messageCrypto(MessageCrypto messageCrypto) {
conf.setMessageCrypto(messageCrypto);
return this;
}

@Override
public ReaderBuilder<T> receiverQueueSize(int receiverQueueSize) {
checkArgument(receiverQueueSize >= 0, "receiverQueueSize needs to be >= 0");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,15 @@ public void reachedEndOfTopic(Consumer<T> consumer) {
consumerConfiguration.setCryptoKeyReader(readerConfiguration.getCryptoKeyReader());
}

if (readerConfiguration.getMessageCrypto() != null) {
consumerConfiguration.setMessageCrypto(readerConfiguration.getMessageCrypto());
}

if (readerConfiguration.getKeyHashRanges() != null) {
consumerConfiguration.setKeySharedPolicy(
KeySharedPolicy
.stickyHashRange()
.ranges(readerConfiguration.getKeyHashRanges())
KeySharedPolicy
.stickyHashRange()
.ranges(readerConfiguration.getKeyHashRanges())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@
package org.apache.pulsar.client.impl.conf;

import com.fasterxml.jackson.annotation.JsonIgnore;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.swagger.annotations.ApiModelProperty;
import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.ReaderInterceptor;
Expand Down Expand Up @@ -113,6 +117,11 @@ public class ReaderConfigurationData<T> implements Serializable, Cloneable {
)
private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;

@JsonIgnore
@Setter(onMethod_ = @SuppressFBWarnings({"EI_EXPOSE_REP2"}))
@Getter(onMethod_ = @SuppressFBWarnings({"EI_EXPOSE_REP"}))
private transient MessageCrypto messageCrypto = null;

@ApiModelProperty(
name = "readCompacted",
value = "If enabling `readCompacted`, a consumer reads messages from a compacted topic rather than a full "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,43 @@
*/
package org.apache.pulsar.client.impl;

import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertTrue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.testng.Assert.assertNotNull;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertTrue;

public class ReaderImplTest {
private ReaderImpl<byte[]> reader;
private PulsarClientImpl client;
private ExecutorProvider executorProvider;
private ExecutorService internalExecutor;

@BeforeMethod
void setupReader() {
executorProvider = new ExecutorProvider(1, "ReaderImplTest");
internalExecutor = Executors.newSingleThreadScheduledExecutor();
PulsarClientImpl mockedClient = ClientTestFixtures.createPulsarClientMockWithMockedClientCnx(
executorProvider, internalExecutor);
ReaderConfigurationData<byte[]> readerConfiguration = new ReaderConfigurationData<>();
readerConfiguration.setTopicName("topicName");
CompletableFuture<Consumer<byte[]>> consumerFuture = new CompletableFuture<>();
reader = new ReaderImpl<>(mockedClient, readerConfiguration, ClientTestFixtures.createMockedExecutorProvider(),
consumerFuture, Schema.BYTES);
client = ClientTestFixtures.createPulsarClientMockWithMockedClientCnx(executorProvider, internalExecutor);
}

@AfterMethod
public void clean() {
public void clean() throws Exception {
if (client != null) {
client.close();
client = null;
}
if (executorProvider != null) {
executorProvider.shutdownNow();
executorProvider = null;
Expand All @@ -65,6 +67,16 @@ public void clean() {

@Test
void shouldSupportCancellingReadNextAsync() {
ReaderConfigurationData<byte[]> readerConfiguration = new ReaderConfigurationData<>();
readerConfiguration.setTopicName("topicName");
CompletableFuture<Consumer<byte[]>> consumerFuture = new CompletableFuture<>();
ReaderImpl<byte[]> reader = new ReaderImpl<>(
client,
readerConfiguration,
ClientTestFixtures.createMockedExecutorProvider(),
consumerFuture,
Schema.BYTES);

// given
CompletableFuture<Message<byte[]>> future = reader.readNextAsync();
Awaitility.await().untilAsserted(() -> {
Expand All @@ -77,4 +89,14 @@ void shouldSupportCancellingReadNextAsync() {
// then
assertFalse(reader.getConsumer().hasNextPendingReceive());
}

@Test
public void testReaderBuilderWhenMessageCryptoSet() throws PulsarClientException {
ReaderBuilderImpl<byte[]> builder = new ReaderBuilderImpl<>(client, Schema.BYTES);
builder.topic("testTopicName");
builder.startMessageFromRollbackDuration(1, TimeUnit.SECONDS);
builder.messageCrypto(new MessageCryptoBc("ctx1", true));
assertNotNull(builder.create());
assertNotNull(builder.getConf().getMessageCrypto());
}
}

0 comments on commit e360379

Please sign in to comment.