Skip to content

Commit

Permalink
Add default implementation of CryptoKeyReader (#9379)
Browse files Browse the repository at this point in the history
Currently, users must implement the `CryptoKeyReader` interface for end-to-end message encryption in Java. I thought it would be useful to have a default implementation, so I added `DefaultCryptoKeyReader`.

How to use:
```java
Producer<byte[]> producer = pulsarClient.newProducer()
        .topic("persistent://public/default/test")
        .addEncryptionKey("my-app-key1")
        .defaultCryptoKeyReader("file:///path/to/public.key")
        .create();

Consumer<byte[]> consumer = pulsarClient.newConsumer()
        .topic("persistent://public/default/test")
        .subscriptionName("sub1")
        .defaultCryptoKeyReader(new HashMap<String, String>() {
            {
                put("my-app-key1", "file:///path/to/private.key");
                put("my-app-key2", "data:application/x-pem-file;base64,*****");
            }
        })
        .subscribe();
```

(cherry picked from commit 85b1c7e)
  • Loading branch information
Masahiro Sakamoto authored and zymap committed Mar 3, 2021
1 parent f4a3f4a commit ca94db5
Show file tree
Hide file tree
Showing 19 changed files with 908 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2616,6 +2616,102 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
log.info("-- Exiting {} test --", methodName);
}

@Test(groups = "encryption")
public void testDefaultCryptoKeyReader() throws Exception {
final String topic = "persistent://my-property/my-ns/default-crypto-key-reader" + System.currentTimeMillis();
final String ecdsaPublicKeyFile = "file:./src/test/resources/certificate/public-key.client-ecdsa.pem";
final String ecdsaPrivateKeyFile = "file:./src/test/resources/certificate/private-key.client-ecdsa.pem";
final String ecdsaPublicKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlIS01JR2pCZ2NxaGtqT1BRSUJNSUdYQWdFQk1Cd0dCeXFHU000OUFRRUNFUUQvLy8vOS8vLy8vLy8vLy8vLwovLy8vTURzRUVQLy8vLzMvLy8vLy8vLy8vLy8vLy93RUVPaDFlY0VRZWZROTJDU1pQQ3p1WHRNREZRQUFEZzFOCmFXNW5hSFZoVVhVTXdEcEVjOUEyZVFRaEJCWWY5MUtMaVpzdERDaGdmS1VzVzRiUFdzZzVXNi9yRThBdG9wTGQKN1hxREFoRUEvLy8vL2dBQUFBQjFvdzBia0RpaEZRSUJBUU1pQUFUcktqNlJQSEdQTktjWktJT2NjTjR0Z0VOTQpuMWR6S2pMck1aVGtKNG9BYVE9PQotLS0tLUVORCBQVUJMSUMgS0VZLS0tLS0K";
final String ecdsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBFQyBQQVJBTUVURVJTLS0tLS0KTUlHWEFnRUJNQndHQnlxR1NNNDlBUUVDRVFELy8vLzkvLy8vLy8vLy8vLy8vLy8vTURzRUVQLy8vLzMvLy8vLwovLy8vLy8vLy8vd0VFT2gxZWNFUWVmUTkyQ1NaUEN6dVh0TURGUUFBRGcxTmFXNW5hSFZoVVhVTXdEcEVjOUEyCmVRUWhCQllmOTFLTGlac3REQ2hnZktVc1c0YlBXc2c1VzYvckU4QXRvcExkN1hxREFoRUEvLy8vL2dBQUFBQjEKb3cwYmtEaWhGUUlCQVE9PQotLS0tLUVORCBFQyBQQVJBTUVURVJTLS0tLS0KLS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1JSFlBZ0VCQkJEZXU5aGM4a092TDNwbCtMWVNqTHE5b0lHYU1JR1hBZ0VCTUJ3R0J5cUdTTTQ5QVFFQ0VRRC8KLy8vOS8vLy8vLy8vLy8vLy8vLy9NRHNFRVAvLy8vMy8vLy8vLy8vLy8vLy8vL3dFRU9oMWVjRVFlZlE5MkNTWgpQQ3p1WHRNREZRQUFEZzFOYVc1bmFIVmhVWFVNd0RwRWM5QTJlUVFoQkJZZjkxS0xpWnN0RENoZ2ZLVXNXNGJQCldzZzVXNi9yRThBdG9wTGQ3WHFEQWhFQS8vLy8vZ0FBQUFCMW93MGJrRGloRlFJQkFhRWtBeUlBQk9zcVBwRTgKY1k4MHB4a29nNXh3M2kyQVEweWZWM01xTXVzeGxPUW5pZ0JwCi0tLS0tRU5EIEVDIFBSSVZBVEUgS0VZLS0tLS0K";
final String rsaPublicKeyFile = "file:./src/test/resources/certificate/public-key.client-rsa.pem";
final String rsaPrivateKeyFile = "file:./src/test/resources/certificate/private-key.client-rsa.pem";
final String rsaPublicKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF0S1d3Z3FkblRZck9DditqMU1rVApXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuYjEwSmNGZjVaanpQOUJTWEsrdEhtSTh1b04zNjh2RXY2eWhVClJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0M3cWRqQ043TERKM01ucWlCSXJVc1NhRVAxd3JOc0Ixa0krbzkKRVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVla0hxTDdzQmxKOThoNk5tc2ljRWFVa2FyZGswVE9YcmxrakMrYwpNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0aHB2blhMdkNtRzRNKzZ4dFl0RCtucGNWUFp3MWkxUjkwZk1zCjdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG5OS2dXN2M3SUJNclAwQUVtMzdIVHUwTFNPalAyT0hYbHZ2bFEKR1FJREFRQUIKLS0tLS1FTkQgUFVCTElDIEtFWS0tLS0tCg==";
final String rsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdEtXd2dxZG5UWXJPQ3YrajFNa1RXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuCmIxMEpjRmY1Wmp6UDlCU1hLK3RIbUk4dW9OMzY4dkV2NnloVVJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0MKN3FkakNON0xESjNNbnFpQklyVXNTYUVQMXdyTnNCMWtJK285RVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVlawpIcUw3c0JsSjk4aDZObXNpY0VhVWthcmRrMFRPWHJsa2pDK2NNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0Cmhwdm5YTHZDbUc0TSs2eHRZdEQrbnBjVlBadzFpMVI5MGZNczdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG4KTktnVzdjN0lCTXJQMEFFbTM3SFR1MExTT2pQMk9IWGx2dmxRR1FJREFRQUJBb0lCQUFhSkZBaTJDN3UzY05yZgpBc3RZOXZWRExvTEl2SEZabGtCa3RqS1pEWW1WSXNSYitoU0NWaXdWVXJXTEw2N1I2K0l2NGVnNERlVE9BeDAwCjhwbmNYS2daVHcyd0liMS9RalIvWS9SamxhQzhsa2RtUldsaTd1ZE1RQ1pWc3lodVNqVzZQajd2cjhZRTR3b2oKRmhOaWp4RUdjZjl3V3JtTUpyemRuVFdRaVhCeW8rZVR2VVE5QlBnUEdyUmpzTVptVGtMeUFWSmZmMkRmeE81YgpJV0ZEWURKY3lZQU1DSU1RdTd2eXMvSTUwb3U2aWxiMUNPNlFNNlo3S3BQZU9vVkZQd3R6Ymg4Y2Y5eE04VU5TCmo2Si9KbWRXaGdJMzRHUzNOQTY4eFRRNlBWN3pqbmhDYytpY2NtM0pLeXpHWHdhQXBBWitFb2NlLzlqNFdLbXUKNUI0emlSMENnWUVBM2wvOU9IYmwxem15VityUnhXT0lqL2kyclR2SHp3Qm5iblBKeXVlbUw1Vk1GZHBHb2RRMwp2d0h2eVFtY0VDUlZSeG1Yb2pRNFF1UFBIczNxcDZ3RUVGUENXeENoTFNUeGxVYzg1U09GSFdVMk85OWpWN3pJCjcrSk9wREsvTXN0c3g5bkhnWGR1SkYrZ2xURnRBM0xIOE9xeWx6dTJhRlBzcHJ3S3VaZjk0UThDZ1lFQXovWngKYWtFRytQRU10UDVZUzI4Y1g1WGZqc0lYL1YyNkZzNi9zSDE2UWpVSUVkZEU1VDRmQ3Vva3hDalNpd1VjV2htbApwSEVKNVM1eHAzVllSZklTVzNqUlczcXN0SUgxdHBaaXBCNitTMHpUdUptTEpiQTNJaVdFZzJydE10N1gxdUp2CkEvYllPcWUwaE9QVHVYdVpkdFZaMG5NVEtrN0dHOE82VmtCSTdGY0NnWUVBa0RmQ21zY0pnczdKYWhsQldIbVgKekg5cHdlbStTUEtqSWMvNE5CNk4rZGdpa3gyUHAwNWhwUC9WaWhVd1lJdWZ2cy9MTm9nVllOUXJ0SGVwVW5yTgoyK1RtYkhiWmdOU3YxTGR4dDgyVWZCN3kwRnV0S3U2bGhtWEh5TmVjaG8zRmk4c2loMFYwYWlTV21ZdUhmckFICkdhaXNrRVpLbzFpaVp2UVhKSXg5TzJNQ2dZQVRCZjByOWhUWU10eXh0YzZIMy9zZGQwMUM5dGhROGdEeTB5alAKMFRxYzBkTVNKcm9EcW1JV2tvS1lldzkvYmhGQTRMVzVUQ25Xa0NBUGJIbU50RzRmZGZiWXdta0gvaGRuQTJ5MApqS2RscGZwOEdYZVVGQUdIR3gxN0ZBM3NxRnZnS1VoMGVXRWdSSFVMN3ZkUU1WRkJnSlM5M283elFNOTRmTGdQCjZjT0I4d0tCZ0ZjR1Y0R2pJMld3OWNpbGxhQzU1NE12b1NqZjhCLyswNGtYekRPaDhpWUlJek85RVVpbDFqaksKSnZ4cDRobkx6VEtXYnV4M01FV3F1ckxrWWFzNkdwS0JqdytpTk9DYXI2WWRxV0dWcU0zUlV4N1BUVWFad2tLeApVZFA2M0lmWTdpWkNJVC9RYnlIUXZJVWUyTWFpVm5IK3VseGRrSzZZNWU3Z3hjYmNrSUg0Ci0tLS0tRU5EIFJTQSBQUklWQVRFIEtFWS0tLS0tCg==";
final int numMsg = 10;

Map<String, String> privateKeyFileMap = Maps.newHashMap();
privateKeyFileMap.put("client-ecdsa.pem", ecdsaPrivateKeyFile);
privateKeyFileMap.put("client-rsa.pem", rsaPrivateKeyFile);
Map<String, String> privateKeyDataMap = Maps.newHashMap();
privateKeyDataMap.put("client-ecdsa.pem", ecdsaPrivateKeyData);
privateKeyDataMap.put("client-rsa.pem", rsaPrivateKeyData);

Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
.defaultCryptoKeyReader(ecdsaPrivateKeyFile).subscribe();
Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topic).subscriptionName("sub2")
.defaultCryptoKeyReader(ecdsaPrivateKeyData).subscribe();
Consumer<byte[]> consumer3 = pulsarClient.newConsumer().topic(topic).subscriptionName("sub3")
.defaultCryptoKeyReader(privateKeyFileMap).subscribe();
Consumer<byte[]> consumer4 = pulsarClient.newConsumer().topic(topic).subscriptionName("sub4")
.defaultCryptoKeyReader(privateKeyDataMap).subscribe();

Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topic).addEncryptionKey("client-ecdsa.pem")
.defaultCryptoKeyReader(ecdsaPublicKeyFile).create();
Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topic).addEncryptionKey("client-ecdsa.pem")
.defaultCryptoKeyReader(ecdsaPublicKeyData).create();

for (int i = 0; i < numMsg; i++) {
producer1.send(("my-message-" + i).getBytes());
}
for (int i = numMsg; i < numMsg * 2; i++) {
producer2.send(("my-message-" + i).getBytes());
}

producer1.close();
producer2.close();

for (Consumer<byte[]> consumer : (List<Consumer<byte[]>>) Lists.newArrayList(consumer1, consumer2)) {
MessageImpl<byte[]> msg = null;

for (int i = 0; i < numMsg * 2; i++) {
msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
// verify that encrypted message contains encryption-context
msg.getEncryptionCtx().orElseThrow(
() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
assertEquals(new String(msg.getData()), "my-message-" + i);
}

// Acknowledge the consumption of all messages at once
consumer.acknowledgeCumulative(msg);
}

consumer1.unsubscribe();
consumer2.unsubscribe();

Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topic).addEncryptionKey("client-rsa.pem")
.defaultCryptoKeyReader(rsaPublicKeyFile).create();
Producer<byte[]> producer4 = pulsarClient.newProducer().topic(topic).addEncryptionKey("client-rsa.pem")
.defaultCryptoKeyReader(rsaPublicKeyData).create();

for (int i = numMsg * 2; i < numMsg * 3; i++) {
producer3.send(("my-message-" + i).getBytes());
}
for (int i = numMsg * 3; i < numMsg * 4; i++) {
producer4.send(("my-message-" + i).getBytes());
}

producer3.close();
producer4.close();

for (Consumer<byte[]> consumer : (List<Consumer<byte[]>>) Lists.newArrayList(consumer3, consumer4)) {
MessageImpl<byte[]> msg = null;

for (int i = 0; i < numMsg * 4; i++) {
msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
// verify that encrypted message contains encryption-context
msg.getEncryptionCtx().orElseThrow(
() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
assertEquals(new String(msg.getData()), "my-message-" + i);
}

// Acknowledge the consumption of all messages at once
consumer.acknowledgeCumulative(msg);
}

consumer3.unsubscribe();
consumer4.unsubscribe();
}

@Test(groups = "encryption")
public void testRedeliveryOfFailedMessages() throws Exception {
log.info("-- Starting {} test --", methodName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.testng.Assert.fail;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.nio.file.Files;
Expand Down Expand Up @@ -645,6 +646,93 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
reader.close();
}

@Test(groups = "encryption")
public void testDefaultCryptoKeyReader() throws Exception {
final String topic = "persistent://my-property/my-ns/test-reader-default-crypto-key-reader"
+ System.currentTimeMillis();
final String ecdsaPublicKeyFile = "file:./src/test/resources/certificate/public-key.client-ecdsa.pem";
final String ecdsaPrivateKeyFile = "file:./src/test/resources/certificate/private-key.client-ecdsa.pem";
final String ecdsaPublicKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlIS01JR2pCZ2NxaGtqT1BRSUJNSUdYQWdFQk1Cd0dCeXFHU000OUFRRUNFUUQvLy8vOS8vLy8vLy8vLy8vLwovLy8vTURzRUVQLy8vLzMvLy8vLy8vLy8vLy8vLy93RUVPaDFlY0VRZWZROTJDU1pQQ3p1WHRNREZRQUFEZzFOCmFXNW5hSFZoVVhVTXdEcEVjOUEyZVFRaEJCWWY5MUtMaVpzdERDaGdmS1VzVzRiUFdzZzVXNi9yRThBdG9wTGQKN1hxREFoRUEvLy8vL2dBQUFBQjFvdzBia0RpaEZRSUJBUU1pQUFUcktqNlJQSEdQTktjWktJT2NjTjR0Z0VOTQpuMWR6S2pMck1aVGtKNG9BYVE9PQotLS0tLUVORCBQVUJMSUMgS0VZLS0tLS0K";
final String ecdsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBFQyBQQVJBTUVURVJTLS0tLS0KTUlHWEFnRUJNQndHQnlxR1NNNDlBUUVDRVFELy8vLzkvLy8vLy8vLy8vLy8vLy8vTURzRUVQLy8vLzMvLy8vLwovLy8vLy8vLy8vd0VFT2gxZWNFUWVmUTkyQ1NaUEN6dVh0TURGUUFBRGcxTmFXNW5hSFZoVVhVTXdEcEVjOUEyCmVRUWhCQllmOTFLTGlac3REQ2hnZktVc1c0YlBXc2c1VzYvckU4QXRvcExkN1hxREFoRUEvLy8vL2dBQUFBQjEKb3cwYmtEaWhGUUlCQVE9PQotLS0tLUVORCBFQyBQQVJBTUVURVJTLS0tLS0KLS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1JSFlBZ0VCQkJEZXU5aGM4a092TDNwbCtMWVNqTHE5b0lHYU1JR1hBZ0VCTUJ3R0J5cUdTTTQ5QVFFQ0VRRC8KLy8vOS8vLy8vLy8vLy8vLy8vLy9NRHNFRVAvLy8vMy8vLy8vLy8vLy8vLy8vL3dFRU9oMWVjRVFlZlE5MkNTWgpQQ3p1WHRNREZRQUFEZzFOYVc1bmFIVmhVWFVNd0RwRWM5QTJlUVFoQkJZZjkxS0xpWnN0RENoZ2ZLVXNXNGJQCldzZzVXNi9yRThBdG9wTGQ3WHFEQWhFQS8vLy8vZ0FBQUFCMW93MGJrRGloRlFJQkFhRWtBeUlBQk9zcVBwRTgKY1k4MHB4a29nNXh3M2kyQVEweWZWM01xTXVzeGxPUW5pZ0JwCi0tLS0tRU5EIEVDIFBSSVZBVEUgS0VZLS0tLS0K";
final String rsaPublicKeyFile = "file:./src/test/resources/certificate/public-key.client-rsa.pem";
final String rsaPrivateKeyFile = "file:./src/test/resources/certificate/private-key.client-rsa.pem";
final String rsaPublicKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF0S1d3Z3FkblRZck9DditqMU1rVApXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuYjEwSmNGZjVaanpQOUJTWEsrdEhtSTh1b04zNjh2RXY2eWhVClJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0M3cWRqQ043TERKM01ucWlCSXJVc1NhRVAxd3JOc0Ixa0krbzkKRVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVla0hxTDdzQmxKOThoNk5tc2ljRWFVa2FyZGswVE9YcmxrakMrYwpNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0aHB2blhMdkNtRzRNKzZ4dFl0RCtucGNWUFp3MWkxUjkwZk1zCjdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG5OS2dXN2M3SUJNclAwQUVtMzdIVHUwTFNPalAyT0hYbHZ2bFEKR1FJREFRQUIKLS0tLS1FTkQgUFVCTElDIEtFWS0tLS0tCg==";
final String rsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdEtXd2dxZG5UWXJPQ3YrajFNa1RXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuCmIxMEpjRmY1Wmp6UDlCU1hLK3RIbUk4dW9OMzY4dkV2NnloVVJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0MKN3FkakNON0xESjNNbnFpQklyVXNTYUVQMXdyTnNCMWtJK285RVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVlawpIcUw3c0JsSjk4aDZObXNpY0VhVWthcmRrMFRPWHJsa2pDK2NNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0Cmhwdm5YTHZDbUc0TSs2eHRZdEQrbnBjVlBadzFpMVI5MGZNczdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG4KTktnVzdjN0lCTXJQMEFFbTM3SFR1MExTT2pQMk9IWGx2dmxRR1FJREFRQUJBb0lCQUFhSkZBaTJDN3UzY05yZgpBc3RZOXZWRExvTEl2SEZabGtCa3RqS1pEWW1WSXNSYitoU0NWaXdWVXJXTEw2N1I2K0l2NGVnNERlVE9BeDAwCjhwbmNYS2daVHcyd0liMS9RalIvWS9SamxhQzhsa2RtUldsaTd1ZE1RQ1pWc3lodVNqVzZQajd2cjhZRTR3b2oKRmhOaWp4RUdjZjl3V3JtTUpyemRuVFdRaVhCeW8rZVR2VVE5QlBnUEdyUmpzTVptVGtMeUFWSmZmMkRmeE81YgpJV0ZEWURKY3lZQU1DSU1RdTd2eXMvSTUwb3U2aWxiMUNPNlFNNlo3S3BQZU9vVkZQd3R6Ymg4Y2Y5eE04VU5TCmo2Si9KbWRXaGdJMzRHUzNOQTY4eFRRNlBWN3pqbmhDYytpY2NtM0pLeXpHWHdhQXBBWitFb2NlLzlqNFdLbXUKNUI0emlSMENnWUVBM2wvOU9IYmwxem15VityUnhXT0lqL2kyclR2SHp3Qm5iblBKeXVlbUw1Vk1GZHBHb2RRMwp2d0h2eVFtY0VDUlZSeG1Yb2pRNFF1UFBIczNxcDZ3RUVGUENXeENoTFNUeGxVYzg1U09GSFdVMk85OWpWN3pJCjcrSk9wREsvTXN0c3g5bkhnWGR1SkYrZ2xURnRBM0xIOE9xeWx6dTJhRlBzcHJ3S3VaZjk0UThDZ1lFQXovWngKYWtFRytQRU10UDVZUzI4Y1g1WGZqc0lYL1YyNkZzNi9zSDE2UWpVSUVkZEU1VDRmQ3Vva3hDalNpd1VjV2htbApwSEVKNVM1eHAzVllSZklTVzNqUlczcXN0SUgxdHBaaXBCNitTMHpUdUptTEpiQTNJaVdFZzJydE10N1gxdUp2CkEvYllPcWUwaE9QVHVYdVpkdFZaMG5NVEtrN0dHOE82VmtCSTdGY0NnWUVBa0RmQ21zY0pnczdKYWhsQldIbVgKekg5cHdlbStTUEtqSWMvNE5CNk4rZGdpa3gyUHAwNWhwUC9WaWhVd1lJdWZ2cy9MTm9nVllOUXJ0SGVwVW5yTgoyK1RtYkhiWmdOU3YxTGR4dDgyVWZCN3kwRnV0S3U2bGhtWEh5TmVjaG8zRmk4c2loMFYwYWlTV21ZdUhmckFICkdhaXNrRVpLbzFpaVp2UVhKSXg5TzJNQ2dZQVRCZjByOWhUWU10eXh0YzZIMy9zZGQwMUM5dGhROGdEeTB5alAKMFRxYzBkTVNKcm9EcW1JV2tvS1lldzkvYmhGQTRMVzVUQ25Xa0NBUGJIbU50RzRmZGZiWXdta0gvaGRuQTJ5MApqS2RscGZwOEdYZVVGQUdIR3gxN0ZBM3NxRnZnS1VoMGVXRWdSSFVMN3ZkUU1WRkJnSlM5M283elFNOTRmTGdQCjZjT0I4d0tCZ0ZjR1Y0R2pJMld3OWNpbGxhQzU1NE12b1NqZjhCLyswNGtYekRPaDhpWUlJek85RVVpbDFqaksKSnZ4cDRobkx6VEtXYnV4M01FV3F1ckxrWWFzNkdwS0JqdytpTk9DYXI2WWRxV0dWcU0zUlV4N1BUVWFad2tLeApVZFA2M0lmWTdpWkNJVC9RYnlIUXZJVWUyTWFpVm5IK3VseGRrSzZZNWU3Z3hjYmNrSUg0Ci0tLS0tRU5EIFJTQSBQUklWQVRFIEtFWS0tLS0tCg==";
final int numMsg = 10;

Map<String, String> privateKeyFileMap = Maps.newHashMap();
privateKeyFileMap.put("client-ecdsa.pem", ecdsaPrivateKeyFile);
privateKeyFileMap.put("client-rsa.pem", rsaPrivateKeyFile);
Map<String, String> privateKeyDataMap = Maps.newHashMap();
privateKeyDataMap.put("client-ecdsa.pem", ecdsaPrivateKeyData);
privateKeyDataMap.put("client-rsa.pem", rsaPrivateKeyData);

Reader<byte[]> reader1 = pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest)
.defaultCryptoKeyReader(ecdsaPrivateKeyFile).create();
Reader<byte[]> reader2 = pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest)
.defaultCryptoKeyReader(ecdsaPrivateKeyData).create();
Reader<byte[]> reader3 = pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest)
.defaultCryptoKeyReader(privateKeyFileMap).create();
Reader<byte[]> reader4 = pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest)
.defaultCryptoKeyReader(privateKeyDataMap).create();

Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topic).addEncryptionKey("client-ecdsa.pem")
.defaultCryptoKeyReader(ecdsaPublicKeyFile).create();
Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topic).addEncryptionKey("client-ecdsa.pem")
.defaultCryptoKeyReader(ecdsaPublicKeyData).create();

for (int i = 0; i < numMsg; i++) {
producer1.send(("my-message-" + i).getBytes());
}
for (int i = numMsg; i < numMsg * 2; i++) {
producer2.send(("my-message-" + i).getBytes());
}

producer1.close();
producer2.close();

for (Reader<byte[]> reader : (List<Reader<byte[]>>) Lists.newArrayList(reader1, reader2)) {
for (int i = 0; i < numMsg * 2; i++) {
MessageImpl<byte[]> msg = (MessageImpl<byte[]>) reader.readNext(5, TimeUnit.SECONDS);
// verify that encrypted message contains encryption-context
msg.getEncryptionCtx().orElseThrow(
() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
assertEquals(new String(msg.getData()), "my-message-" + i);
}
}

reader1.close();
reader2.close();

Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topic).addEncryptionKey("client-rsa.pem")
.defaultCryptoKeyReader(rsaPublicKeyFile).create();
Producer<byte[]> producer4 = pulsarClient.newProducer().topic(topic).addEncryptionKey("client-rsa.pem")
.defaultCryptoKeyReader(rsaPublicKeyData).create();

for (int i = numMsg * 2; i < numMsg * 3; i++) {
producer3.send(("my-message-" + i).getBytes());
}
for (int i = numMsg * 3; i < numMsg * 4; i++) {
producer4.send(("my-message-" + i).getBytes());
}

producer3.close();
producer4.close();

for (Reader<byte[]> reader : (List<Reader<byte[]>>) Lists.newArrayList(reader3, reader4)) {
for (int i = 0; i < numMsg * 4; i++) {
MessageImpl<byte[]> msg = (MessageImpl<byte[]>) reader.readNext(5, TimeUnit.SECONDS);
// verify that encrypted message contains encryption-context
msg.getEncryptionCtx().orElseThrow(
() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
assertEquals(new String(msg.getData()), "my-message-" + i);
}
}

reader3.close();
reader4.close();
}

@Test
public void testSimpleReaderReachEndOfTopic() throws Exception {
Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/my-ns/testSimpleReaderReachEndOfTopic")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,30 @@ public interface ConsumerBuilder<T> extends Cloneable {
*/
ConsumerBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader);

/**
* Sets the default implementation of {@link CryptoKeyReader}.
*
* <p>Configure the key reader to be used to decrypt the message payloads.
*
* @param privateKey
* the private key that is always used to decrypt message payloads.
* @return the consumer builder instance
* @since 2.8.0
*/
ConsumerBuilder<T> defaultCryptoKeyReader(String privateKey);

/**
* Sets the default implementation of {@link CryptoKeyReader}.
*
* <p>Configure the key reader to be used to decrypt the message payloads.
*
* @param privateKeys
* the map of private key names and their URIs used to decrypt message payloads.
* @return the consumer builder instance
* @since 2.8.0
*/
ConsumerBuilder<T> defaultCryptoKeyReader(Map<String, String> privateKeys);

/**
* Sets a {@link MessageCrypto}.
*
Expand Down
Loading

0 comments on commit ca94db5

Please sign in to comment.