Skip to content

Commit

Permalink
feat: change PartitionedProducerStats collecting logic
Browse files Browse the repository at this point in the history
  • Loading branch information
equanz committed May 10, 2021
1 parent d49aab4 commit c0a4845
Show file tree
Hide file tree
Showing 16 changed files with 185 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,10 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
protected volatile PublishRateLimiter brokerPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;

private DistributedIdGenerator producerNameGenerator;
private DistributedIdGenerator producerStatsKeyGenerator;

public final static String PRODUCER_NAME_GENERATOR_PATH = "/counters/producer-name";
public final static String PRODUCER_STATS_KEY_GENERATOR_PATH = "/counters/producer-stats-key";

private final BacklogQuotaManager backlogQuotaManager;

Expand Down Expand Up @@ -432,6 +434,8 @@ private ServerBootstrap defaultServerBootstrap() {
public void start() throws Exception {
this.producerNameGenerator = new DistributedIdGenerator(pulsar.getCoordinationService(),
PRODUCER_NAME_GENERATOR_PATH, pulsar.getConfiguration().getClusterName());
this.producerStatsKeyGenerator = new DistributedIdGenerator(pulsar.getCoordinationService(),
PRODUCER_STATS_KEY_GENERATOR_PATH, pulsar.getConfiguration().getClusterName());

ServerBootstrap bootstrap = defaultServerBootstrap.clone();

Expand Down Expand Up @@ -1841,6 +1845,10 @@ public String generateUniqueProducerName() {
return producerNameGenerator.getNextId();
}

public String generateUniqueProducerStatsKey() {
return producerStatsKeyGenerator.getNextId();
}

public Map<String, TopicStats> getTopicStats() {
HashMap<String, TopicStats> stats = new HashMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public class Producer {

private final ProducerAccessMode accessMode;
private Optional<Long> topicEpoch;
private final Optional<String> producerStatsKey;

private final Map<String, String> metadata;

Expand All @@ -94,7 +95,8 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN
boolean isEncrypted, Map<String, String> metadata, SchemaVersion schemaVersion, long epoch,
boolean userProvidedProducerName,
ProducerAccessMode accessMode,
Optional<Long> topicEpoch) {
Optional<Long> topicEpoch,
Optional<String> producerStatsKey) {
this.topic = topic;
this.cnx = cnx;
this.producerId = producerId;
Expand All @@ -107,6 +109,7 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN
this.chunkedMessageRate = new Rate();
this.isNonPersistentTopic = topic instanceof NonPersistentTopic;
this.msgDrop = this.isNonPersistentTopic ? new Rate() : null;
this.producerStatsKey = producerStatsKey;

this.metadata = metadata != null ? metadata : Collections.emptyMap();

Expand All @@ -122,6 +125,7 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN
stats.producerId = producerId;
stats.metadata = this.metadata;
stats.accessMode = Commands.convertProducerAccessMode(accessMode);
producerStatsKey.ifPresent(key -> stats.producerStatsKey = key);

this.isRemote = producerName
.startsWith(cnx.getBrokerService().pulsar().getConfiguration().getReplicatorPrefix());
Expand Down Expand Up @@ -662,6 +666,10 @@ public Optional<Long> getTopicEpoch() {
return topicEpoch;
}

public Optional<String> getProducerStatsKey() {
return producerStatsKey;
}

private static final Logger log = LoggerFactory.getLogger(Producer.class);

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ public interface PulsarCommandSender {

void sendErrorResponse(long requestId, ServerError error, String message);

void sendProducerSuccessResponse(long requestId, String producerName, SchemaVersion schemaVersion);
void sendProducerSuccessResponse(long requestId, String producerName, SchemaVersion schemaVersion,
Optional<String> producerStatsKey);

void sendProducerSuccessResponse(long requestId, String producerName, long lastSequenceId,
SchemaVersion schemaVersion, Optional<Long> topicEpoch,
boolean isProducerReady);
boolean isProducerReady, Optional<String> producerStatsKey);

void sendSendReceiptResponse(long producerId, long sequenceId, long highestId, long ledgerId,
long entryId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,10 @@ public void sendErrorResponse(long requestId, ServerError error, String message)
}

@Override
public void sendProducerSuccessResponse(long requestId, String producerName, SchemaVersion schemaVersion) {
BaseCommand command = Commands.newProducerSuccessCommand(requestId, producerName, schemaVersion);
public void sendProducerSuccessResponse(long requestId, String producerName, SchemaVersion schemaVersion,
Optional<String> producerStatsKey) {
BaseCommand command = Commands.newProducerSuccessCommand(requestId, producerName, schemaVersion,
producerStatsKey);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf);
Expand All @@ -90,9 +92,9 @@ public void sendProducerSuccessResponse(long requestId, String producerName, Sch
@Override
public void sendProducerSuccessResponse(long requestId, String producerName, long lastSequenceId,
SchemaVersion schemaVersion, Optional<Long> topicEpoch,
boolean isProducerReady) {
boolean isProducerReady, Optional<String> producerStatsKey) {
BaseCommand command = Commands.newProducerSuccessCommand(requestId, producerName, lastSequenceId,
schemaVersion, topicEpoch, isProducerReady);
schemaVersion, topicEpoch, isProducerReady, producerStatsKey);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.unsafeGetPartitionedTopicMetadataAsync;
import static org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync;
import static org.apache.pulsar.common.api.proto.ProtocolVersion.v18;
import static org.apache.pulsar.common.api.proto.ProtocolVersion.v5;
import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -1081,6 +1082,11 @@ protected void handleProducer(final CommandProducer cmdProducer) {
final ProducerAccessMode producerAccessMode = cmdProducer.getProducerAccessMode();
final Optional<Long> topicEpoch = cmdProducer.hasTopicEpoch()
? Optional.of(cmdProducer.getTopicEpoch()) : Optional.empty();
final Optional<String> producerStatsKey = cmdProducer.hasProducerStatsKey()
? Optional.of(cmdProducer.getProducerStatsKey())
: getRemoteEndpointProtocolVersion() >= v18.getValue()
? Optional.of(service.generateUniqueProducerStatsKey())
: Optional.empty();

TopicName topicName = validateTopicName(cmdProducer.getTopic(), requestId, cmdProducer);
if (topicName == null) {
Expand Down Expand Up @@ -1114,7 +1120,7 @@ protected void handleProducer(final CommandProducer cmdProducer) {
log.info("[{}] Producer with the same id is already created:"
+ " producerId={}, producer={}", remoteAddress, producerId, producer);
commandSender.sendProducerSuccessResponse(requestId, producer.getProducerName(),
producer.getSchemaVersion());
producer.getSchemaVersion(), producer.getProducerStatsKey());

return null;
} else {
Expand Down Expand Up @@ -1193,16 +1199,18 @@ protected void handleProducer(final CommandProducer cmdProducer) {
schemaVersionFuture.thenAccept(schemaVersion -> {
CompletableFuture<Void> producerQueuedFuture = new CompletableFuture<>();
Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName,
getPrincipal(), isEncrypted, metadata, schemaVersion, epoch,
userProvidedProducerName, producerAccessMode, topicEpoch);
getPrincipal(), isEncrypted, metadata, schemaVersion,
epoch, userProvidedProducerName, producerAccessMode, topicEpoch,
producerStatsKey);

topic.addProducer(producer, producerQueuedFuture).thenAccept(newTopicEpoch -> {
if (isActive()) {
if (producerFuture.complete(producer)) {
log.info("[{}] Created new producer: {}", remoteAddress, producer);
commandSender.sendProducerSuccessResponse(requestId, producerName,
producer.getLastSequenceId(), producer.getSchemaVersion(),
newTopicEpoch, true /* producer is ready now */);
newTopicEpoch, true /* producer is ready now */,
producerStatsKey);
return;
} else {
// The producer's future was completed before by
Expand Down Expand Up @@ -1241,7 +1249,8 @@ protected void handleProducer(final CommandProducer cmdProducer) {
log.info("[{}] Producer is waiting in queue: {}", remoteAddress, producer);
commandSender.sendProducerSuccessResponse(requestId, producerName,
producer.getLastSequenceId(), producer.getSchemaVersion(),
Optional.empty(), false/* producer is not ready now */);
Optional.empty(), false/* producer is not ready now */,
producerStatsKey);
}
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
Expand Down Expand Up @@ -744,6 +745,7 @@ public NonPersistentTopicStats getStats(boolean getPreciseBacklog, boolean subsc

ObjectObjectHashMap<String, PublisherStats> remotePublishersStats = new ObjectObjectHashMap<>();

final AtomicInteger nonKeyCounter = new AtomicInteger(0);
producers.values().forEach(producer -> {
NonPersistentPublisherStats publisherStats = (NonPersistentPublisherStats) producer.getStats();
stats.msgRateIn += publisherStats.msgRateIn;
Expand All @@ -752,7 +754,13 @@ public NonPersistentTopicStats getStats(boolean getPreciseBacklog, boolean subsc
if (producer.isRemote()) {
remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
} else {
stats.getPublishers().add(publisherStats);
if (publisherStats.producerStatsKey == null) {
final NonPersistentPublisherStats copyStats = (NonPersistentPublisherStats) publisherStats.clone();
copyStats.producerStatsKey = String.valueOf(nonKeyCounter.getAndIncrement());
stats.publishers.add(copyStats);
} else {
stats.publishers.add(publisherStats);
}
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
Expand Down Expand Up @@ -1692,6 +1693,7 @@ public TopicStats getStats(boolean getPreciseBacklog, boolean subscriptionBacklo

ObjectObjectHashMap<String, PublisherStats> remotePublishersStats = new ObjectObjectHashMap<>();

final AtomicInteger nonKeyCounter = new AtomicInteger(0);
producers.values().forEach(producer -> {
PublisherStats publisherStats = producer.getStats();
stats.msgRateIn += publisherStats.msgRateIn;
Expand All @@ -1700,7 +1702,13 @@ public TopicStats getStats(boolean getPreciseBacklog, boolean subscriptionBacklo
if (producer.isRemote()) {
remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
} else {
stats.publishers.add(publisherStats);
if (publisherStats.producerStatsKey == null) {
final PublisherStats copyStats = publisherStats.clone();
copyStats.producerStatsKey = String.valueOf(nonKeyCounter.getAndIncrement());
stats.publishers.add(copyStats);
} else {
stats.publishers.add(publisherStats);
}
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,8 @@ protected void handleProducerSuccess(CommandProducerSuccess success) {
ProducerResponse pr = new ProducerResponse(success.getProducerName(),
success.getLastSequenceId(),
success.getSchemaVersion(),
success.hasTopicEpoch() ? Optional.of(success.getTopicEpoch()) : Optional.empty());
success.hasTopicEpoch() ? Optional.of(success.getTopicEpoch()) : Optional.empty(),
success.hasProducerStatsKey() ? success.getProducerStatsKey() : null);
requestFuture.complete(pr);
} else {
log.warn("{} Received unknown request id from server: {}", ctx.channel(), success.getRequestId());
Expand Down
Loading

0 comments on commit c0a4845

Please sign in to comment.