Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PIP 79][common,broker,client] Change PartitionedTopicStats to support partial partitioned producer #10534

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,10 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
protected volatile PublishRateLimiter brokerPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;

private DistributedIdGenerator producerNameGenerator;
private DistributedIdGenerator producerStatsKeyGenerator;

public final static String PRODUCER_NAME_GENERATOR_PATH = "/counters/producer-name";
public final static String PRODUCER_STATS_KEY_GENERATOR_PATH = "/counters/producer-stats-key";

private final BacklogQuotaManager backlogQuotaManager;

Expand Down Expand Up @@ -432,6 +434,8 @@ private ServerBootstrap defaultServerBootstrap() {
public void start() throws Exception {
this.producerNameGenerator = new DistributedIdGenerator(pulsar.getCoordinationService(),
PRODUCER_NAME_GENERATOR_PATH, pulsar.getConfiguration().getClusterName());
this.producerStatsKeyGenerator = new DistributedIdGenerator(pulsar.getCoordinationService(),
PRODUCER_STATS_KEY_GENERATOR_PATH, pulsar.getConfiguration().getClusterName());

ServerBootstrap bootstrap = defaultServerBootstrap.clone();

Expand Down Expand Up @@ -1841,6 +1845,10 @@ public String generateUniqueProducerName() {
return producerNameGenerator.getNextId();
}

public String generateUniqueProducerStatsKey() {
return producerStatsKeyGenerator.getNextId();
}

public Map<String, TopicStats> getTopicStats() {
HashMap<String, TopicStats> stats = new HashMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public class Producer {

private final ProducerAccessMode accessMode;
private Optional<Long> topicEpoch;
private final Optional<String> producerStatsKey;

private final Map<String, String> metadata;

Expand All @@ -94,7 +95,8 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN
boolean isEncrypted, Map<String, String> metadata, SchemaVersion schemaVersion, long epoch,
boolean userProvidedProducerName,
ProducerAccessMode accessMode,
Optional<Long> topicEpoch) {
Optional<Long> topicEpoch,
Optional<String> producerStatsKey) {
this.topic = topic;
this.cnx = cnx;
this.producerId = producerId;
Expand All @@ -107,6 +109,7 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN
this.chunkedMessageRate = new Rate();
this.isNonPersistentTopic = topic instanceof NonPersistentTopic;
this.msgDrop = this.isNonPersistentTopic ? new Rate() : null;
this.producerStatsKey = producerStatsKey;

this.metadata = metadata != null ? metadata : Collections.emptyMap();

Expand All @@ -122,6 +125,7 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN
stats.producerId = producerId;
stats.metadata = this.metadata;
stats.accessMode = Commands.convertProducerAccessMode(accessMode);
producerStatsKey.ifPresent(key -> stats.producerStatsKey = key);

this.isRemote = producerName
.startsWith(cnx.getBrokerService().pulsar().getConfiguration().getReplicatorPrefix());
Expand Down Expand Up @@ -662,6 +666,10 @@ public Optional<Long> getTopicEpoch() {
return topicEpoch;
}

public Optional<String> getProducerStatsKey() {
return producerStatsKey;
}

private static final Logger log = LoggerFactory.getLogger(Producer.class);

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ public interface PulsarCommandSender {

void sendErrorResponse(long requestId, ServerError error, String message);

void sendProducerSuccessResponse(long requestId, String producerName, SchemaVersion schemaVersion);
void sendProducerSuccessResponse(long requestId, String producerName, SchemaVersion schemaVersion,
Optional<String> producerStatsKey);

void sendProducerSuccessResponse(long requestId, String producerName, long lastSequenceId,
SchemaVersion schemaVersion, Optional<Long> topicEpoch,
boolean isProducerReady);
boolean isProducerReady, Optional<String> producerStatsKey);

void sendSendReceiptResponse(long producerId, long sequenceId, long highestId, long ledgerId,
long entryId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,10 @@ public void sendErrorResponse(long requestId, ServerError error, String message)
}

@Override
public void sendProducerSuccessResponse(long requestId, String producerName, SchemaVersion schemaVersion) {
BaseCommand command = Commands.newProducerSuccessCommand(requestId, producerName, schemaVersion);
public void sendProducerSuccessResponse(long requestId, String producerName, SchemaVersion schemaVersion,
Optional<String> producerStatsKey) {
BaseCommand command = Commands.newProducerSuccessCommand(requestId, producerName, schemaVersion,
producerStatsKey);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf);
Expand All @@ -90,9 +92,9 @@ public void sendProducerSuccessResponse(long requestId, String producerName, Sch
@Override
public void sendProducerSuccessResponse(long requestId, String producerName, long lastSequenceId,
SchemaVersion schemaVersion, Optional<Long> topicEpoch,
boolean isProducerReady) {
boolean isProducerReady, Optional<String> producerStatsKey) {
BaseCommand command = Commands.newProducerSuccessCommand(requestId, producerName, lastSequenceId,
schemaVersion, topicEpoch, isProducerReady);
schemaVersion, topicEpoch, isProducerReady, producerStatsKey);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.unsafeGetPartitionedTopicMetadataAsync;
import static org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync;
import static org.apache.pulsar.common.api.proto.ProtocolVersion.v18;
import static org.apache.pulsar.common.api.proto.ProtocolVersion.v5;
import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -1081,6 +1082,11 @@ protected void handleProducer(final CommandProducer cmdProducer) {
final ProducerAccessMode producerAccessMode = cmdProducer.getProducerAccessMode();
final Optional<Long> topicEpoch = cmdProducer.hasTopicEpoch()
? Optional.of(cmdProducer.getTopicEpoch()) : Optional.empty();
final Optional<String> producerStatsKey = cmdProducer.hasProducerStatsKey()
? Optional.of(cmdProducer.getProducerStatsKey())
: getRemoteEndpointProtocolVersion() >= v18.getValue()
? Optional.of(service.generateUniqueProducerStatsKey())
: Optional.empty();

TopicName topicName = validateTopicName(cmdProducer.getTopic(), requestId, cmdProducer);
if (topicName == null) {
Expand Down Expand Up @@ -1114,7 +1120,7 @@ protected void handleProducer(final CommandProducer cmdProducer) {
log.info("[{}] Producer with the same id is already created:"
+ " producerId={}, producer={}", remoteAddress, producerId, producer);
commandSender.sendProducerSuccessResponse(requestId, producer.getProducerName(),
producer.getSchemaVersion());
producer.getSchemaVersion(), producer.getProducerStatsKey());

return null;
} else {
Expand Down Expand Up @@ -1193,16 +1199,18 @@ protected void handleProducer(final CommandProducer cmdProducer) {
schemaVersionFuture.thenAccept(schemaVersion -> {
CompletableFuture<Void> producerQueuedFuture = new CompletableFuture<>();
Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName,
getPrincipal(), isEncrypted, metadata, schemaVersion, epoch,
userProvidedProducerName, producerAccessMode, topicEpoch);
getPrincipal(), isEncrypted, metadata, schemaVersion,
epoch, userProvidedProducerName, producerAccessMode, topicEpoch,
producerStatsKey);

topic.addProducer(producer, producerQueuedFuture).thenAccept(newTopicEpoch -> {
if (isActive()) {
if (producerFuture.complete(producer)) {
log.info("[{}] Created new producer: {}", remoteAddress, producer);
commandSender.sendProducerSuccessResponse(requestId, producerName,
producer.getLastSequenceId(), producer.getSchemaVersion(),
newTopicEpoch, true /* producer is ready now */);
newTopicEpoch, true /* producer is ready now */,
producerStatsKey);
return;
} else {
// The producer's future was completed before by
Expand Down Expand Up @@ -1241,7 +1249,8 @@ protected void handleProducer(final CommandProducer cmdProducer) {
log.info("[{}] Producer is waiting in queue: {}", remoteAddress, producer);
commandSender.sendProducerSuccessResponse(requestId, producerName,
producer.getLastSequenceId(), producer.getSchemaVersion(),
Optional.empty(), false/* producer is not ready now */);
Optional.empty(), false/* producer is not ready now */,
producerStatsKey);
}
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
Expand Down Expand Up @@ -744,6 +745,7 @@ public NonPersistentTopicStats getStats(boolean getPreciseBacklog, boolean subsc

ObjectObjectHashMap<String, PublisherStats> remotePublishersStats = new ObjectObjectHashMap<>();

final AtomicInteger nonKeyCounter = new AtomicInteger(0);
producers.values().forEach(producer -> {
NonPersistentPublisherStats publisherStats = (NonPersistentPublisherStats) producer.getStats();
stats.msgRateIn += publisherStats.msgRateIn;
Expand All @@ -752,7 +754,13 @@ public NonPersistentTopicStats getStats(boolean getPreciseBacklog, boolean subsc
if (producer.isRemote()) {
remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
} else {
stats.getPublishers().add(publisherStats);
if (publisherStats.producerStatsKey == null) {
final NonPersistentPublisherStats copyStats = (NonPersistentPublisherStats) publisherStats.clone();
copyStats.producerStatsKey = String.valueOf(nonKeyCounter.getAndIncrement());
stats.publishers.add(copyStats);
} else {
stats.publishers.add(publisherStats);
}
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
Expand Down Expand Up @@ -1692,6 +1693,7 @@ public TopicStats getStats(boolean getPreciseBacklog, boolean subscriptionBacklo

ObjectObjectHashMap<String, PublisherStats> remotePublishersStats = new ObjectObjectHashMap<>();

final AtomicInteger nonKeyCounter = new AtomicInteger(0);
producers.values().forEach(producer -> {
PublisherStats publisherStats = producer.getStats();
stats.msgRateIn += publisherStats.msgRateIn;
Expand All @@ -1700,7 +1702,13 @@ public TopicStats getStats(boolean getPreciseBacklog, boolean subscriptionBacklo
if (producer.isRemote()) {
remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
} else {
stats.publishers.add(publisherStats);
if (publisherStats.producerStatsKey == null) {
final PublisherStats copyStats = publisherStats.clone();
copyStats.producerStatsKey = String.valueOf(nonKeyCounter.getAndIncrement());
stats.publishers.add(copyStats);
} else {
stats.publishers.add(publisherStats);
}
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.ConfigHelper;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand Down
Loading