From ca94db5f90a3e686bed03af7689726ce12b1b10c Mon Sep 17 00:00:00 2001 From: Masahiro Sakamoto Date: Mon, 1 Feb 2021 11:10:21 +0900 Subject: [PATCH] Add default implementation of CryptoKeyReader (#9379) Currently, users must implement the `CryptoKeyReader` interface for end-to-end message encryption in Java. I thought it would be useful to have a default implementation, so I added `DefaultCryptoKeyReader`. How to use: ```java Producer producer = pulsarClient.newProducer() .topic("persistent://public/default/test") .addEncryptionKey("my-app-key1") .defaultCryptoKeyReader("file:///path/to/public.key") .create(); Consumer consumer = pulsarClient.newConsumer() .topic("persistent://public/default/test") .subscriptionName("sub1") .defaultCryptoKeyReader(new HashMap() { { put("my-app-key1", "file:///path/to/private.key"); put("my-app-key2", "data:application/x-pem-file;base64,*****"); } }) .subscribe(); ``` (cherry picked from commit 85b1c7ef8a37e0bc06052f04ecd5da076804e8cb) --- .../api/SimpleProducerConsumerTest.java | 96 +++++++++++++++ .../pulsar/client/api/TopicReaderTest.java | 88 ++++++++++++++ .../pulsar/client/api/ConsumerBuilder.java | 24 ++++ .../pulsar/client/api/ProducerBuilder.java | 24 ++++ .../pulsar/client/api/ReaderBuilder.java | 24 ++++ .../client/impl/ConsumerBuilderImpl.java | 13 ++ .../client/impl/DefaultCryptoKeyReader.java | 107 ++++++++++++++++ .../impl/DefaultCryptoKeyReaderBuilder.java | 77 ++++++++++++ .../client/impl/ProducerBuilderImpl.java | 13 ++ .../pulsar/client/impl/ReaderBuilderImpl.java | 15 +++ ...faultCryptoKeyReaderConfigurationData.java | 114 ++++++++++++++++++ .../client/impl/ConsumerBuilderImplTest.java | 23 ++++ .../impl/DefaultCryptoKeyReaderTest.java | 113 +++++++++++++++++ .../client/impl/ProducerBuilderImplTest.java | 24 ++++ ...tCryptoKeyReaderConfigurationDataTest.java | 73 +++++++++++ .../test/resources/crypto_ecdsa_private.key | 29 +++++ .../test/resources/crypto_ecdsa_public.key | 15 +++ .../src/test/resources/crypto_rsa_private.key | 27 +++++ .../src/test/resources/crypto_rsa_public.key | 9 ++ 19 files changed, 908 insertions(+) create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReader.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReaderBuilder.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/DefaultCryptoKeyReaderConfigurationData.java create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReaderTest.java create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/DefaultCryptoKeyReaderConfigurationDataTest.java create mode 100644 pulsar-client/src/test/resources/crypto_ecdsa_private.key create mode 100644 pulsar-client/src/test/resources/crypto_ecdsa_public.key create mode 100644 pulsar-client/src/test/resources/crypto_rsa_private.key create mode 100644 pulsar-client/src/test/resources/crypto_rsa_public.key diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 214ec315610d7..4db81d3ca26c4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -2616,6 +2616,102 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map keyMe log.info("-- Exiting {} test --", methodName); } + @Test(groups = "encryption") + public void testDefaultCryptoKeyReader() throws Exception { + final String topic = "persistent://my-property/my-ns/default-crypto-key-reader" + System.currentTimeMillis(); + final String ecdsaPublicKeyFile = "file:./src/test/resources/certificate/public-key.client-ecdsa.pem"; + final String ecdsaPrivateKeyFile = "file:./src/test/resources/certificate/private-key.client-ecdsa.pem"; + final String ecdsaPublicKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlIS01JR2pCZ2NxaGtqT1BRSUJNSUdYQWdFQk1Cd0dCeXFHU000OUFRRUNFUUQvLy8vOS8vLy8vLy8vLy8vLwovLy8vTURzRUVQLy8vLzMvLy8vLy8vLy8vLy8vLy93RUVPaDFlY0VRZWZROTJDU1pQQ3p1WHRNREZRQUFEZzFOCmFXNW5hSFZoVVhVTXdEcEVjOUEyZVFRaEJCWWY5MUtMaVpzdERDaGdmS1VzVzRiUFdzZzVXNi9yRThBdG9wTGQKN1hxREFoRUEvLy8vL2dBQUFBQjFvdzBia0RpaEZRSUJBUU1pQUFUcktqNlJQSEdQTktjWktJT2NjTjR0Z0VOTQpuMWR6S2pMck1aVGtKNG9BYVE9PQotLS0tLUVORCBQVUJMSUMgS0VZLS0tLS0K"; + final String ecdsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBFQyBQQVJBTUVURVJTLS0tLS0KTUlHWEFnRUJNQndHQnlxR1NNNDlBUUVDRVFELy8vLzkvLy8vLy8vLy8vLy8vLy8vTURzRUVQLy8vLzMvLy8vLwovLy8vLy8vLy8vd0VFT2gxZWNFUWVmUTkyQ1NaUEN6dVh0TURGUUFBRGcxTmFXNW5hSFZoVVhVTXdEcEVjOUEyCmVRUWhCQllmOTFLTGlac3REQ2hnZktVc1c0YlBXc2c1VzYvckU4QXRvcExkN1hxREFoRUEvLy8vL2dBQUFBQjEKb3cwYmtEaWhGUUlCQVE9PQotLS0tLUVORCBFQyBQQVJBTUVURVJTLS0tLS0KLS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1JSFlBZ0VCQkJEZXU5aGM4a092TDNwbCtMWVNqTHE5b0lHYU1JR1hBZ0VCTUJ3R0J5cUdTTTQ5QVFFQ0VRRC8KLy8vOS8vLy8vLy8vLy8vLy8vLy9NRHNFRVAvLy8vMy8vLy8vLy8vLy8vLy8vL3dFRU9oMWVjRVFlZlE5MkNTWgpQQ3p1WHRNREZRQUFEZzFOYVc1bmFIVmhVWFVNd0RwRWM5QTJlUVFoQkJZZjkxS0xpWnN0RENoZ2ZLVXNXNGJQCldzZzVXNi9yRThBdG9wTGQ3WHFEQWhFQS8vLy8vZ0FBQUFCMW93MGJrRGloRlFJQkFhRWtBeUlBQk9zcVBwRTgKY1k4MHB4a29nNXh3M2kyQVEweWZWM01xTXVzeGxPUW5pZ0JwCi0tLS0tRU5EIEVDIFBSSVZBVEUgS0VZLS0tLS0K"; + final String rsaPublicKeyFile = "file:./src/test/resources/certificate/public-key.client-rsa.pem"; + final String rsaPrivateKeyFile = "file:./src/test/resources/certificate/private-key.client-rsa.pem"; + final String rsaPublicKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF0S1d3Z3FkblRZck9DditqMU1rVApXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuYjEwSmNGZjVaanpQOUJTWEsrdEhtSTh1b04zNjh2RXY2eWhVClJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0M3cWRqQ043TERKM01ucWlCSXJVc1NhRVAxd3JOc0Ixa0krbzkKRVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVla0hxTDdzQmxKOThoNk5tc2ljRWFVa2FyZGswVE9YcmxrakMrYwpNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0aHB2blhMdkNtRzRNKzZ4dFl0RCtucGNWUFp3MWkxUjkwZk1zCjdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG5OS2dXN2M3SUJNclAwQUVtMzdIVHUwTFNPalAyT0hYbHZ2bFEKR1FJREFRQUIKLS0tLS1FTkQgUFVCTElDIEtFWS0tLS0tCg=="; + final String rsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdEtXd2dxZG5UWXJPQ3YrajFNa1RXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuCmIxMEpjRmY1Wmp6UDlCU1hLK3RIbUk4dW9OMzY4dkV2NnloVVJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0MKN3FkakNON0xESjNNbnFpQklyVXNTYUVQMXdyTnNCMWtJK285RVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVlawpIcUw3c0JsSjk4aDZObXNpY0VhVWthcmRrMFRPWHJsa2pDK2NNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0Cmhwdm5YTHZDbUc0TSs2eHRZdEQrbnBjVlBadzFpMVI5MGZNczdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG4KTktnVzdjN0lCTXJQMEFFbTM3SFR1MExTT2pQMk9IWGx2dmxRR1FJREFRQUJBb0lCQUFhSkZBaTJDN3UzY05yZgpBc3RZOXZWRExvTEl2SEZabGtCa3RqS1pEWW1WSXNSYitoU0NWaXdWVXJXTEw2N1I2K0l2NGVnNERlVE9BeDAwCjhwbmNYS2daVHcyd0liMS9RalIvWS9SamxhQzhsa2RtUldsaTd1ZE1RQ1pWc3lodVNqVzZQajd2cjhZRTR3b2oKRmhOaWp4RUdjZjl3V3JtTUpyemRuVFdRaVhCeW8rZVR2VVE5QlBnUEdyUmpzTVptVGtMeUFWSmZmMkRmeE81YgpJV0ZEWURKY3lZQU1DSU1RdTd2eXMvSTUwb3U2aWxiMUNPNlFNNlo3S3BQZU9vVkZQd3R6Ymg4Y2Y5eE04VU5TCmo2Si9KbWRXaGdJMzRHUzNOQTY4eFRRNlBWN3pqbmhDYytpY2NtM0pLeXpHWHdhQXBBWitFb2NlLzlqNFdLbXUKNUI0emlSMENnWUVBM2wvOU9IYmwxem15VityUnhXT0lqL2kyclR2SHp3Qm5iblBKeXVlbUw1Vk1GZHBHb2RRMwp2d0h2eVFtY0VDUlZSeG1Yb2pRNFF1UFBIczNxcDZ3RUVGUENXeENoTFNUeGxVYzg1U09GSFdVMk85OWpWN3pJCjcrSk9wREsvTXN0c3g5bkhnWGR1SkYrZ2xURnRBM0xIOE9xeWx6dTJhRlBzcHJ3S3VaZjk0UThDZ1lFQXovWngKYWtFRytQRU10UDVZUzI4Y1g1WGZqc0lYL1YyNkZzNi9zSDE2UWpVSUVkZEU1VDRmQ3Vva3hDalNpd1VjV2htbApwSEVKNVM1eHAzVllSZklTVzNqUlczcXN0SUgxdHBaaXBCNitTMHpUdUptTEpiQTNJaVdFZzJydE10N1gxdUp2CkEvYllPcWUwaE9QVHVYdVpkdFZaMG5NVEtrN0dHOE82VmtCSTdGY0NnWUVBa0RmQ21zY0pnczdKYWhsQldIbVgKekg5cHdlbStTUEtqSWMvNE5CNk4rZGdpa3gyUHAwNWhwUC9WaWhVd1lJdWZ2cy9MTm9nVllOUXJ0SGVwVW5yTgoyK1RtYkhiWmdOU3YxTGR4dDgyVWZCN3kwRnV0S3U2bGhtWEh5TmVjaG8zRmk4c2loMFYwYWlTV21ZdUhmckFICkdhaXNrRVpLbzFpaVp2UVhKSXg5TzJNQ2dZQVRCZjByOWhUWU10eXh0YzZIMy9zZGQwMUM5dGhROGdEeTB5alAKMFRxYzBkTVNKcm9EcW1JV2tvS1lldzkvYmhGQTRMVzVUQ25Xa0NBUGJIbU50RzRmZGZiWXdta0gvaGRuQTJ5MApqS2RscGZwOEdYZVVGQUdIR3gxN0ZBM3NxRnZnS1VoMGVXRWdSSFVMN3ZkUU1WRkJnSlM5M283elFNOTRmTGdQCjZjT0I4d0tCZ0ZjR1Y0R2pJMld3OWNpbGxhQzU1NE12b1NqZjhCLyswNGtYekRPaDhpWUlJek85RVVpbDFqaksKSnZ4cDRobkx6VEtXYnV4M01FV3F1ckxrWWFzNkdwS0JqdytpTk9DYXI2WWRxV0dWcU0zUlV4N1BUVWFad2tLeApVZFA2M0lmWTdpWkNJVC9RYnlIUXZJVWUyTWFpVm5IK3VseGRrSzZZNWU3Z3hjYmNrSUg0Ci0tLS0tRU5EIFJTQSBQUklWQVRFIEtFWS0tLS0tCg=="; + final int numMsg = 10; + + Map privateKeyFileMap = Maps.newHashMap(); + privateKeyFileMap.put("client-ecdsa.pem", ecdsaPrivateKeyFile); + privateKeyFileMap.put("client-rsa.pem", rsaPrivateKeyFile); + Map privateKeyDataMap = Maps.newHashMap(); + privateKeyDataMap.put("client-ecdsa.pem", ecdsaPrivateKeyData); + privateKeyDataMap.put("client-rsa.pem", rsaPrivateKeyData); + + Consumer consumer1 = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") + .defaultCryptoKeyReader(ecdsaPrivateKeyFile).subscribe(); + Consumer consumer2 = pulsarClient.newConsumer().topic(topic).subscriptionName("sub2") + .defaultCryptoKeyReader(ecdsaPrivateKeyData).subscribe(); + Consumer consumer3 = pulsarClient.newConsumer().topic(topic).subscriptionName("sub3") + .defaultCryptoKeyReader(privateKeyFileMap).subscribe(); + Consumer consumer4 = pulsarClient.newConsumer().topic(topic).subscriptionName("sub4") + .defaultCryptoKeyReader(privateKeyDataMap).subscribe(); + + Producer producer1 = pulsarClient.newProducer().topic(topic).addEncryptionKey("client-ecdsa.pem") + .defaultCryptoKeyReader(ecdsaPublicKeyFile).create(); + Producer producer2 = pulsarClient.newProducer().topic(topic).addEncryptionKey("client-ecdsa.pem") + .defaultCryptoKeyReader(ecdsaPublicKeyData).create(); + + for (int i = 0; i < numMsg; i++) { + producer1.send(("my-message-" + i).getBytes()); + } + for (int i = numMsg; i < numMsg * 2; i++) { + producer2.send(("my-message-" + i).getBytes()); + } + + producer1.close(); + producer2.close(); + + for (Consumer consumer : (List>) Lists.newArrayList(consumer1, consumer2)) { + MessageImpl msg = null; + + for (int i = 0; i < numMsg * 2; i++) { + msg = (MessageImpl) consumer.receive(5, TimeUnit.SECONDS); + // verify that encrypted message contains encryption-context + msg.getEncryptionCtx().orElseThrow( + () -> new IllegalStateException("encryption-ctx not present for encrypted message")); + assertEquals(new String(msg.getData()), "my-message-" + i); + } + + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + } + + consumer1.unsubscribe(); + consumer2.unsubscribe(); + + Producer producer3 = pulsarClient.newProducer().topic(topic).addEncryptionKey("client-rsa.pem") + .defaultCryptoKeyReader(rsaPublicKeyFile).create(); + Producer producer4 = pulsarClient.newProducer().topic(topic).addEncryptionKey("client-rsa.pem") + .defaultCryptoKeyReader(rsaPublicKeyData).create(); + + for (int i = numMsg * 2; i < numMsg * 3; i++) { + producer3.send(("my-message-" + i).getBytes()); + } + for (int i = numMsg * 3; i < numMsg * 4; i++) { + producer4.send(("my-message-" + i).getBytes()); + } + + producer3.close(); + producer4.close(); + + for (Consumer consumer : (List>) Lists.newArrayList(consumer3, consumer4)) { + MessageImpl msg = null; + + for (int i = 0; i < numMsg * 4; i++) { + msg = (MessageImpl) consumer.receive(5, TimeUnit.SECONDS); + // verify that encrypted message contains encryption-context + msg.getEncryptionCtx().orElseThrow( + () -> new IllegalStateException("encryption-ctx not present for encrypted message")); + assertEquals(new String(msg.getData()), "my-message-" + i); + } + + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + } + + consumer3.unsubscribe(); + consumer4.unsubscribe(); + } + @Test(groups = "encryption") public void testRedeliveryOfFailedMessages() throws Exception { log.info("-- Starting {} test --", methodName); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java index 7a55d2f6ba481..8c8e83ad48b1a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java @@ -25,6 +25,7 @@ import static org.testng.Assert.fail; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import java.io.IOException; import java.nio.file.Files; @@ -645,6 +646,93 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map keyMe reader.close(); } + @Test(groups = "encryption") + public void testDefaultCryptoKeyReader() throws Exception { + final String topic = "persistent://my-property/my-ns/test-reader-default-crypto-key-reader" + + System.currentTimeMillis(); + final String ecdsaPublicKeyFile = "file:./src/test/resources/certificate/public-key.client-ecdsa.pem"; + final String ecdsaPrivateKeyFile = "file:./src/test/resources/certificate/private-key.client-ecdsa.pem"; + final String ecdsaPublicKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlIS01JR2pCZ2NxaGtqT1BRSUJNSUdYQWdFQk1Cd0dCeXFHU000OUFRRUNFUUQvLy8vOS8vLy8vLy8vLy8vLwovLy8vTURzRUVQLy8vLzMvLy8vLy8vLy8vLy8vLy93RUVPaDFlY0VRZWZROTJDU1pQQ3p1WHRNREZRQUFEZzFOCmFXNW5hSFZoVVhVTXdEcEVjOUEyZVFRaEJCWWY5MUtMaVpzdERDaGdmS1VzVzRiUFdzZzVXNi9yRThBdG9wTGQKN1hxREFoRUEvLy8vL2dBQUFBQjFvdzBia0RpaEZRSUJBUU1pQUFUcktqNlJQSEdQTktjWktJT2NjTjR0Z0VOTQpuMWR6S2pMck1aVGtKNG9BYVE9PQotLS0tLUVORCBQVUJMSUMgS0VZLS0tLS0K"; + final String ecdsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBFQyBQQVJBTUVURVJTLS0tLS0KTUlHWEFnRUJNQndHQnlxR1NNNDlBUUVDRVFELy8vLzkvLy8vLy8vLy8vLy8vLy8vTURzRUVQLy8vLzMvLy8vLwovLy8vLy8vLy8vd0VFT2gxZWNFUWVmUTkyQ1NaUEN6dVh0TURGUUFBRGcxTmFXNW5hSFZoVVhVTXdEcEVjOUEyCmVRUWhCQllmOTFLTGlac3REQ2hnZktVc1c0YlBXc2c1VzYvckU4QXRvcExkN1hxREFoRUEvLy8vL2dBQUFBQjEKb3cwYmtEaWhGUUlCQVE9PQotLS0tLUVORCBFQyBQQVJBTUVURVJTLS0tLS0KLS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1JSFlBZ0VCQkJEZXU5aGM4a092TDNwbCtMWVNqTHE5b0lHYU1JR1hBZ0VCTUJ3R0J5cUdTTTQ5QVFFQ0VRRC8KLy8vOS8vLy8vLy8vLy8vLy8vLy9NRHNFRVAvLy8vMy8vLy8vLy8vLy8vLy8vL3dFRU9oMWVjRVFlZlE5MkNTWgpQQ3p1WHRNREZRQUFEZzFOYVc1bmFIVmhVWFVNd0RwRWM5QTJlUVFoQkJZZjkxS0xpWnN0RENoZ2ZLVXNXNGJQCldzZzVXNi9yRThBdG9wTGQ3WHFEQWhFQS8vLy8vZ0FBQUFCMW93MGJrRGloRlFJQkFhRWtBeUlBQk9zcVBwRTgKY1k4MHB4a29nNXh3M2kyQVEweWZWM01xTXVzeGxPUW5pZ0JwCi0tLS0tRU5EIEVDIFBSSVZBVEUgS0VZLS0tLS0K"; + final String rsaPublicKeyFile = "file:./src/test/resources/certificate/public-key.client-rsa.pem"; + final String rsaPrivateKeyFile = "file:./src/test/resources/certificate/private-key.client-rsa.pem"; + final String rsaPublicKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF0S1d3Z3FkblRZck9DditqMU1rVApXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuYjEwSmNGZjVaanpQOUJTWEsrdEhtSTh1b04zNjh2RXY2eWhVClJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0M3cWRqQ043TERKM01ucWlCSXJVc1NhRVAxd3JOc0Ixa0krbzkKRVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVla0hxTDdzQmxKOThoNk5tc2ljRWFVa2FyZGswVE9YcmxrakMrYwpNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0aHB2blhMdkNtRzRNKzZ4dFl0RCtucGNWUFp3MWkxUjkwZk1zCjdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG5OS2dXN2M3SUJNclAwQUVtMzdIVHUwTFNPalAyT0hYbHZ2bFEKR1FJREFRQUIKLS0tLS1FTkQgUFVCTElDIEtFWS0tLS0tCg=="; + final String rsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdEtXd2dxZG5UWXJPQ3YrajFNa1RXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuCmIxMEpjRmY1Wmp6UDlCU1hLK3RIbUk4dW9OMzY4dkV2NnloVVJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0MKN3FkakNON0xESjNNbnFpQklyVXNTYUVQMXdyTnNCMWtJK285RVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVlawpIcUw3c0JsSjk4aDZObXNpY0VhVWthcmRrMFRPWHJsa2pDK2NNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0Cmhwdm5YTHZDbUc0TSs2eHRZdEQrbnBjVlBadzFpMVI5MGZNczdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG4KTktnVzdjN0lCTXJQMEFFbTM3SFR1MExTT2pQMk9IWGx2dmxRR1FJREFRQUJBb0lCQUFhSkZBaTJDN3UzY05yZgpBc3RZOXZWRExvTEl2SEZabGtCa3RqS1pEWW1WSXNSYitoU0NWaXdWVXJXTEw2N1I2K0l2NGVnNERlVE9BeDAwCjhwbmNYS2daVHcyd0liMS9RalIvWS9SamxhQzhsa2RtUldsaTd1ZE1RQ1pWc3lodVNqVzZQajd2cjhZRTR3b2oKRmhOaWp4RUdjZjl3V3JtTUpyemRuVFdRaVhCeW8rZVR2VVE5QlBnUEdyUmpzTVptVGtMeUFWSmZmMkRmeE81YgpJV0ZEWURKY3lZQU1DSU1RdTd2eXMvSTUwb3U2aWxiMUNPNlFNNlo3S3BQZU9vVkZQd3R6Ymg4Y2Y5eE04VU5TCmo2Si9KbWRXaGdJMzRHUzNOQTY4eFRRNlBWN3pqbmhDYytpY2NtM0pLeXpHWHdhQXBBWitFb2NlLzlqNFdLbXUKNUI0emlSMENnWUVBM2wvOU9IYmwxem15VityUnhXT0lqL2kyclR2SHp3Qm5iblBKeXVlbUw1Vk1GZHBHb2RRMwp2d0h2eVFtY0VDUlZSeG1Yb2pRNFF1UFBIczNxcDZ3RUVGUENXeENoTFNUeGxVYzg1U09GSFdVMk85OWpWN3pJCjcrSk9wREsvTXN0c3g5bkhnWGR1SkYrZ2xURnRBM0xIOE9xeWx6dTJhRlBzcHJ3S3VaZjk0UThDZ1lFQXovWngKYWtFRytQRU10UDVZUzI4Y1g1WGZqc0lYL1YyNkZzNi9zSDE2UWpVSUVkZEU1VDRmQ3Vva3hDalNpd1VjV2htbApwSEVKNVM1eHAzVllSZklTVzNqUlczcXN0SUgxdHBaaXBCNitTMHpUdUptTEpiQTNJaVdFZzJydE10N1gxdUp2CkEvYllPcWUwaE9QVHVYdVpkdFZaMG5NVEtrN0dHOE82VmtCSTdGY0NnWUVBa0RmQ21zY0pnczdKYWhsQldIbVgKekg5cHdlbStTUEtqSWMvNE5CNk4rZGdpa3gyUHAwNWhwUC9WaWhVd1lJdWZ2cy9MTm9nVllOUXJ0SGVwVW5yTgoyK1RtYkhiWmdOU3YxTGR4dDgyVWZCN3kwRnV0S3U2bGhtWEh5TmVjaG8zRmk4c2loMFYwYWlTV21ZdUhmckFICkdhaXNrRVpLbzFpaVp2UVhKSXg5TzJNQ2dZQVRCZjByOWhUWU10eXh0YzZIMy9zZGQwMUM5dGhROGdEeTB5alAKMFRxYzBkTVNKcm9EcW1JV2tvS1lldzkvYmhGQTRMVzVUQ25Xa0NBUGJIbU50RzRmZGZiWXdta0gvaGRuQTJ5MApqS2RscGZwOEdYZVVGQUdIR3gxN0ZBM3NxRnZnS1VoMGVXRWdSSFVMN3ZkUU1WRkJnSlM5M283elFNOTRmTGdQCjZjT0I4d0tCZ0ZjR1Y0R2pJMld3OWNpbGxhQzU1NE12b1NqZjhCLyswNGtYekRPaDhpWUlJek85RVVpbDFqaksKSnZ4cDRobkx6VEtXYnV4M01FV3F1ckxrWWFzNkdwS0JqdytpTk9DYXI2WWRxV0dWcU0zUlV4N1BUVWFad2tLeApVZFA2M0lmWTdpWkNJVC9RYnlIUXZJVWUyTWFpVm5IK3VseGRrSzZZNWU3Z3hjYmNrSUg0Ci0tLS0tRU5EIFJTQSBQUklWQVRFIEtFWS0tLS0tCg=="; + final int numMsg = 10; + + Map privateKeyFileMap = Maps.newHashMap(); + privateKeyFileMap.put("client-ecdsa.pem", ecdsaPrivateKeyFile); + privateKeyFileMap.put("client-rsa.pem", rsaPrivateKeyFile); + Map privateKeyDataMap = Maps.newHashMap(); + privateKeyDataMap.put("client-ecdsa.pem", ecdsaPrivateKeyData); + privateKeyDataMap.put("client-rsa.pem", rsaPrivateKeyData); + + Reader reader1 = pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest) + .defaultCryptoKeyReader(ecdsaPrivateKeyFile).create(); + Reader reader2 = pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest) + .defaultCryptoKeyReader(ecdsaPrivateKeyData).create(); + Reader reader3 = pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest) + .defaultCryptoKeyReader(privateKeyFileMap).create(); + Reader reader4 = pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest) + .defaultCryptoKeyReader(privateKeyDataMap).create(); + + Producer producer1 = pulsarClient.newProducer().topic(topic).addEncryptionKey("client-ecdsa.pem") + .defaultCryptoKeyReader(ecdsaPublicKeyFile).create(); + Producer producer2 = pulsarClient.newProducer().topic(topic).addEncryptionKey("client-ecdsa.pem") + .defaultCryptoKeyReader(ecdsaPublicKeyData).create(); + + for (int i = 0; i < numMsg; i++) { + producer1.send(("my-message-" + i).getBytes()); + } + for (int i = numMsg; i < numMsg * 2; i++) { + producer2.send(("my-message-" + i).getBytes()); + } + + producer1.close(); + producer2.close(); + + for (Reader reader : (List>) Lists.newArrayList(reader1, reader2)) { + for (int i = 0; i < numMsg * 2; i++) { + MessageImpl msg = (MessageImpl) reader.readNext(5, TimeUnit.SECONDS); + // verify that encrypted message contains encryption-context + msg.getEncryptionCtx().orElseThrow( + () -> new IllegalStateException("encryption-ctx not present for encrypted message")); + assertEquals(new String(msg.getData()), "my-message-" + i); + } + } + + reader1.close(); + reader2.close(); + + Producer producer3 = pulsarClient.newProducer().topic(topic).addEncryptionKey("client-rsa.pem") + .defaultCryptoKeyReader(rsaPublicKeyFile).create(); + Producer producer4 = pulsarClient.newProducer().topic(topic).addEncryptionKey("client-rsa.pem") + .defaultCryptoKeyReader(rsaPublicKeyData).create(); + + for (int i = numMsg * 2; i < numMsg * 3; i++) { + producer3.send(("my-message-" + i).getBytes()); + } + for (int i = numMsg * 3; i < numMsg * 4; i++) { + producer4.send(("my-message-" + i).getBytes()); + } + + producer3.close(); + producer4.close(); + + for (Reader reader : (List>) Lists.newArrayList(reader3, reader4)) { + for (int i = 0; i < numMsg * 4; i++) { + MessageImpl msg = (MessageImpl) reader.readNext(5, TimeUnit.SECONDS); + // verify that encrypted message contains encryption-context + msg.getEncryptionCtx().orElseThrow( + () -> new IllegalStateException("encryption-ctx not present for encrypted message")); + assertEquals(new String(msg.getData()), "my-message-" + i); + } + } + + reader3.close(); + reader4.close(); + } + @Test public void testSimpleReaderReachEndOfTopic() throws Exception { Reader reader = pulsarClient.newReader().topic("persistent://my-property/my-ns/testSimpleReaderReachEndOfTopic") diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index c6a6555b0b4d2..85a4ca571e8dd 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -270,6 +270,30 @@ public interface ConsumerBuilder extends Cloneable { */ ConsumerBuilder cryptoKeyReader(CryptoKeyReader cryptoKeyReader); + /** + * Sets the default implementation of {@link CryptoKeyReader}. + * + *

Configure the key reader to be used to decrypt the message payloads. + * + * @param privateKey + * the private key that is always used to decrypt message payloads. + * @return the consumer builder instance + * @since 2.8.0 + */ + ConsumerBuilder defaultCryptoKeyReader(String privateKey); + + /** + * Sets the default implementation of {@link CryptoKeyReader}. + * + *

Configure the key reader to be used to decrypt the message payloads. + * + * @param privateKeys + * the map of private key names and their URIs used to decrypt message payloads. + * @return the consumer builder instance + * @since 2.8.0 + */ + ConsumerBuilder defaultCryptoKeyReader(Map privateKeys); + /** * Sets a {@link MessageCrypto}. * diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java index 8c40343770ccc..62d9b206bdabc 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java @@ -334,6 +334,30 @@ public interface ProducerBuilder extends Cloneable { */ ProducerBuilder cryptoKeyReader(CryptoKeyReader cryptoKeyReader); + /** + * Sets the default implementation of {@link CryptoKeyReader}. + * + *

Configure the key reader to be used to encrypt the message payloads. + * + * @param publicKey + * the public key that is always used to encrypt message payloads. + * @return the producer builder instance + * @since 2.8.0 + */ + ProducerBuilder defaultCryptoKeyReader(String publicKey); + + /** + * Sets the default implementation of {@link CryptoKeyReader}. + * + *

Configure the key reader to be used to encrypt the message payloads. + * + * @param publicKeys + * the map of public key names and their URIs used to encrypt message payloads. + * @return the producer builder instance + * @since 2.8.0 + */ + ProducerBuilder defaultCryptoKeyReader(Map publicKeys); + /** * Add public encryption key, used by producer to encrypt the data key. * diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java index 506ad86f91761..4adf492a4ac6c 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java @@ -169,6 +169,30 @@ public interface ReaderBuilder extends Cloneable { */ ReaderBuilder cryptoKeyReader(CryptoKeyReader cryptoKeyReader); + /** + * Sets the default implementation of {@link CryptoKeyReader}. + * + *

Configure the key reader to be used to decrypt the message payloads. + * + * @param privateKey + * the private key that is always used to decrypt message payloads. + * @return the reader builder instance + * @since 2.8.0 + */ + ReaderBuilder defaultCryptoKeyReader(String privateKey); + + /** + * Sets the default implementation of {@link CryptoKeyReader}. + * + *

Configure the key reader to be used to decrypt the message payloads. + * + * @param privateKeys + * the map of private key names and their URIs used to decrypt message payloads. + * @return the reader builder instance + * @since 2.8.0 + */ + ReaderBuilder defaultCryptoKeyReader(Map privateKeys); + /** * Sets the {@link ConsumerCryptoFailureAction} to specify. * diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 8429598c97f7a..629e751e00c57 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -50,6 +50,7 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.DefaultCryptoKeyReader; import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.util.RetryMessageUtil; @@ -239,6 +240,18 @@ public ConsumerBuilder cryptoKeyReader(@NonNull CryptoKeyReader cryptoKeyRead return this; } + @Override + public ConsumerBuilder defaultCryptoKeyReader(String privateKey) { + checkArgument(StringUtils.isNotBlank(privateKey), "privateKey cannot be blank"); + return cryptoKeyReader(DefaultCryptoKeyReader.builder().defaultPrivateKey(privateKey).build()); + } + + @Override + public ConsumerBuilder defaultCryptoKeyReader(@NonNull Map privateKeys) { + checkArgument(!privateKeys.isEmpty(), "privateKeys cannot be empty"); + return cryptoKeyReader(DefaultCryptoKeyReader.builder().privateKeys(privateKeys).build()); + } + @Override public ConsumerBuilder messageCrypto(@NonNull MessageCrypto messageCrypto) { conf.setMessageCrypto(messageCrypto); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReader.java new file mode 100644 index 0000000000000..4e8f6c3c665e3 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReader.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.net.URLConnection; +import java.util.Map; + +import org.apache.commons.io.IOUtils; +import org.apache.pulsar.client.api.CryptoKeyReader; +import org.apache.pulsar.client.api.EncryptionKeyInfo; +import org.apache.pulsar.client.api.url.URL; +import org.apache.pulsar.client.impl.conf.DefaultCryptoKeyReaderConfigurationData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DefaultCryptoKeyReader implements CryptoKeyReader { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultCryptoKeyReader.class); + + private static final String APPLICATION_X_PEM_FILE = "application/x-pem-file"; + + private String defaultPublicKey; + private String defaultPrivateKey; + + private Map publicKeys; + private Map privateKeys; + + public static DefaultCryptoKeyReaderBuilder builder() { + return new DefaultCryptoKeyReaderBuilder(); + } + + DefaultCryptoKeyReader(DefaultCryptoKeyReaderConfigurationData conf) { + this.defaultPublicKey = conf.getDefaultPublicKey(); + this.defaultPrivateKey = conf.getDefaultPrivateKey(); + this.publicKeys = conf.getPublicKeys(); + this.privateKeys = conf.getPrivateKeys(); + } + + @Override + public EncryptionKeyInfo getPublicKey(String keyName, Map metadata) { + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + String publicKey = publicKeys.getOrDefault(keyName, defaultPublicKey); + + if (publicKey == null) { + LOG.warn("Public key named {} is not set", keyName); + } else { + try { + keyInfo.setKey(loadKey(publicKey)); + } catch (Exception e) { + LOG.error("Failed to load public key named {}", keyName, e); + } + } + + return keyInfo; + } + + @Override + public EncryptionKeyInfo getPrivateKey(String keyName, Map metadata) { + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + String privateKey = privateKeys.getOrDefault(keyName, defaultPrivateKey); + + if (privateKey == null) { + LOG.warn("Private key named {} is not set", keyName); + } else { + try { + keyInfo.setKey(loadKey(privateKey)); + } catch (Exception e) { + LOG.error("Failed to load private key named {}", keyName, e); + } + } + + return keyInfo; + } + + private byte[] loadKey(String keyUrl) throws IOException, IllegalAccessException, InstantiationException { + try { + URLConnection urlConnection = new URL(keyUrl).openConnection(); + String protocol = urlConnection.getURL().getProtocol(); + if ("data".equals(protocol) && !APPLICATION_X_PEM_FILE.equals(urlConnection.getContentType())) { + throw new IllegalArgumentException( + "Unsupported media type or encoding format: " + urlConnection.getContentType()); + } + return IOUtils.toByteArray(urlConnection); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Invalid key format"); + } + } + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReaderBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReaderBuilder.java new file mode 100644 index 0000000000000..4d98a011b991c --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReaderBuilder.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.pulsar.client.impl.conf.DefaultCryptoKeyReaderConfigurationData; + +public class DefaultCryptoKeyReaderBuilder implements Cloneable { + + private DefaultCryptoKeyReaderConfigurationData conf; + + DefaultCryptoKeyReaderBuilder() { + this(new DefaultCryptoKeyReaderConfigurationData()); + } + + DefaultCryptoKeyReaderBuilder(DefaultCryptoKeyReaderConfigurationData conf) { + this.conf = conf; + } + + public DefaultCryptoKeyReaderBuilder defaultPublicKey(String defaultPublicKey) { + conf.setDefaultPublicKey(defaultPublicKey); + return this; + } + + public DefaultCryptoKeyReaderBuilder defaultPrivateKey(String defaultPrivateKey) { + conf.setDefaultPrivateKey(defaultPrivateKey); + return this; + } + + public DefaultCryptoKeyReaderBuilder publicKey(String keyName, String publicKey) { + conf.setPublicKey(keyName, publicKey); + return this; + } + + public DefaultCryptoKeyReaderBuilder privateKey(String keyName, String privateKey) { + conf.setPrivateKey(keyName, privateKey); + return this; + } + + public DefaultCryptoKeyReaderBuilder publicKeys(Map publicKeys) { + conf.getPublicKeys().putAll(publicKeys); + return this; + } + + public DefaultCryptoKeyReaderBuilder privateKeys(Map privateKeys) { + conf.getPrivateKeys().putAll(privateKeys); + return this; + } + + public DefaultCryptoKeyReader build() { + return new DefaultCryptoKeyReader(conf); + } + + @Override + public DefaultCryptoKeyReaderBuilder clone() { + return new DefaultCryptoKeyReaderBuilder(conf.clone()); + } + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java index 64a22d7b6d2d1..2ae20c480beb8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java @@ -44,6 +44,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; import org.apache.pulsar.client.api.interceptor.ProducerInterceptorWrapper; +import org.apache.pulsar.client.impl.DefaultCryptoKeyReader; import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.common.util.FutureUtil; @@ -201,6 +202,18 @@ public ProducerBuilder cryptoKeyReader(@NonNull CryptoKeyReader cryptoKeyRead return this; } + @Override + public ProducerBuilder defaultCryptoKeyReader(String publicKey) { + checkArgument(StringUtils.isNotBlank(publicKey), "publicKey cannot be blank"); + return cryptoKeyReader(DefaultCryptoKeyReader.builder().defaultPublicKey(publicKey).build()); + } + + @Override + public ProducerBuilder defaultCryptoKeyReader(@NonNull Map publicKeys) { + checkArgument(!publicKeys.isEmpty(), "publicKeys cannot be empty"); + return cryptoKeyReader(DefaultCryptoKeyReader.builder().publicKeys(publicKeys).build()); + } + @Override public ProducerBuilder addEncryptionKey(String key) { checkArgument(StringUtils.isNotBlank(key), "Encryption key cannot be blank"); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java index 1c62a472254e4..e4f02b6b8950a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl; +import static com.google.common.base.Preconditions.checkArgument; import java.util.Arrays; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -26,6 +27,7 @@ import lombok.AccessLevel; import lombok.Getter; import com.google.common.base.Preconditions; +import lombok.NonNull; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.CryptoKeyReader; @@ -36,6 +38,7 @@ import org.apache.pulsar.client.api.ReaderBuilder; import org.apache.pulsar.client.api.ReaderListener; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.DefaultCryptoKeyReader; import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; import org.apache.pulsar.common.util.FutureUtil; @@ -140,6 +143,18 @@ public ReaderBuilder cryptoKeyReader(CryptoKeyReader cryptoKeyReader) { return this; } + @Override + public ReaderBuilder defaultCryptoKeyReader(String privateKey) { + checkArgument(StringUtils.isNotBlank(privateKey), "privateKey cannot be blank"); + return cryptoKeyReader(DefaultCryptoKeyReader.builder().defaultPrivateKey(privateKey).build()); + } + + @Override + public ReaderBuilder defaultCryptoKeyReader(@NonNull Map privateKeys) { + checkArgument(!privateKeys.isEmpty(), "privateKeys cannot be empty"); + return cryptoKeyReader(DefaultCryptoKeyReader.builder().privateKeys(privateKeys).build()); + } + @Override public ReaderBuilder cryptoFailureAction(ConsumerCryptoFailureAction action) { conf.setCryptoFailureAction(action); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/DefaultCryptoKeyReaderConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/DefaultCryptoKeyReaderConfigurationData.java new file mode 100644 index 0000000000000..9db90998d6f62 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/DefaultCryptoKeyReaderConfigurationData.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.conf; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.NonNull; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class DefaultCryptoKeyReaderConfigurationData implements Serializable, Cloneable { + + private static final long serialVersionUID = 1L; + + private static final String TO_STRING_FORMAT = "%s(defaultPublicKey=%s, defaultPrivateKey=%s, publicKeys=%s, privateKeys=%s)"; + + @NonNull + private String defaultPublicKey; + @NonNull + private String defaultPrivateKey; + + @NonNull + private Map publicKeys = new HashMap<>(); + @NonNull + private Map privateKeys = new HashMap<>(); + + public void setPublicKey(@NonNull String keyName, @NonNull String publicKey) { + publicKeys.put(keyName, publicKey); + } + + public void setPrivateKey(@NonNull String keyName, @NonNull String privateKey) { + privateKeys.put(keyName, privateKey); + } + + @Override + public DefaultCryptoKeyReaderConfigurationData clone() { + DefaultCryptoKeyReaderConfigurationData clone = new DefaultCryptoKeyReaderConfigurationData(); + + if (defaultPublicKey != null) { + clone.setDefaultPublicKey(defaultPublicKey); + } + + if (defaultPrivateKey != null) { + clone.setDefaultPrivateKey(defaultPrivateKey); + } + + if (publicKeys != null) { + clone.setPublicKeys(new HashMap(publicKeys)); + } + + if (privateKeys != null) { + clone.setPrivateKeys(new HashMap(privateKeys)); + } + + return clone; + } + + @Override + public String toString() { + return String.format(TO_STRING_FORMAT, getClass().getSimpleName(), maskKeyData(defaultPublicKey), + maskKeyData(defaultPrivateKey), maskKeyData(publicKeys), maskKeyData(privateKeys)); + } + + private static String maskKeyData(Map keys) { + if (keys == null) { + return "null"; + } else { + StringBuilder keysStr = new StringBuilder(); + keysStr.append("{"); + + List kvList = new ArrayList<>(); + keys.forEach((k, v) -> kvList.add(k + "=" + maskKeyData(v))); + keysStr.append(String.join(", ", kvList)); + + keysStr.append("}"); + return keysStr.toString(); + } + } + + private static String maskKeyData(String key) { + if (key == null) { + return "null"; + } else if (key.startsWith("data:")) { + return "data:*****"; + } else { + return key; + } + } + +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java index 02d77a74140ad..b2ce170b124c5 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java @@ -121,6 +121,29 @@ public void testConsumerBuilderImplWhenCryptoKeyReaderIsNull() { .cryptoKeyReader(null); } + @Test(expectedExceptions = IllegalArgumentException.class) + public void testConsumerBuilderImplWhenDefaultCryptoKeyReaderIsNullString() { + consumerBuilderImpl.topic(TOPIC_NAME).subscriptionName("subscriptionName") + .defaultCryptoKeyReader((String) null); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testConsumerBuilderImplWhenDefaultCryptoKeyReaderIsEmptyString() { + consumerBuilderImpl.topic(TOPIC_NAME).subscriptionName("subscriptionName").defaultCryptoKeyReader(""); + } + + @Test(expectedExceptions = NullPointerException.class) + public void testConsumerBuilderImplWhenDefaultCryptoKeyReaderIsNullMap() { + consumerBuilderImpl.topic(TOPIC_NAME).subscriptionName("subscriptionName") + .defaultCryptoKeyReader((Map) null); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testConsumerBuilderImplWhenDefaultCryptoKeyReaderIsEmptyMap() { + consumerBuilderImpl.topic(TOPIC_NAME).subscriptionName("subscriptionName") + .defaultCryptoKeyReader(new HashMap()); + } + @Test(expectedExceptions = NullPointerException.class) public void testConsumerBuilderImplWhenCryptoFailureActionIsNull() { consumerBuilderImpl.topic(TOPIC_NAME) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReaderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReaderTest.java new file mode 100644 index 0000000000000..f1e4ccecb262e --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReaderTest.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; + +import java.lang.reflect.Field; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; + +import org.testng.annotations.Test; + +public class DefaultCryptoKeyReaderTest { + + @Test + public void testBuild() throws Exception { + Map publicKeys = new HashMap<>(); + publicKeys.put("key1", "file:///path/to/public1.key"); + publicKeys.put("key2", "file:///path/to/public2.key"); + + Map privateKeys = new HashMap<>(); + privateKeys.put("key3", "file:///path/to/private3.key"); + + DefaultCryptoKeyReader keyReader = DefaultCryptoKeyReader.builder() + .defaultPublicKey("file:///path/to/default-public.key") + .defaultPrivateKey("file:///path/to/default-private.key") + .publicKey("key4", "file:///path/to/public4.key").publicKeys(publicKeys) + .publicKey("key5", "file:///path/to/public5.key").privateKey("key6", "file:///path/to/private6.key") + .privateKeys(privateKeys).privateKey("key7", "file:///path/to/private7.key").build(); + + Field defaultPublicKeyField = keyReader.getClass().getDeclaredField("defaultPublicKey"); + defaultPublicKeyField.setAccessible(true); + Field defaultPrivateKeyField = keyReader.getClass().getDeclaredField("defaultPrivateKey"); + defaultPrivateKeyField.setAccessible(true); + Field publicKeysField = keyReader.getClass().getDeclaredField("publicKeys"); + publicKeysField.setAccessible(true); + Field privateKeysField = keyReader.getClass().getDeclaredField("privateKeys"); + privateKeysField.setAccessible(true); + + Map expectedPublicKeys = new HashMap<>(); + expectedPublicKeys.put("key1", "file:///path/to/public1.key"); + expectedPublicKeys.put("key2", "file:///path/to/public2.key"); + expectedPublicKeys.put("key4", "file:///path/to/public4.key"); + expectedPublicKeys.put("key5", "file:///path/to/public5.key"); + + Map expectedPrivateKeys = new HashMap<>(); + expectedPrivateKeys.put("key3", "file:///path/to/private3.key"); + expectedPrivateKeys.put("key6", "file:///path/to/private6.key"); + expectedPrivateKeys.put("key7", "file:///path/to/private7.key"); + + assertEquals((String) defaultPublicKeyField.get(keyReader), "file:///path/to/default-public.key"); + assertEquals((String) defaultPrivateKeyField.get(keyReader), "file:///path/to/default-private.key"); + assertEquals((Map) publicKeysField.get(keyReader), expectedPublicKeys); + assertEquals((Map) privateKeysField.get(keyReader), expectedPrivateKeys); + } + + @Test + public void testGetKeys() throws Exception { + final String ecdsaPublicKey = "./src/test/resources/crypto_ecdsa_public.key"; + final String ecdsaPrivateKey = "./src/test/resources/crypto_ecdsa_private.key"; + final String rsaPublicKey = "./src/test/resources/crypto_rsa_public.key"; + final String rsaPrivateKey = "./src/test/resources/crypto_rsa_private.key"; + + DefaultCryptoKeyReader keyReader1 = DefaultCryptoKeyReader.builder().build(); + assertNull(keyReader1.getPublicKey("key0", null).getKey()); + assertNull(keyReader1.getPrivateKey("key0", null).getKey()); + + DefaultCryptoKeyReader keyReader2 = DefaultCryptoKeyReader.builder().defaultPublicKey("file:" + ecdsaPublicKey) + .defaultPrivateKey("file:" + ecdsaPrivateKey) + .publicKey("key1", + "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF6elRUenNTc1pGWWxXeWJack1OdwphRGpncWluSU5vNXlOa0h1UkJQZzJyNTZCRWFIb1U1eStjY0RoeXhCR0NLUFprVGNRYXN2WWdXSjNzSFJLQWxOCmRaTkc4R3QzazJTcmZEcnJ0ajFLTDNHNk5XUkE4VHF5Umt4eGw1dnBBTWM2OVVqWDlIUHdTemxtckM3WlhtMWUKU3dZVFY3Kzdxcy82OUpMQm5yTUpjc2wrSXlYVWFoaFJuOHcyRmtzOUpXcmlOS2kxUFNnQ1BqTWpnS0JGN3lhRQpBVEowR01TTWM4RnZYV3dGSnNXQldRa1V3Z3FsRXhSMU1EaVZWQnR3OVF0SkIyOUlOaTBORHMyUGViNjFEdDQ5Ck5abE4va2xKQ1hJVXRCU0lxZzlvK2lSS1Z3WExIbklNMFdIVm5tUm4yTUswbmYwMy9Ed0NJVm5iNWVsVG9aNzIKOHdJREFRQUIKLS0tLS1FTkQgUFVCTElDIEtFWS0tLS0tCg==") // crypto_rsa_public.key + .privateKey("key1", + "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFcEFJQkFBS0NBUUVBenpUVHpzU3NaRllsV3liWnJNTndhRGpncWluSU5vNXlOa0h1UkJQZzJyNTZCRWFICm9VNXkrY2NEaHl4QkdDS1Baa1RjUWFzdllnV0ozc0hSS0FsTmRaTkc4R3QzazJTcmZEcnJ0ajFLTDNHNk5XUkEKOFRxeVJreHhsNXZwQU1jNjlValg5SFB3U3psbXJDN1pYbTFlU3dZVFY3Kzdxcy82OUpMQm5yTUpjc2wrSXlYVQphaGhSbjh3MkZrczlKV3JpTktpMVBTZ0NQak1qZ0tCRjd5YUVBVEowR01TTWM4RnZYV3dGSnNXQldRa1V3Z3FsCkV4UjFNRGlWVkJ0dzlRdEpCMjlJTmkwTkRzMlBlYjYxRHQ0OU5abE4va2xKQ1hJVXRCU0lxZzlvK2lSS1Z3WEwKSG5JTTBXSFZubVJuMk1LMG5mMDMvRHdDSVZuYjVlbFRvWjcyOHdJREFRQUJBb0lCQUhXRnZmaVJua0dPaHNPTApabnpSb01qTU1janh4OGdCeFErM0YxL3ZjbUkvRk0rbC9UbGxXRnNKSUp3allveEExZHFvaGRDTk9tTzdSblpjCnNiZW1oeE4veEFXS3ZwaVB5Wis5ZjRHdWc0d2pVZjBFYnIwamtJZkV4Y3k2dGs0bHNlLzdMOWxMaE9mMWw2RmoKTlJDVXNaMlZ4WlRJZjdXakh2Qm02SUNOaFhkZmdjL1RPWC9INEJCTXh5UWtrbXZTN3lRSFBtbmVrVnBDandYaQpSZ2RQT3BCU0hVQXN1TGMzY2RPN0R6U2xYQnkrUjNVQjViQzk3ZWZTVHd4bU1kY0dVTlFoMTdDdXcrb3UyT0xKCmwvV3lNQkpnS1AwenA4TUkyWUNQMHRvRTFWVjBGV2lzaU5VZHl3Mm1tZHNLQlBDdFpXNEpmL2F2UkxqQ3B5ODMKZ3llSGk0a0NnWUVBN2ZhYzh1L1dvVWNSeGZ3YmV4eFZOSWI1anBWZ1EyMlhYVXZjT0JBMzE0NUhGSDRrRDlRcwpPbE9NNDhpRVgxR3ErRk9iK1RrVmEzeWVFYnlFSFZtTnhtN1pxREVsR2xQbkhIZ1dKZlZvNGx0ZW1rTlE4Y1FJCkNpRGhVSDdEOWlHZDRUckxxK3U4Slkvb3kwZHBKeWFKL0dzTlB3alZ6TWlBOWtEdUkyS0tScGNDZ1lFQTN1bHAKc1p5ODJFaWJUK1lZTnNoZkF5elBjUWMrQ1ZkY3BTM3lFVU1jTXJyR1QySUhiQWhsZHpBV3ZuaTdQRjFDdllvQgplb1BTR2lFR01Bd0FmYVVJNHBzWnZBVFpUZitHV0tpemxIODIwbHc0dFkyTlcydFlGd0RjWjZFUEtkcTlaQ096CkxmeXcyTmhMcWkyRnBGeUFwY1hsQTkyVVVJMEZjWENDdEFLSjJnVUNnWUVBc0k1bWVyVktpTlRETWlOUWZISlUKSWFuM3BUdmRkWW50WVhKMGpVQXpQb0s0NkZLRERSOStSVFJTZDNzQ0Evc0RJRVpnbG5RdEdWZ1hxODgwTXRhTQpJMnVCb0pIK0ZsK2tQUEk0ZEtkMXoyUzlkelYwN0R4blBxU1FwL20yQ1h0OXVXdTNTL0tXNFVPNkZJRUNXdUwwClJFMWxRWnliak5wREhQS2wvYWtTTVRjQ2dZQnFDODBXakNSakdKZWF1VEpIemFjMTBYbVdvZ1ZuV0VKZzZxekEKZlpiS280UjRlNEJnYXRZcWo1d2lYVGxtREZBVjc3T29YMUh5MEVjclVHcGpXOElRWEEwd0gzWnAzdWhCQVhEOQpjay9ZWDdzeTAvYXR5VEdOTUFHcTR6cGRoUXlZdVVzaTA1WW1jeS83ODlBaVUwZDRsZDdQcWZoSEllKzIrZm1VClBhanJLUUtCZ1FDanBqMFkrRS9IaTBTSlVLTlZQbDN1K1NOMWxMY1IxL2dNakpic1lGR3VhMU1zTFhCTlhUU2wKUWlZSGlhZFQ3QmhQRWtGZFc3dStiRndzMmFkbVcxOUJvVWIrd2d0WlQvdDduVHlvUzRMYWc0dnlhek5QWnpkUQp4NlhQcndaaW1kMFhERGl0R0xqY0xmOTkxRUZzWFNxUHpuRERmWHRKMzErb1U2T2JuSFNJdEE9PQotLS0tLUVORCBSU0EgUFJJVkFURSBLRVktLS0tLQo=") // crypto_rsa_private.key + .publicKey("key2", "file:invalid").privateKey("key2", "file:invalid").publicKey("key3", "data:invalid") + .privateKey("key3", "data:invalid").build(); + + assertNotNull(keyReader2.getPublicKey("key0", null).getKey()); + assertEquals(keyReader2.getPublicKey("key0", null).getKey(), Files.readAllBytes(Paths.get(ecdsaPublicKey))); + assertNotNull(keyReader2.getPrivateKey("key0", null).getKey()); + assertEquals(keyReader2.getPrivateKey("key0", null).getKey(), Files.readAllBytes(Paths.get(ecdsaPrivateKey))); + + assertNotNull(keyReader2.getPublicKey("key1", null).getKey()); + assertEquals(keyReader2.getPublicKey("key1", null).getKey(), Files.readAllBytes(Paths.get(rsaPublicKey))); + assertNotNull(keyReader2.getPrivateKey("key1", null).getKey()); + assertEquals(keyReader2.getPrivateKey("key1", null).getKey(), Files.readAllBytes(Paths.get(rsaPrivateKey))); + + assertNull(keyReader1.getPublicKey("key2", null).getKey()); + assertNull(keyReader1.getPrivateKey("key2", null).getKey()); + assertNull(keyReader1.getPublicKey("key3", null).getKey()); + assertNull(keyReader1.getPrivateKey("key3", null).getKey()); + } + +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java index 1031f029cd9f9..b3cda2e3deb06 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java @@ -194,6 +194,30 @@ public void testProducerBuilderImplWhenMaxPendingMessagesIsNegative() throws Pul .create(); } + @Test(expectedExceptions = IllegalArgumentException.class) + public void testProducerBuilderImplWhenDefaultCryptoKeyReaderIsNullString() throws PulsarClientException { + producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES); + producerBuilderImpl.topic(TOPIC_NAME).defaultCryptoKeyReader((String) null).create(); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testProducerBuilderImplWhenDefaultCryptoKeyReaderIsEmptyString() throws PulsarClientException { + producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES); + producerBuilderImpl.topic(TOPIC_NAME).defaultCryptoKeyReader("").create(); + } + + @Test(expectedExceptions = NullPointerException.class) + public void testProducerBuilderImplWhenDefaultCryptoKeyReaderIsNullMap() throws PulsarClientException { + producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES); + producerBuilderImpl.topic(TOPIC_NAME).defaultCryptoKeyReader((Map) null).create(); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testProducerBuilderImplWhenDefaultCryptoKeyReaderIsEmptyMap() throws PulsarClientException { + producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES); + producerBuilderImpl.topic(TOPIC_NAME).defaultCryptoKeyReader(new HashMap()).create(); + } + @Test(expectedExceptions = IllegalArgumentException.class) public void testProducerBuilderImplWhenEncryptionKeyIsNull() throws PulsarClientException { producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/DefaultCryptoKeyReaderConfigurationDataTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/DefaultCryptoKeyReaderConfigurationDataTest.java new file mode 100644 index 0000000000000..cb38ea539d7b7 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/DefaultCryptoKeyReaderConfigurationDataTest.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.conf; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import org.testng.annotations.Test; + +public class DefaultCryptoKeyReaderConfigurationDataTest { + + @Test + public void testClone() throws Exception { + DefaultCryptoKeyReaderConfigurationData conf = new DefaultCryptoKeyReaderConfigurationData(); + conf.setDefaultPublicKey("file:///path/to/default-public.key"); + conf.setDefaultPrivateKey("file:///path/to/default-private.key"); + conf.setPublicKey("key1", "file:///path/to/public1.key"); + conf.setPrivateKey("key2", "file:///path/to/private2.key"); + DefaultCryptoKeyReaderConfigurationData clone = conf.clone(); + + conf.setDefaultPublicKey("data:AAAAA"); + conf.setDefaultPrivateKey("data:BBBBB"); + conf.setPublicKey("key1", "data:CCCCC"); + conf.setPrivateKey("key2", "data:DDDDD"); + + assertEquals(clone.getDefaultPublicKey(), "file:///path/to/default-public.key"); + assertEquals(clone.getDefaultPrivateKey(), "file:///path/to/default-private.key"); + assertEquals(clone.getPublicKeys().get("key1"), "file:///path/to/public1.key"); + assertEquals(clone.getPrivateKeys().get("key2"), "file:///path/to/private2.key"); + } + + @Test + public void testToString() throws Exception { + DefaultCryptoKeyReaderConfigurationData conf = new DefaultCryptoKeyReaderConfigurationData(); + assertEquals(conf.toString(), + "DefaultCryptoKeyReaderConfigurationData(defaultPublicKey=null, defaultPrivateKey=null, publicKeys={}, privateKeys={})"); + + conf.setDefaultPublicKey("file:///path/to/default-public.key"); + conf.setDefaultPrivateKey("data:AAAAA"); + conf.setPublicKey("key1", "file:///path/to/public.key"); + conf.setPrivateKey("key2", "file:///path/to/private.key"); + assertEquals(conf.toString(), + "DefaultCryptoKeyReaderConfigurationData(defaultPublicKey=file:///path/to/default-public.key, defaultPrivateKey=data:*****, publicKeys={key1=file:///path/to/public.key}, privateKeys={key2=file:///path/to/private.key})"); + + conf.setPublicKey("key3", "data:BBBBB"); + conf.setPrivateKey("key4", "data:CCCCC"); + assertTrue(conf.toString().startsWith( + "DefaultCryptoKeyReaderConfigurationData(defaultPublicKey=file:///path/to/default-public.key, defaultPrivateKey=data:*****, publicKeys={")); + assertTrue(conf.toString().contains("key3=data:*****")); + assertFalse(conf.toString().contains("key3=data:BBBBB")); + assertTrue(conf.toString().contains("key4=data:*****")); + assertFalse(conf.toString().contains("key4=data:CCCCC")); + assertTrue(conf.toString().endsWith("})")); + } + +} diff --git a/pulsar-client/src/test/resources/crypto_ecdsa_private.key b/pulsar-client/src/test/resources/crypto_ecdsa_private.key new file mode 100644 index 0000000000000..036a1e7a029b3 --- /dev/null +++ b/pulsar-client/src/test/resources/crypto_ecdsa_private.key @@ -0,0 +1,29 @@ +-----BEGIN EC PARAMETERS----- +MIIBwgIBATBNBgcqhkjOPQEBAkIB//////////////////////////////////// +//////////////////////////////////////////////////8wgZ4EQgH///// +//////////////////////////////////////////////////////////////// +/////////////////ARBUZU+uWGOHJofkpohoLaFQO6i2nJbmbMV87i0iZGO8Qnh +Vhk5Uex+k3sWUsC9O7G/BzVz34g9LDTx70Uf1GtQPwADFQDQnogAKRy4U5bMZxc5 +MoSqoNpkugSBhQQAxoWOBrcEBOnNnj7LZiOVtEKcZIE5BT+1Ifgor2BrTT26oUte +d+/nWSj+HcEnov+o3jNIs8GFakKb+X5+McLlvWYBGDkpaniaO8AEXIpftCx9G9mY +9URJV5tEaBevvRcnPmYsl+5ymV70JkDFULkBP60HYTU8cIaicsJAiL6Udp/RZlAC +QgH///////////////////////////////////////////pRhoeDvy+Wa3/MAUj3 +CaXQO7XJuImcR667b7cekThkCQIBAQ== +-----END EC PARAMETERS----- +-----BEGIN EC PRIVATE KEY----- +MIICnQIBAQRCAPvNzZPrKUZao8oLRet7MUTfa9TUEJYw2C4TyMSP54YWzyoQsGJJ +FQVGsJJWBvmEDTfU5zU6d+vvbLVWy7R0aMDXoIIBxjCCAcICAQEwTQYHKoZIzj0B +AQJCAf////////////////////////////////////////////////////////// +////////////////////////////MIGeBEIB//////////////////////////// +//////////////////////////////////////////////////////////wEQVGV +PrlhjhyaH5KaIaC2hUDuotpyW5mzFfO4tImRjvEJ4VYZOVHsfpN7FlLAvTuxvwc1 +c9+IPSw08e9FH9RrUD8AAxUA0J6IACkcuFOWzGcXOTKEqqDaZLoEgYUEAMaFjga3 +BATpzZ4+y2YjlbRCnGSBOQU/tSH4KK9ga009uqFLXnfv51ko/h3BJ6L/qN4zSLPB +hWpCm/l+fjHC5b1mARg5KWp4mjvABFyKX7QsfRvZmPVESVebRGgXr70XJz5mLJfu +cple9CZAxVC5AT+tB2E1PHCGonLCQIi+lHaf0WZQAkIB//////////////////// +///////////////////////6UYaHg78vlmt/zAFI9wml0Du1ybiJnEeuu2+3HpE4 +ZAkCAQGhgYkDgYYABAFqUEjls03bQowJQUSnTiqzTvdXE26NN891SHoDx2wqfRg2 +OSjzI1bQcBT2GARfut0/tVDbx3gX8qmaPEY9un3dugHSEgX209yLhHuXfIJKZm6Y +indL6RxwL6Wdqv9bsHUi7dSXTBIc60C0a6gxzhX0I9P4g+Og8hrsW0Hmmm1zJQ8x +gQ== +-----END EC PRIVATE KEY----- diff --git a/pulsar-client/src/test/resources/crypto_ecdsa_public.key b/pulsar-client/src/test/resources/crypto_ecdsa_public.key new file mode 100644 index 0000000000000..941afeb112121 --- /dev/null +++ b/pulsar-client/src/test/resources/crypto_ecdsa_public.key @@ -0,0 +1,15 @@ +-----BEGIN PUBLIC KEY----- +MIICXDCCAc8GByqGSM49AgEwggHCAgEBME0GByqGSM49AQECQgH///////////// +//////////////////////////////////////////////////////////////// +/////////zCBngRCAf////////////////////////////////////////////// +///////////////////////////////////////8BEFRlT65YY4cmh+SmiGgtoVA +7qLacluZsxXzuLSJkY7xCeFWGTlR7H6TexZSwL07sb8HNXPfiD0sNPHvRR/Ua1A/ +AAMVANCeiAApHLhTlsxnFzkyhKqg2mS6BIGFBADGhY4GtwQE6c2ePstmI5W0Qpxk +gTkFP7Uh+CivYGtNPbqhS1537+dZKP4dwSei/6jeM0izwYVqQpv5fn4xwuW9ZgEY +OSlqeJo7wARcil+0LH0b2Zj1RElXm0RoF6+9Fyc+ZiyX7nKZXvQmQMVQuQE/rQdh +NTxwhqJywkCIvpR2n9FmUAJCAf////////////////////////////////////// +////+lGGh4O/L5Zrf8wBSPcJpdA7tcm4iZxHrrtvtx6ROGQJAgEBA4GGAAQBalBI +5bNN20KMCUFEp04qs073VxNujTfPdUh6A8dsKn0YNjko8yNW0HAU9hgEX7rdP7VQ +28d4F/KpmjxGPbp93boB0hIF9tPci4R7l3yCSmZumIp3S+kccC+lnar/W7B1Iu3U +l0wSHOtAtGuoMc4V9CPT+IPjoPIa7FtB5pptcyUPMYE= +-----END PUBLIC KEY----- diff --git a/pulsar-client/src/test/resources/crypto_rsa_private.key b/pulsar-client/src/test/resources/crypto_rsa_private.key new file mode 100644 index 0000000000000..5177bb92b69f5 --- /dev/null +++ b/pulsar-client/src/test/resources/crypto_rsa_private.key @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEAzzTTzsSsZFYlWybZrMNwaDjgqinINo5yNkHuRBPg2r56BEaH +oU5y+ccDhyxBGCKPZkTcQasvYgWJ3sHRKAlNdZNG8Gt3k2SrfDrrtj1KL3G6NWRA +8TqyRkxxl5vpAMc69UjX9HPwSzlmrC7ZXm1eSwYTV7+7qs/69JLBnrMJcsl+IyXU +ahhRn8w2Fks9JWriNKi1PSgCPjMjgKBF7yaEATJ0GMSMc8FvXWwFJsWBWQkUwgql +ExR1MDiVVBtw9QtJB29INi0NDs2Peb61Dt49NZlN/klJCXIUtBSIqg9o+iRKVwXL +HnIM0WHVnmRn2MK0nf03/DwCIVnb5elToZ728wIDAQABAoIBAHWFvfiRnkGOhsOL +ZnzRoMjMMcjxx8gBxQ+3F1/vcmI/FM+l/TllWFsJIJwjYoxA1dqohdCNOmO7RnZc +sbemhxN/xAWKvpiPyZ+9f4Gug4wjUf0Ebr0jkIfExcy6tk4lse/7L9lLhOf1l6Fj +NRCUsZ2VxZTIf7WjHvBm6ICNhXdfgc/TOX/H4BBMxyQkkmvS7yQHPmnekVpCjwXi +RgdPOpBSHUAsuLc3cdO7DzSlXBy+R3UB5bC97efSTwxmMdcGUNQh17Cuw+ou2OLJ +l/WyMBJgKP0zp8MI2YCP0toE1VV0FWisiNUdyw2mmdsKBPCtZW4Jf/avRLjCpy83 +gyeHi4kCgYEA7fac8u/WoUcRxfwbexxVNIb5jpVgQ22XXUvcOBA3145HFH4kD9Qs +OlOM48iEX1Gq+FOb+TkVa3yeEbyEHVmNxm7ZqDElGlPnHHgWJfVo4ltemkNQ8cQI +CiDhUH7D9iGd4TrLq+u8JY/oy0dpJyaJ/GsNPwjVzMiA9kDuI2KKRpcCgYEA3ulp +sZy82EibT+YYNshfAyzPcQc+CVdcpS3yEUMcMrrGT2IHbAhldzAWvni7PF1CvYoB +eoPSGiEGMAwAfaUI4psZvATZTf+GWKizlH820lw4tY2NW2tYFwDcZ6EPKdq9ZCOz +Lfyw2NhLqi2FpFyApcXlA92UUI0FcXCCtAKJ2gUCgYEAsI5merVKiNTDMiNQfHJU +Ian3pTvddYntYXJ0jUAzPoK46FKDDR9+RTRSd3sCA/sDIEZglnQtGVgXq880MtaM +I2uBoJH+Fl+kPPI4dKd1z2S9dzV07DxnPqSQp/m2CXt9uWu3S/KW4UO6FIECWuL0 +RE1lQZybjNpDHPKl/akSMTcCgYBqC80WjCRjGJeauTJHzac10XmWogVnWEJg6qzA +fZbKo4R4e4BgatYqj5wiXTlmDFAV77OoX1Hy0EcrUGpjW8IQXA0wH3Zp3uhBAXD9 +ck/YX7sy0/atyTGNMAGq4zpdhQyYuUsi05Ymcy/789AiU0d4ld7PqfhHIe+2+fmU +PajrKQKBgQCjpj0Y+E/Hi0SJUKNVPl3u+SN1lLcR1/gMjJbsYFGua1MsLXBNXTSl +QiYHiadT7BhPEkFdW7u+bFws2admW19BoUb+wgtZT/t7nTyoS4Lag4vyazNPZzdQ +x6XPrwZimd0XDDitGLjcLf991EFsXSqPznDDfXtJ31+oU6ObnHSItA== +-----END RSA PRIVATE KEY----- diff --git a/pulsar-client/src/test/resources/crypto_rsa_public.key b/pulsar-client/src/test/resources/crypto_rsa_public.key new file mode 100644 index 0000000000000..698280474e4e3 --- /dev/null +++ b/pulsar-client/src/test/resources/crypto_rsa_public.key @@ -0,0 +1,9 @@ +-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAzzTTzsSsZFYlWybZrMNw +aDjgqinINo5yNkHuRBPg2r56BEaHoU5y+ccDhyxBGCKPZkTcQasvYgWJ3sHRKAlN +dZNG8Gt3k2SrfDrrtj1KL3G6NWRA8TqyRkxxl5vpAMc69UjX9HPwSzlmrC7ZXm1e +SwYTV7+7qs/69JLBnrMJcsl+IyXUahhRn8w2Fks9JWriNKi1PSgCPjMjgKBF7yaE +ATJ0GMSMc8FvXWwFJsWBWQkUwgqlExR1MDiVVBtw9QtJB29INi0NDs2Peb61Dt49 +NZlN/klJCXIUtBSIqg9o+iRKVwXLHnIM0WHVnmRn2MK0nf03/DwCIVnb5elToZ72 +8wIDAQAB +-----END PUBLIC KEY-----