Skip to content

Commit

Permalink
[improve][broker,proxy] Use ChannelVoidPromise to avoid useless promi…
Browse files Browse the repository at this point in the history
…se objects creation (#19141)
  • Loading branch information
nicoloboschi authored Jan 10, 2023
1 parent 9ef54fd commit d6fcdb8
Show file tree
Hide file tree
Showing 8 changed files with 324 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.netty.NettyChannelUtil;

@Slf4j
public class PulsarCommandSenderImpl implements PulsarCommandSender {
Expand All @@ -55,39 +56,39 @@ public void sendPartitionMetadataResponse(ServerError error, String errorMsg, lo
BaseCommand command = Commands.newPartitionMetadataResponseCommand(error, errorMsg, requestId);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
writeAndFlush(outBuf);
}

@Override
public void sendPartitionMetadataResponse(int partitions, long requestId) {
BaseCommand command = Commands.newPartitionMetadataResponseCommand(partitions, requestId);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
writeAndFlush(outBuf);
}

@Override
public void sendSuccessResponse(long requestId) {
BaseCommand command = Commands.newSuccessCommand(requestId);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
writeAndFlush(outBuf);
}

@Override
public void sendErrorResponse(long requestId, ServerError error, String message) {
BaseCommand command = Commands.newErrorCommand(requestId, error, message);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
writeAndFlush(outBuf);
}

@Override
public void sendProducerSuccessResponse(long requestId, String producerName, SchemaVersion schemaVersion) {
BaseCommand command = Commands.newProducerSuccessCommand(requestId, producerName, schemaVersion);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
writeAndFlush(outBuf);
}

@Override
Expand All @@ -98,7 +99,7 @@ public void sendProducerSuccessResponse(long requestId, String producerName, lon
schemaVersion, topicEpoch, isProducerReady);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
writeAndFlush(outBuf);
}

@Override
Expand All @@ -108,15 +109,15 @@ public void sendSendReceiptResponse(long producerId, long sequenceId, long highe
entryId);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
writeAndFlush(outBuf);
}

@Override
public void sendSendError(long producerId, long sequenceId, ServerError error, String errorMsg) {
BaseCommand command = Commands.newSendErrorCommand(producerId, sequenceId, error, errorMsg);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
writeAndFlush(outBuf);
}

@Override
Expand All @@ -126,31 +127,31 @@ public void sendGetTopicsOfNamespaceResponse(List<String> topics, String topicsH
filtered, changed, requestId);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
writeAndFlush(outBuf);
}

@Override
public void sendGetSchemaResponse(long requestId, SchemaInfo schema, SchemaVersion version) {
BaseCommand command = Commands.newGetSchemaResponseCommand(requestId, schema, version);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
writeAndFlush(outBuf);
}

@Override
public void sendGetSchemaErrorResponse(long requestId, ServerError error, String errorMessage) {
BaseCommand command = Commands.newGetSchemaResponseErrorCommand(requestId, error, errorMessage);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
writeAndFlush(outBuf);
}

@Override
public void sendGetOrCreateSchemaResponse(long requestId, SchemaVersion schemaVersion) {
BaseCommand command = Commands.newGetOrCreateSchemaResponseCommand(requestId, schemaVersion);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
writeAndFlush(outBuf);
}

@Override
Expand All @@ -159,7 +160,7 @@ public void sendGetOrCreateSchemaErrorResponse(long requestId, ServerError error
Commands.newGetOrCreateSchemaResponseErrorCommand(requestId, error, errorMessage);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
writeAndFlush(outBuf);
}

@Override
Expand All @@ -168,7 +169,7 @@ public void sendConnectedResponse(int clientProtocolVersion, int maxMessageSize,
clientProtocolVersion, maxMessageSize, supportsTopicWatchers);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
writeAndFlush(outBuf);
}

@Override
Expand All @@ -179,15 +180,15 @@ public void sendLookupResponse(String brokerServiceUrl, String brokerServiceUrlT
authoritative, response, requestId, proxyThroughServiceUrl);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
writeAndFlush(outBuf);
}

@Override
public void sendLookupResponse(ServerError error, String errorMsg, long requestId) {
BaseCommand command = Commands.newLookupErrorResponseCommand(error, errorMsg, requestId);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
writeAndFlush(outBuf);
}

@Override
Expand All @@ -196,17 +197,15 @@ public void sendActiveConsumerChange(long consumerId, boolean isActive) {
// if the client is older than `v12`, we don't need to send consumer group changes.
return;
}
cnx.ctx().writeAndFlush(
Commands.newActiveConsumerChange(consumerId, isActive),
cnx.ctx().voidPromise());
writeAndFlush(Commands.newActiveConsumerChange(consumerId, isActive));
}

@Override
public void sendReachedEndOfTopic(long consumerId) {
// Only send notification if the client understand the command
if (cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v9.getValue()) {
log.info("[{}] Notifying consumer that end of topic has been reached", this);
cnx.ctx().writeAndFlush(Commands.newReachedEndOfTopic(consumerId), cnx.ctx().voidPromise());
writeAndFlush(Commands.newReachedEndOfTopic(consumerId));
}
}

Expand All @@ -215,8 +214,7 @@ public boolean sendTopicMigrated(ResourceType type, long resourceId, String brok
// Only send notification if the client understand the command
if (cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v20.getValue()) {
log.info("[{}] Notifying {} that topic is migrated", type.name(), resourceId);
cnx.ctx().writeAndFlush(Commands.newTopicMigrated(type, resourceId, brokerUrl, brokerUrlTls),
cnx.ctx().voidPromise());
writeAndFlush(Commands.newTopicMigrated(type, resourceId, brokerUrl, brokerUrlTls));
return true;
}
return false;
Expand Down Expand Up @@ -310,7 +308,7 @@ public void sendTcClientConnectResponse(long requestId, ServerError error, Strin
BaseCommand command = Commands.newTcClientConnectResponse(requestId, error, message);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
writeAndFlush(outBuf);
}

@Override
Expand All @@ -324,7 +322,7 @@ public void sendNewTxnResponse(long requestId, TxnID txnID, long tcID) {
txnID.getMostSigBits());
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
writeAndFlush(outBuf);
if (this.interceptor != null) {
this.interceptor.txnOpened(tcID, txnID.toString());
}
Expand All @@ -335,7 +333,7 @@ public void sendNewTxnErrorResponse(long requestId, long tcID, ServerError error
BaseCommand command = Commands.newTxnResponse(requestId, tcID, error, message);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
writeAndFlush(outBuf);
}

@Override
Expand All @@ -344,7 +342,7 @@ public void sendEndTxnResponse(long requestId, TxnID txnID, int txnAction) {
txnID.getMostSigBits());
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
writeAndFlush(outBuf);
if (this.interceptor != null) {
this.interceptor.txnEnded(txnID.toString(), txnAction);
}
Expand All @@ -356,7 +354,7 @@ public void sendEndTxnErrorResponse(long requestId, TxnID txnID, ServerError err
txnID.getMostSigBits(), error, message);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
writeAndFlush(outBuf);
if (this.interceptor != null) {
this.interceptor.txnEnded(txnID.toString(), TxnAction.ABORT_VALUE);
}
Expand All @@ -378,7 +376,11 @@ public void sendWatchTopicListUpdate(long watcherId,
private void interceptAndWriteCommand(BaseCommand command) {
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf);
writeAndFlush(outBuf);
}

private void writeAndFlush(ByteBuf outBuf) {
NettyChannelUtil.writeAndFlushWithVoidPromise(cnx.ctx(), outBuf);
}

private void safeIntercept(BaseCommand command, ServerCnx cnx) {
Expand Down
Loading

0 comments on commit d6fcdb8

Please sign in to comment.