-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[improve] [pip] PIP-290 Provide a way to implement WSS E2E encryption and not need to expose the private key to the WebSocket Proxy #20923
Merged
Merged
Changes from 31 commits
Commits
Show all changes
51 commits
Select commit
Hold shift + click to select a range
aa494cc
pip-290
poorbarcode 69612d0
pip-290
poorbarcode e77765a
-
poorbarcode a794ad2
-
poorbarcode 991e245
Update pip-290.md
poorbarcode fac563b
Update pip-290.md
poorbarcode 6c9aa31
Update pip-290.md
poorbarcode 623bfad
Update pip-290.md
poorbarcode 79e9b37
Update pip-290.md
poorbarcode 974f3cb
Update pip-290.md
poorbarcode 924efa4
Update pip-290.md
poorbarcode 9b74bb0
Update pip-290.md
poorbarcode 4ea09c2
Update pip-290.md
poorbarcode f9f1ef8
Update pip-290.md
poorbarcode 039bd33
Update pip-290.md
poorbarcode 3dfa583
Update pip-290.md
poorbarcode 86b7ac2
Update pip-290.md
poorbarcode dd91830
Update pip-290.md
poorbarcode 79450e3
Update pip-290.md
poorbarcode 5ae87bb
Update pip-290.md
poorbarcode 98b4ae9
Update pip-290.md
poorbarcode ef170c9
Update pip-290.md
poorbarcode e8b6bf5
Update pip-290.md
poorbarcode 69920e8
Update pip-290.md
poorbarcode 400636a
Update pip-290.md
poorbarcode 76a533d
Update pip-290.md
poorbarcode e561207
Update pip-290.md
poorbarcode 7b84e2e
Update pip-290.md
poorbarcode 083204e
Update pip-290.md
poorbarcode 7f2c03c
Update pip-290.md
poorbarcode 2f16a14
Update pip-290.md
poorbarcode bc33169
Update pip-290.md
poorbarcode c203edb
Update pip-290.md
poorbarcode 7827c46
Update pip-290.md
poorbarcode 67307c0
Update pip-290.md
poorbarcode 7725fb6
Update pip-290.md
poorbarcode 0b66149
Update pip-290.md
poorbarcode 30dbc72
Update pip-290.md
poorbarcode 30197ac
Update pip-290.md
poorbarcode 82dea5e
Update pip-290.md
poorbarcode 238cbe2
Update pip-290.md
poorbarcode bc7bc98
Update pip-290.md
poorbarcode 9877aa1
Update pip-290.md
poorbarcode 7a76a11
Update pip-290.md
poorbarcode a4b0630
Update pip-290.md
poorbarcode 99bd93b
Update pip-290.md
poorbarcode 93181ba
Update pip-290.md
poorbarcode 07c6d67
Update pip-290.md
poorbarcode ca630c5
Update pip-290.md
poorbarcode 7488097
Update pip-290.md
poorbarcode 3ebf08e
Update pip-290.md
poorbarcode File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,207 @@ | ||
# Background knowledge | ||
|
||
### 1. Web Socket Proxy Server | ||
[Web Socket Proxy Server](https://pulsar.apache.org/docs/3.0.x/client-libraries-websocket/#run-the-websocket-service) provides a simple way to interact with Pulsar under `WSS` protocol. | ||
- When a [wss-producer](https://pulsar.apache.org/docs/3.0.x/client-libraries-websocket/#nodejs-producer) was registered, Web Socket Proxy Server will create a one-to-one producer to actually send messages to the Broker. | ||
- When a [wss-consumer](https://pulsar.apache.org/docs/3.0.x/client-libraries-websocket/#nodejs-consumer) was registered, Web Socket Proxy Server will create a one-to-one consumer to actually receive messages from the Broker and send them to WSS Consumer. | ||
|
||
### 2. When a user wants to encrypt the message payload, there are two solutions: | ||
- **Solution 1**: encrypt message payload before WSS Producer sends messages, and decrypt after WSS Consumer receives messages. If the user wants to use different encryption keys for different messages, they can set a [property](https://github.com/apache/pulsar/blob/master/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessage.java#L38) into messages to indicate the message was encrypted by which key. But this solution has a shortcoming: if the user also has consumers with Java clients, then these consumers cannot auto-decrypt the messages(Normally, java clients can [decrypt messages automatically](https://pulsar.apache.org/docs/3.0.x/security-encryption/#how-it-works-in-pulsar)). And the benefit of this solution is that the user does not need to expose the private key to Web Socket Proxy Server. | ||
- **Solution 2**: In the release `2.11`, there is a [feature](https://github.com/apache/pulsar/pull/16234) that provides a way to set encrypt keys for the internal producers and consumers of Web Socket Proxy Server, but needs the user to upload both public key and private key into the Web Socket Proxy Server(in other words: user should expose the keys to Web Socket Proxy Server), there is a un-recommended workaround for this shortcoming<sup>[1]</sup>. The benefit is that the WSS producer and WSS consumer should not care about encryption and decryption. | ||
|
||
### 3. The message payload process during message sending | ||
- The Producer will composite several message payloads into a batched message payload if the producer is enabled batch; | ||
- The Producer will compress the batched message payload to a compressed payload if enabled compression; | ||
- After the previous two steps, the Producer encrypts the compressed payload to an encrypted payload. | ||
|
||
|
||
### 4. Encrypt context | ||
|
||
The Construction of the Encrypt Context: | ||
```json | ||
{ | ||
"batchSize": 2, // How many single messages are in the batch. If null, it means it is not a batched message. | ||
"compressionType": "NONE", // the compression type. | ||
"uncompressedMessageSize": 0, // the size of the uncompressed payload. | ||
"keys": { | ||
"client-rsa.pem": { // key name. | ||
"keyValue": "asdvfdw==", // key value. | ||
"metadata": {} // extra props of the key. | ||
} | ||
}, | ||
"param": "Tfu1PxVm6S9D3+Hk" // the IV of current encryption for this message. | ||
} | ||
``` | ||
All the fields of Encrypt Context are used to parse the encrypted message payload. | ||
- `keys` and `param` are used to decrypt the encrypted message payload. | ||
- `compressionType` and `uncompressedMessageSize` are used to uncompress the compressed message payload. | ||
- `batchSize` is used to extract the batched message payload. | ||
|
||
There is another attribute named `encryptionAlgo` used to identify what encrypt algo is using, it is an optional attribute, so there is no such property in Encrypt Context. | ||
|
||
When the internal consumer of the Web Socket Proxy Server receives a message, if the message metadata indicates that the message is encrypted, the consumer will add Encrypt Context into the response for the WSS consumer. | ||
|
||
### 5. Quick explanation of the used components in the section Design: | ||
- `CryptoKeyReader`: an interface that requires users to implement to read public key and private key. | ||
- `MessageCrypto`: a tool interface to encrypt and decrypt the message payload and add and extract encryption information for message metadata. | ||
|
||
# Motivation | ||
|
||
Therefore, there is no way to enable encryption under the WSS protocol and meet the following conditions: | ||
- WSS producer and WSS consumer did encrypt and decrypt themselves and did not share private keys to Web Socket Proxy Server. | ||
- Other clients(such as Java and CPP) can automatically decrypt the messages which WSS producer sent. | ||
|
||
# Goals | ||
Provide a way to make Web Socket Proxy Server just passes encrypt information to the client, the WSS producer and WSS consumer did encrypt and decrypt themselves. | ||
|
||
Since the order of producer operation for message payloads is `compression --> encryption,` users need to handle Compression themselves if needed. | ||
|
||
Since the order of consumer operation for message payload is `deencryption --> un-compression --> extract the batched messages`, users need to handle Un-compression amd Extract Batch Messages themselves if needed. | ||
|
||
Note: I want to cherry-pick this feature into `branch-2.11`. | ||
|
||
|
||
## Out of Scope | ||
This proposal does not intend to support the three features: | ||
- Support publishing "Null value messages" for WSS producers. | ||
- Support publishing "Chunked messages" for WSS producers. | ||
- Support publishing "Batched messages" for WSS producers. | ||
|
||
|
||
# High-Level Design | ||
**For WSS producers**: Web Socket Proxy Server marks the Producer as Client-Side Encryption Producer if a producer registered with a non-empty `encryptionKeyValues`, and discards server-side batch messages, server-side compression, and server-side encryption. | ||
|
||
**For WSS consumers**: Users can set the parameter `cryptoFailureAction` to `CONSUME` to directly receive the undecrypted message payload (it was supported before). | ||
|
||
# Detailed Design | ||
**For the producers marked as Client-Side Encryption Producer**: | ||
|
||
- forcefully set the component `CryptoKeyReader` to `DummyCryptoKeyReaderImpl`. | ||
- `DummyCryptoKeyReaderImpl`: doesn't provide any public key or private key, and just returns `null`. | ||
- forcefully set the component `MessageCrypto` to `WSSDummyMessageCryptoImpl` to skip the message Server-Side encryption. | ||
- `WSSDummyMessageCryptoImpl`: only set the encryption info into the message metadata and discard payload encryption. | ||
- forcefully set `enableBatching` to `false` to skip Server-Side batch messages building, and print a warning log if users set `enableBatching`, `batchingMaxMessages`, `maxPendingMessages`, `batchingMaxPublishDelay`. | ||
- forcefully set the `CompressionType` to `None` to skip the Server-Side compression, and print a warning log if users set `compressionType`. | ||
- forcefully set the param `enableChunking` to `false`(the default value is `false`) to prevent unexpected problems if the default setting is changed in the future. | ||
|
||
**For the client-side encryption consumers**: | ||
|
||
- To avoid too many warning logs: after setting the config `cryptoFailureAction` of the consumer is `CONSUME`, just print an `INFO` level log when receiving an encrypted message if the consumer could not decrypt it(the original log level is `WARN`). | ||
|
||
|
||
### Public API | ||
|
||
#### [Endpoint: producer connect](https://pulsar.apache.org/docs/3.1.x/client-libraries-websocket/#producer-endpoint) | ||
Add query params below: | ||
| param name | description| | ||
| --- | --- | | ||
| `encryptionKeyValues` | Base64 encoded and URL encoded secret key | | ||
| `encryptionKeyMetadata` | Base64 encoded and URL encoded and JSON formatted key-value metadata list of encryption key | | ||
|
||
#### [Endpoint: publish messages](https://pulsar.apache.org/docs/3.1.x/client-libraries-websocket/#publish-a-message) | ||
Add JSON attributes below: | ||
| param name | description| | ||
| --- | --- | | ||
| `compressionType` | Compression type. Do not set it if compression is not performed | | ||
| `uncompressedMessageSize` | The size of the payload before compression. Do not set it if compression is not performed | | ||
| `encryptionParam` | Base64 encoded serialized initialization vector used when the client encrypts | | ||
|
||
### A demo for client-side encryption producer | ||
```java | ||
public void connect(String publicKeyName, byte[] publicKeyValue, List<KeyValue> encryptKeyMetadata) { | ||
// Base64 encode and url encoder the public key value. | ||
String encryptedPublicKeyDataToString = urlEncode(base64Encode(publicKeyValue)); | ||
// To JSON and base64 encode and url encode the encrypt key metadata | ||
String encryptedPublicKeyMetadata = urlEncode(base64Encode(toJSON(encryptKeyMetadata))); | ||
StringBuilder producerUrL = new StringBuilder(protocolAndHostPort) | ||
.append("/ws/v2/producer/persistent/") | ||
.append(topicName) | ||
.append("?") | ||
.append("encryptionKeys=").append(WssClientSideEncryptUtils.urlEncode(keyName)) | ||
.append("&") | ||
.append("encryptionKeyValues=").append(encryptedPublicKeyDataToString) | ||
.append("&") | ||
.append("encryptionKeyMetadata=").append(encryptedPublicKeyMetadata); | ||
WebSocketClient wssClient = new WebSocketClient(); | ||
wssClient.start(); | ||
Session session = wssClient.connect(this, producerUrL, new ClientUpgradeRequest()).get(); | ||
} | ||
|
||
public void sendMessage(byte[] payload, String msgKey) { | ||
// Compression if needed(optional). | ||
CompressionType compressionType = getcompressionType(); | ||
msg.uncompressedMessageSize = payload.length; | ||
byte[] compressedPayload = compress(payload), | ||
// Encrypt if needed. | ||
bytes[] encryptionParam = getEncryptionParam(); | ||
String base64EncodedEncryptionParam = base64Encode(encryptionParam); | ||
bytes[] encryptedPayload = encrypt(compressedPayload, encryptionParam); | ||
// Do send. | ||
ProducerMessage msg = new ProducerMessage(); | ||
msg.key = msgKey; | ||
msg.payload = encryptedPayload; | ||
msg.encryptionParam = base64EncodedEncryptionParam; | ||
msg.compressionType = compressionType; | ||
msg.uncompressedMessageSize = uncompressedMessageSize; | ||
this.session.getRemote().sendString(toJSON(msg)); | ||
} | ||
``` | ||
|
||
### A demo for client-side encryption consumer | ||
|
||
```java | ||
public void connect(String publicKeyName, byte[] publicKeyValue, List<KeyValue> encryptKeyMetadata) { | ||
StringBuilder consumerUri = new StringBuilder(protocolAndHostPort) | ||
.append("/ws/v2/consumer/persistent/") | ||
.append(topicName) | ||
.append("/") | ||
.append(subscriptionName) | ||
.append("?") | ||
.append("subscriptionType=").append(subscriptionType.toString()) | ||
// Set "cryptoFailureAction" to "CONSUME". | ||
.append("&").append("cryptoFailureAction=CONSUME"); | ||
WebSocketClient wssClient = new WebSocketClient(); | ||
wssClient.start(); | ||
Session session = wssClient.connect(this, buildConnectURL(), new ClientUpgradeRequest()).get(); | ||
} | ||
|
||
public byte[] messageReceived(String text) { | ||
ConsumerMessage msg parseJsonToObject(text); | ||
// base64Decode and decrypt message payload. | ||
byte[] decryptedPayload = decrypt(base64Decode(msg.payload)); | ||
//Un-compress is needed. | ||
byte[] unCompressedPayload = unCompressIfNeeded(decryptedPayload); | ||
return unCompressedPayload; | ||
} | ||
``` | ||
|
||
### Test cases | ||
The test cases all run under setting `enabledE2ECompressionAndEncryption` to `true`. | ||
- Pub & Sub with WSS producer and consumer. | ||
- compression & decryption. | ||
- Pub with Java client library and Sub with WSS consumers. | ||
- non-compression & decryption. | ||
- compression & non-decryption. | ||
- compression & decryption. | ||
- compression & decryption & batch send. | ||
- Pub with WSS protocol and Sub with Java client library(verify it can auto decompression, decryption). | ||
- non-compression & decryption. | ||
- compression & non-decryption. | ||
- compression & decryption. | ||
|
||
# Footnotes | ||
**[1]**: A workaround to avoid exposing the private key to Web Socket Proxy Server(should expose the public key to Web Socket Proxy Server). | ||
A quick background: there are three policies when a consumer cannot describe the message payload: | ||
- CONSUME: it responds to the user's original message payload and prints a warning log. | ||
- DISCARD: discard this message. | ||
- FAIL: add this message into `unackMessagesTracker.` How this message is ultimately handled depends on the policy of unacknowledged messages. | ||
|
||
**Workaround** | ||
- Set `cryptoFailureAction` to `CONSUME` for the WSS consumer | ||
- Make the return value `EncryptionKeyInfo` to `null` for the `CryptoKeyReader`. This will make the internal consumer of Web Socket Proxy Server decrypt message payload fail. | ||
|
||
Then the flow of Pub & Sub will be executed like the following: | ||
- Users do not encrypt message payload before the WSS producer sends messages. | ||
- The internal producer of WebSocket does message payload encryption by the [feature: Support encryption in Web Socket Proxy Server](https://github.com/apache/pulsar/pull/16234) | ||
- The decryption of the internal consumer of Web Socket Proxy Server message payload will be failed, and just send original message payload to the users. | ||
- Users decrypt the message payload themself. |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not add the key metadata to the encryptionKeyValues JSON structure? So that it will align with the returned data structure to consumers.
And could you please also provide an example of what is the original data looks like? without base64 and URL encoding.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a new mode for the parameter
encryptionKeys
: If a producer registered with a JSON parameterencryptionKeys
, and theencryptionKeys[{key_name}].keyValue
is not empty, Web Socket Proxy Server will mark this Producer as Client-Side Encryption Producer, then discard server-side batch messages, server-side compression, and server-side encryption.Done.