Skip to content

Commit

Permalink
[Improve] Admin GetStats: Fill in missing fields (#1309)
Browse files Browse the repository at this point in the history
* admin append topic stats

* lint
  • Loading branch information
crossoverJie authored Nov 21, 2024
1 parent c369e75 commit 9366a0e
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 22 deletions.
52 changes: 51 additions & 1 deletion pulsaradmin/pkg/admin/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,18 +123,21 @@ func TestPartitionState(t *testing.T) {

assert.Nil(t, err)
defer client.Close()
subName := "my-sub"
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
SubscriptionName: subName,
Type: pulsar.Exclusive,
})
assert.Nil(t, err)
defer consumer.Close()

// create producer
producerName := "test-producer"
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: topic,
DisableBatching: false,
Name: producerName,
})
assert.Nil(t, err)
defer producer.Close()
Expand Down Expand Up @@ -173,6 +176,53 @@ func TestPartitionState(t *testing.T) {
assert.Equal(t, len(subscriptionStats.Consumers), 0)
}

partition, err := topicName.GetPartition(0)
assert.Nil(t, err)
topicState, err := admin.Topics().GetStats(*partition)
assert.Nil(t, err)
assert.Equal(t, len(topicState.Publishers), 1)
publisher := topicState.Publishers[0]
assert.Equal(t, publisher.AccessModel, utils.ProduceModeShared)
assert.Equal(t, publisher.IsSupportsPartialProducer, false)
assert.Equal(t, publisher.ProducerName, producerName)
assert.Contains(t, publisher.Address, "127.0.0.1")
assert.Contains(t, publisher.ClientVersion, "Pulsar Go version")

sub := topicState.Subscriptions[subName]
assert.Equal(t, sub.BytesOutCounter, int64(0))
assert.Equal(t, sub.MsgOutCounter, int64(0))
assert.Equal(t, sub.MessageAckRate, float64(0))
assert.Equal(t, sub.ChunkedMessageRate, float64(0))
assert.Equal(t, sub.BacklogSize, int64(0))
assert.Equal(t, sub.EarliestMsgPublishTimeInBacklog, int64(0))
assert.Equal(t, sub.LastExpireTimestamp, int64(0))
assert.Equal(t, sub.TotalMsgExpired, int64(0))
assert.Equal(t, sub.LastMarkDeleteAdvancedTimestamp, int64(0))
assert.Equal(t, sub.IsDurable, true)
assert.Equal(t, sub.AllowOutOfOrderDelivery, false)
assert.Equal(t, sub.ConsumersAfterMarkDeletePosition, map[string]string{})
assert.Equal(t, sub.NonContiguousDeletedMessagesRanges, 0)
assert.Equal(t, sub.NonContiguousDeletedMessagesRangesSrzSize, 0)
assert.Equal(t, sub.DelayedMessageIndexSizeInBytes, int64(0))
assert.Equal(t, sub.SubscriptionProperties, map[string]string{})
assert.Equal(t, sub.FilterProcessedMsgCount, int64(0))
assert.Equal(t, sub.FilterAcceptedMsgCount, int64(0))
assert.Equal(t, sub.FilterRejectedMsgCount, int64(0))
assert.Equal(t, sub.FilterRescheduledMsgCount, int64(0))

assert.Equal(t, len(sub.Consumers), 1)
consumerState := sub.Consumers[0]
assert.Equal(t, consumerState.BytesOutCounter, int64(0))
assert.Equal(t, consumerState.MsgOutCounter, int64(0))
assert.Equal(t, consumerState.MessageAckRate, float64(0))
assert.Equal(t, consumerState.ChunkedMessageRate, float64(0))
assert.Equal(t, consumerState.AvgMessagesPerEntry, int(0))
assert.Contains(t, consumerState.Address, "127.0.0.1")
assert.Contains(t, consumerState.ClientVersion, "Pulsar Go version")
assert.Equal(t, consumerState.LastAckedTimestamp, int64(0))
assert.Equal(t, consumerState.LastConsumedTimestamp, int64(0))
assert.True(t, consumerState.LastConsumedFlowTimestamp > 0)

}
func TestNonPartitionState(t *testing.T) {
randomName := newTopicName()
Expand Down
89 changes: 68 additions & 21 deletions pulsaradmin/pkg/utils/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,31 +244,67 @@ type TopicStats struct {
DeDuplicationStatus string `json:"deduplicationStatus"`
}

type ProducerAccessMode string

const (
ProduceModeShared ProducerAccessMode = "Shared"
ProduceModeExclusive = "Exclusive"
ProduceModeExclusiveWithFencing = "ExclusiveWithFencing"
ProduceModeWaitForExclusive = "WaitForExclusive"
)

type PublisherStats struct {
ProducerID int64 `json:"producerId"`
MsgRateIn float64 `json:"msgRateIn"`
MsgThroughputIn float64 `json:"msgThroughputIn"`
AverageMsgSize float64 `json:"averageMsgSize"`
Metadata map[string]string `json:"metadata"`
AccessModel ProducerAccessMode `json:"accessMode"`
ProducerID int64 `json:"producerId"`
MsgRateIn float64 `json:"msgRateIn"`
MsgThroughputIn float64 `json:"msgThroughputIn"`
AverageMsgSize float64 `json:"averageMsgSize"`
ChunkedMessageRate float64 `json:"chunkedMessageRate"`
IsSupportsPartialProducer bool `json:"supportsPartialProducer"`
ProducerName string `json:"producerName"`
Address string `json:"address"`
ConnectedSince string `json:"connectedSince"`
ClientVersion string `json:"clientVersion"`
Metadata map[string]string `json:"metadata"`
}

type SubscriptionStats struct {
BlockedSubscriptionOnUnackedMsgs bool `json:"blockedSubscriptionOnUnackedMsgs"`
IsReplicated bool `json:"isReplicated"`
LastConsumedFlowTimestamp int64 `json:"lastConsumedFlowTimestamp"`
LastConsumedTimestamp int64 `json:"lastConsumedTimestamp"`
LastAckedTimestamp int64 `json:"lastAckedTimestamp"`
MsgRateOut float64 `json:"msgRateOut"`
MsgThroughputOut float64 `json:"msgThroughputOut"`
MsgRateRedeliver float64 `json:"msgRateRedeliver"`
MsgRateExpired float64 `json:"msgRateExpired"`
MsgBacklog int64 `json:"msgBacklog"`
MsgBacklogNoDelayed int64 `json:"msgBacklogNoDelayed"`
MsgDelayed int64 `json:"msgDelayed"`
UnAckedMessages int64 `json:"unackedMessages"`
SubType string `json:"type"`
ActiveConsumerName string `json:"activeConsumerName"`
Consumers []ConsumerStats `json:"consumers"`
BlockedSubscriptionOnUnackedMsgs bool `json:"blockedSubscriptionOnUnackedMsgs"`
IsReplicated bool `json:"isReplicated"`
LastConsumedFlowTimestamp int64 `json:"lastConsumedFlowTimestamp"`
LastConsumedTimestamp int64 `json:"lastConsumedTimestamp"`
LastAckedTimestamp int64 `json:"lastAckedTimestamp"`
MsgRateOut float64 `json:"msgRateOut"`
MsgThroughputOut float64 `json:"msgThroughputOut"`
MsgRateRedeliver float64 `json:"msgRateRedeliver"`
MsgRateExpired float64 `json:"msgRateExpired"`
MsgBacklog int64 `json:"msgBacklog"`
MsgBacklogNoDelayed int64 `json:"msgBacklogNoDelayed"`
MsgDelayed int64 `json:"msgDelayed"`
UnAckedMessages int64 `json:"unackedMessages"`
SubType string `json:"type"`
ActiveConsumerName string `json:"activeConsumerName"`
BytesOutCounter int64 `json:"bytesOutCounter"`
MsgOutCounter int64 `json:"msgOutCounter"`
MessageAckRate float64 `json:"messageAckRate"`
ChunkedMessageRate float64 `json:"chunkedMessageRate"`
BacklogSize int64 `json:"backlogSize"`
EarliestMsgPublishTimeInBacklog int64 `json:"earliestMsgPublishTimeInBacklog"`
TotalMsgExpired int64 `json:"totalMsgExpired"`
LastExpireTimestamp int64 `json:"lastExpireTimestamp"`
LastMarkDeleteAdvancedTimestamp int64 `json:"lastMarkDeleteAdvancedTimestamp"`
Consumers []ConsumerStats `json:"consumers"`
IsDurable bool `json:"isDurable"`
AllowOutOfOrderDelivery bool `json:"allowOutOfOrderDelivery"`
ConsumersAfterMarkDeletePosition map[string]string `json:"consumersAfterMarkDeletePosition"`
NonContiguousDeletedMessagesRanges int `json:"nonContiguousDeletedMessagesRanges"`
NonContiguousDeletedMessagesRangesSrzSize int `json:"nonContiguousDeletedMessagesRangesSerializedSize"`
DelayedMessageIndexSizeInBytes int64 `json:"delayedMessageIndexSizeInBytes"`
SubscriptionProperties map[string]string `json:"subscriptionProperties"`
FilterProcessedMsgCount int64 `json:"filterProcessedMsgCount"`
FilterAcceptedMsgCount int64 `json:"filterAcceptedMsgCount"`
FilterRejectedMsgCount int64 `json:"filterRejectedMsgCount"`
FilterRescheduledMsgCount int64 `json:"filterRescheduledMsgCount"`
}

type ConsumerStats struct {
Expand All @@ -279,6 +315,17 @@ type ConsumerStats struct {
MsgThroughputOut float64 `json:"msgThroughputOut"`
MsgRateRedeliver float64 `json:"msgRateRedeliver"`
ConsumerName string `json:"consumerName"`
BytesOutCounter int64 `json:"bytesOutCounter"`
MsgOutCounter int64 `json:"msgOutCounter"`
MessageAckRate float64 `json:"messageAckRate"`
ChunkedMessageRate float64 `json:"chunkedMessageRate"`
AvgMessagesPerEntry int `json:"avgMessagesPerEntry"`
Address string `json:"address"`
ConnectedSince string `json:"connectedSince"`
ClientVersion string `json:"clientVersion"`
LastAckedTimestamp int64 `json:"lastAckedTimestamp"`
LastConsumedTimestamp int64 `json:"lastConsumedTimestamp"`
LastConsumedFlowTimestamp int64 `json:"lastConsumedFlowTimestamp"`
Metadata map[string]string `json:"metadata"`
}

Expand Down

0 comments on commit 9366a0e

Please sign in to comment.