Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[feat][ws] PIP-290 Make WSS support E2E encryption (apache#20958)
Browse files Browse the repository at this point in the history
See PIP: apache#20923

(cherry picked from commit 07eef59)
  • Loading branch information
poorbarcode committed Aug 29, 2023
1 parent 86c4a97 commit aa93a99
Show file tree
Hide file tree
Showing 14 changed files with 1,400 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.websocket.proxy;

import static org.testng.Assert.assertTrue;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ConsumerMessage;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;

@Slf4j
@WebSocket(maxTextMessageSize = 64 * 1024)
public class ClientSideEncryptionWssConsumer extends WebSocketAdapter implements Closeable {

private Session session;
private final CryptoKeyReader cryptoKeyReader;
private final String topicName;
private final String subscriptionName;
private final SubscriptionType subscriptionType;
private final String webSocketProxyHost;
private final int webSocketProxyPort;
private WebSocketClient wssClient;
private final MessageCryptoBc msgCrypto;
private final LinkedBlockingQueue<ConsumerMessage> incomingMessages = new LinkedBlockingQueue<>();

public ClientSideEncryptionWssConsumer(String webSocketProxyHost, int webSocketProxyPort, String topicName,
String subscriptionName, SubscriptionType subscriptionType,
CryptoKeyReader cryptoKeyReader) {
this.webSocketProxyHost = webSocketProxyHost;
this.webSocketProxyPort = webSocketProxyPort;
this.topicName = topicName;
this.subscriptionName = subscriptionName;
this.subscriptionType = subscriptionType;
this.msgCrypto = new MessageCryptoBc("[" + topicName + "] [" + subscriptionName + "]", false);
this.cryptoKeyReader = cryptoKeyReader;
}

public void start() throws Exception {
wssClient = new WebSocketClient();
wssClient.start();
session = wssClient.connect(this, buildConnectURL(), new ClientUpgradeRequest()).get();
assertTrue(session.isOpen());
}

private URI buildConnectURL() throws PulsarClientException.CryptoException {
final String protocolAndHostPort = "ws://" + webSocketProxyHost + ":" + webSocketProxyPort;

// Build the URL for producer.
final StringBuilder consumerUri = new StringBuilder(protocolAndHostPort)
.append("/ws/v2/consumer/persistent/")
.append(topicName)
.append("/")
.append(subscriptionName)
.append("?")
.append("subscriptionType=").append(subscriptionType.toString())
.append("&").append("cryptoFailureAction=CONSUME");
return URI.create(consumerUri.toString());
}

public synchronized ConsumerMessage receive(int timeout, TimeUnit unit) throws Exception {
ConsumerMessage msg = incomingMessages.poll(timeout, unit);
return msg;
}

@Override
public void onWebSocketClose(int statusCode, String reason) {
log.info("Connection closed: {} - {}", statusCode, reason);
this.session = null;
}

@Override
public void onWebSocketConnect(Session session) {
log.info("Got connect: {}", session);
this.session = session;
}

@Override
public void onWebSocketError(Throwable cause) {
log.error("Received an error", cause);
}

@Override
public void onWebSocketText(String text) {

try {
ConsumerMessage msg =
ObjectMapperFactory.getThreadLocal().readValue(text, ConsumerMessage.class);
if (msg.messageId == null) {
log.error("Consumer[{}-{}] Could not extract the response payload: {}", topicName, subscriptionName,
text);
return;
}
// Decrypt.
byte[] decryptedPayload = WssClientSideEncryptUtils.decryptMsgPayload(msg.payload, msg.encryptionContext,
cryptoKeyReader, msgCrypto);
// Un-compression if needed.
byte[] unCompressedPayload = WssClientSideEncryptUtils.unCompressionIfNeeded(decryptedPayload,
msg.encryptionContext);
// Extract batch messages if needed.
if (msg.encryptionContext.getBatchSize().isPresent()) {
List<ConsumerMessage> singleMsgs = WssClientSideEncryptUtils.extractBatchMessagesIfNeeded(
unCompressedPayload, msg.encryptionContext);
for (ConsumerMessage singleMsg : singleMsgs) {
incomingMessages.add(singleMsg);
}
} else {
msg.payload = new String(unCompressedPayload, StandardCharsets.UTF_8);
incomingMessages.add(msg);
}
} catch (Exception ex) {
log.error("Consumer[{}-{}] Could not extract the response payload: {}", topicName, subscriptionName, text);
}
}

@Override
public void close() throws IOException {
try {
wssClient.stop();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.websocket.proxy;

import static org.testng.Assert.assertTrue;
import static org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
import static org.apache.pulsar.websocket.proxy.WssClientSideEncryptUtils.EncryptedPayloadAndParam;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.common.api.proto.CompressionType;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ProducerMessage;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;

@Slf4j
@WebSocket(maxTextMessageSize = 64 * 1024)
public class ClientSideEncryptionWssProducer extends WebSocketAdapter implements Closeable {

private Session session;
private volatile CompletableFuture<MessageIdData> sendFuture;
private final ScheduledExecutorService executor;
private final CryptoKeyReader cryptoKeyReader;
private final String topicName;
private final String producerName;
private final String webSocketProxyHost;
private final int webSocketProxyPort;
private final String keyName;
private WebSocketClient wssClient;
private final MessageCryptoBc msgCrypto;

public ClientSideEncryptionWssProducer(String webSocketProxyHost, int webSocketProxyPort, String topicName,
String producerName, CryptoKeyReader cryptoKeyReader, String keyName,
ScheduledExecutorService executor) {
this.webSocketProxyHost = webSocketProxyHost;
this.webSocketProxyPort = webSocketProxyPort;
this.topicName = topicName;
this.producerName = producerName;
this.msgCrypto = new MessageCryptoBc("[" + topicName + "] [" + producerName + "]", true);
this.cryptoKeyReader = cryptoKeyReader;
this.keyName = keyName;
this.executor = executor;
}

public void start() throws Exception {
wssClient = new WebSocketClient();
wssClient.start();
session = wssClient.connect(this, buildConnectURL(), new ClientUpgradeRequest()).get();
assertTrue(session.isOpen());
}

private URI buildConnectURL() throws PulsarClientException.CryptoException {
final String protocolAndHostPort = "ws://" + webSocketProxyHost + ":" + webSocketProxyPort;

// Encode encrypted public key data.
final byte[] keyValue = WssClientSideEncryptUtils.calculateEncryptedKeyValue(msgCrypto, cryptoKeyReader,
keyName);
EncryptionKey encryptionKey = new EncryptionKey();
encryptionKey.setKeyValue(keyValue);
encryptionKey.setMetadata(cryptoKeyReader.getPublicKey(keyName, Collections.emptyMap()).getMetadata());
Map<String, EncryptionKey> encryptionKeyMap = new HashMap<>();
encryptionKeyMap.put(keyName, encryptionKey);

final String encryptionKeys =
WssClientSideEncryptUtils.toJSONAndBase64AndUrlEncode(encryptionKeyMap);

// Build the URL for producer.
final StringBuilder producerUrL = new StringBuilder(protocolAndHostPort)
.append("/ws/v2/producer/persistent/")
.append(topicName)
.append("?")
.append("encryptionKeys=").append(encryptionKeys);
return URI.create(producerUrL.toString());
}

public synchronized MessageIdData sendMessage(ProducerMessage msg) throws Exception {
if (sendFuture != null && !sendFuture.isDone() && !sendFuture.isCancelled()) {
throw new IllegalArgumentException("There is a message still in sending.");
}
if (msg.payload == null) {
throw new IllegalArgumentException("Null value message is not supported.");
}
// Compression.
byte[] unCompressedPayload = msg.payload.getBytes(StandardCharsets.UTF_8);
byte[] compressedPayload = WssClientSideEncryptUtils.compressionIfNeeded(msg.compressionType,
unCompressedPayload);
if (msg.compressionType != null && !CompressionType.NONE.equals(msg.compressionType)) {
msg.uncompressedMessageSize = unCompressedPayload.length;
}
// Encrypt.
EncryptedPayloadAndParam encryptedPayloadAndParam = WssClientSideEncryptUtils.encryptPayload(
cryptoKeyReader, msgCrypto, compressedPayload, keyName);
msg.payload = encryptedPayloadAndParam.encryptedPayload;
msg.encryptionParam = encryptedPayloadAndParam.encryptionParam;
// Do send.
sendFuture = new CompletableFuture<>();
String jsonMsg = ObjectMapperFactory.getThreadLocal().writeValueAsString(msg);
this.session.getRemote().sendString(jsonMsg);
// Wait for response.
executor.schedule(() -> {
synchronized (ClientSideEncryptionWssProducer.this) {
if (!sendFuture.isDone() && !sendFuture.isCancelled()) {
sendFuture.completeExceptionally(new TimeoutException("Send timeout"));
}
}
}, 50, TimeUnit.SECONDS);
return sendFuture.get();
}

@Override
public void onWebSocketClose(int statusCode, String reason) {
log.info("Connection closed: {} - {}", statusCode, reason);
this.session = null;
if (!sendFuture.isDone() && !sendFuture.isCancelled()) {
sendFuture.completeExceptionally(new RuntimeException("Connection was closed"));
}
}

@Override
public void onWebSocketConnect(Session session) {
log.info("Got connect: {}", session);
this.session = session;
}

@Override
public void onWebSocketError(Throwable cause) {
log.error("Received an error", cause);
}

@Override
public void onWebSocketText(String text) {
try {
ResponseOfSend responseOfSend =
ObjectMapperFactory.getThreadLocal().readValue(text, ResponseOfSend.class);
if (responseOfSend.getErrorCode() != 0 || responseOfSend.getErrorMsg() != null) {
sendFuture.completeExceptionally(new RuntimeException(text));
} else {
byte[] bytes = Base64.getDecoder().decode(responseOfSend.getMessageId());
MessageIdData messageIdData = new MessageIdData();
messageIdData.parseFrom(bytes);
sendFuture.complete(messageIdData);
}
} catch (Exception ex) {
log.error("Could not extract the response payload: {}", text);
}
}

@Override
public void close() throws IOException {
try {
wssClient.stop();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Data
public static class ResponseOfSend {
private String result;
private String messageId;
private String errorMsg;
private int errorCode = -1;
private int schemaVersion;
}
}
Loading

0 comments on commit aa93a99

Please sign in to comment.