Skip to content

Commit

Permalink
[improve][broker] Add optional parameters for getPartitionedStats (#2…
Browse files Browse the repository at this point in the history
  • Loading branch information
crossoverJie authored Nov 30, 2023
1 parent 3377003 commit a832d29
Show file tree
Hide file tree
Showing 16 changed files with 340 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition;
import org.apache.pulsar.broker.service.GetStatsOptions;
import org.apache.pulsar.broker.service.MessageExpirer;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
Expand Down Expand Up @@ -1254,9 +1255,7 @@ private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncR
}

protected CompletableFuture<? extends TopicStats> internalGetStatsAsync(boolean authoritative,
boolean getPreciseBacklog,
boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog) {
GetStatsOptions getStatsOptions) {
CompletableFuture<Void> future;

if (topicName.isGlobal()) {
Expand All @@ -1268,8 +1267,7 @@ protected CompletableFuture<? extends TopicStats> internalGetStatsAsync(boolean
return future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
.thenComposeAsync(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_STATS))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> topic.asyncGetStats(getPreciseBacklog, subscriptionBacklogSize,
getEarliestTimeInBacklog));
.thenCompose(topic -> topic.asyncGetStats(getStatsOptions));
}

protected CompletableFuture<PersistentTopicInternalStats> internalGetInternalStatsAsync(boolean authoritative,
Expand Down Expand Up @@ -1402,8 +1400,7 @@ public void getInfoFailed(ManagedLedgerException exception, Object ctx) {
}

protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean authoritative, boolean perPartition,
boolean getPreciseBacklog, boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog) {
GetStatsOptions getStatsOptions) {
CompletableFuture<Void> future;
if (topicName.isGlobal()) {
future = validateGlobalNamespaceOwnershipAsync(namespaceName);
Expand All @@ -1419,6 +1416,14 @@ protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean
}
PartitionedTopicStatsImpl stats = new PartitionedTopicStatsImpl(partitionMetadata);
List<CompletableFuture<TopicStats>> topicStatsFutureList = new ArrayList<>(partitionMetadata.partitions);
org.apache.pulsar.client.admin.GetStatsOptions statsOptions =
new org.apache.pulsar.client.admin.GetStatsOptions(
getStatsOptions.isGetPreciseBacklog(),
getStatsOptions.isSubscriptionBacklogSize(),
getStatsOptions.isGetEarliestTimeInBacklog(),
getStatsOptions.isExcludePublishers(),
getStatsOptions.isExcludeConsumers()
);
for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName partition = topicName.getPartition(i);
topicStatsFutureList.add(
Expand All @@ -1428,13 +1433,11 @@ protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean
if (owned) {
return getTopicReferenceAsync(partition)
.thenApply(ref ->
ref.getStats(getPreciseBacklog, subscriptionBacklogSize,
getEarliestTimeInBacklog));
ref.getStats(getStatsOptions));
} else {
try {
return pulsar().getAdminClient().topics().getStatsAsync(
partition.toString(), getPreciseBacklog, subscriptionBacklogSize,
getEarliestTimeInBacklog);
partition.toString(), statsOptions);
} catch (PulsarServerException e) {
return FutureUtil.failedFuture(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.GetStatsOptions;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ResetCursorData;
Expand Down Expand Up @@ -444,7 +445,9 @@ public void getStats(
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog) {
validateTopicName(property, cluster, namespace, encodedTopic);
internalGetStatsAsync(authoritative, getPreciseBacklog, false, false)
GetStatsOptions getStatsOptions =
new GetStatsOptions(getPreciseBacklog, false, false, false, false);
internalGetStatsAsync(authoritative, getStatsOptions)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
Expand Down Expand Up @@ -511,7 +514,8 @@ public void getPartitionedStats(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalGetPartitionedStats(asyncResponse, authoritative, perPartition, false, false, false);
GetStatsOptions getStatsOptions = new GetStatsOptions(false, false, false, false, false);
internalGetPartitionedStats(asyncResponse, authoritative, perPartition, getStatsOptions);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,11 @@ public void getPartitionedStats(
+ "not to use when there's heavy traffic.")
@QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize,
@ApiParam(value = "If return the earliest time in backlog")
@QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog) {
@QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog,
@ApiParam(value = "If exclude the publishers")
@QueryParam("excludePublishers") @DefaultValue("false") boolean excludePublishers,
@ApiParam(value = "If exclude the consumers")
@QueryParam("excludeConsumers") @DefaultValue("false") boolean excludeConsumers) {
try {
validateTopicName(tenant, namespace, encodedTopic);
if (topicName.isPartitioned()) {
Expand All @@ -240,12 +244,19 @@ public void getPartitionedStats(
NonPersistentPartitionedTopicStatsImpl stats =
new NonPersistentPartitionedTopicStatsImpl(partitionMetadata);
List<CompletableFuture<TopicStats>> topicStatsFutureList = new ArrayList<>();
org.apache.pulsar.client.admin.GetStatsOptions statsOptions =
new org.apache.pulsar.client.admin.GetStatsOptions(
getPreciseBacklog,
subscriptionBacklogSize,
getEarliestTimeInBacklog,
excludePublishers,
excludeConsumers
);
for (int i = 0; i < partitionMetadata.partitions; i++) {
try {
topicStatsFutureList
.add(pulsar().getAdminClient().topics().getStatsAsync(
(topicName.getPartition(i).toString()), getPreciseBacklog,
subscriptionBacklogSize, getEarliestTimeInBacklog));
(topicName.getPartition(i).toString()), statsOptions));
} catch (PulsarServerException e) {
asyncResponse.resume(new RestException(e));
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.GetStatsOptions;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.SubscriptionType;
Expand Down Expand Up @@ -1195,9 +1196,16 @@ public void getStats(
+ "not to use when there's heavy traffic.")
@QueryParam("subscriptionBacklogSize") @DefaultValue("true") boolean subscriptionBacklogSize,
@ApiParam(value = "If return time of the earliest message in backlog")
@QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog) {
@QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog,
@ApiParam(value = "If exclude the publishers")
@QueryParam("excludePublishers") @DefaultValue("false") boolean excludePublishers,
@ApiParam(value = "If exclude the consumers")
@QueryParam("excludeConsumers") @DefaultValue("false") boolean excludeConsumers) {
validateTopicName(tenant, namespace, encodedTopic);
internalGetStatsAsync(authoritative, getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog)
GetStatsOptions getStatsOptions =
new GetStatsOptions(getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog,
excludePublishers, excludeConsumers);
internalGetStatsAsync(authoritative, getStatsOptions)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
Expand Down Expand Up @@ -1297,15 +1305,20 @@ public void getPartitionedStats(
+ "not to use when there's heavy traffic.")
@QueryParam("subscriptionBacklogSize") @DefaultValue("true") boolean subscriptionBacklogSize,
@ApiParam(value = "If return the earliest time in backlog")
@QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog) {
@QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog,
@ApiParam(value = "If exclude the publishers")
@QueryParam("excludePublishers") @DefaultValue("false") boolean excludePublishers,
@ApiParam(value = "If exclude the consumers")
@QueryParam("excludeConsumers") @DefaultValue("false") boolean excludeConsumers) {
try {
validateTopicName(tenant, namespace, encodedTopic);
if (topicName.isPartitioned()) {
throw new RestException(Response.Status.PRECONDITION_FAILED,
"Partitioned Topic Name should not contain '-partition-'");
}
internalGetPartitionedStats(asyncResponse, authoritative, perPartition, getPreciseBacklog,
subscriptionBacklogSize, getEarliestTimeInBacklog);
GetStatsOptions getStatsOptions = new GetStatsOptions(getPreciseBacklog, subscriptionBacklogSize,
getEarliestTimeInBacklog, excludePublishers, excludeConsumers);
internalGetPartitionedStats(asyncResponse, authoritative, perPartition, getStatsOptions);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

@Data
@Builder
@AllArgsConstructor
public class GetStatsOptions {
/**
* Set to true to get precise backlog, Otherwise get imprecise backlog.
*/
private final boolean getPreciseBacklog;

/**
* Whether to get backlog size for each subscription.
*/
private final boolean subscriptionBacklogSize;

/**
* Whether to get the earliest time in backlog.
*/
private final boolean getEarliestTimeInBacklog;

/**
* Whether to exclude publishers.
*/
private final boolean excludePublishers;

/**
* Whether to exclude consumers.
*/
private final boolean excludeConsumers;
}
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,14 @@ void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats
TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog);

TopicStatsImpl getStats(GetStatsOptions getStatsOptions);

CompletableFuture<? extends TopicStatsImpl> asyncGetStats(boolean getPreciseBacklog,
boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog);

CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions getStatsOptions);

CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean includeLedgerMetadata);

Position getLastPosition();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFencedException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.GetStatsOptions;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
Expand Down Expand Up @@ -437,7 +438,7 @@ public boolean expireMessages(Position position) {
+ " non-persistent topic.");
}

public NonPersistentSubscriptionStatsImpl getStats() {
public NonPersistentSubscriptionStatsImpl getStats(GetStatsOptions getStatsOptions) {
NonPersistentSubscriptionStatsImpl subStats = new NonPersistentSubscriptionStatsImpl();
subStats.bytesOutCounter = bytesOutFromRemovedConsumers.longValue();
subStats.msgOutCounter = msgOutFromRemovedConsumer.longValue();
Expand All @@ -446,7 +447,9 @@ public NonPersistentSubscriptionStatsImpl getStats() {
if (dispatcher != null) {
dispatcher.getConsumers().forEach(consumer -> {
ConsumerStatsImpl consumerStats = consumer.getStats();
subStats.consumers.add(consumerStats);
if (!getStatsOptions.isExcludeConsumers()) {
subStats.consumers.add(consumerStats);
}
subStats.msgRateOut += consumerStats.msgRateOut;
subStats.messageAckRate += consumerStats.messageAckRate;
subStats.msgThroughputOut += consumerStats.msgThroughputOut;
Expand Down
Loading

0 comments on commit a832d29

Please sign in to comment.