Skip to content

Commit

Permalink
Merge pull request #2533 from hindessm/mrh/extend-throttling-metric-s…
Browse files Browse the repository at this point in the history
…cope

extend throttling metric scope
  • Loading branch information
dnwe committed Jul 29, 2023
2 parents aa72f59 + b678d34 commit f07b129
Show file tree
Hide file tree
Showing 38 changed files with 201 additions and 14 deletions.
4 changes: 4 additions & 0 deletions acl_create_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func (c *CreateAclsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}

func (r *CreateAclsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}

// AclCreationResponse is an acl creation response type
type AclCreationResponse struct {
Err KError
Expand Down
4 changes: 4 additions & 0 deletions acl_delete_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ func (d *DeleteAclsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}

func (r *DeleteAclsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}

// FilterResponse is a filter response type
type FilterResponse struct {
Err KError
Expand Down
4 changes: 4 additions & 0 deletions acl_describe_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,7 @@ func (d *DescribeAclsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
}

func (r *DescribeAclsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}
4 changes: 4 additions & 0 deletions add_offsets_to_txn_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,7 @@ func (a *AddOffsetsToTxnResponse) headerVersion() int16 {
func (a *AddOffsetsToTxnResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}

func (r *AddOffsetsToTxnResponse) throttleTime() time.Duration {
return r.ThrottleTime
}
4 changes: 4 additions & 0 deletions add_partitions_to_txn_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ func (a *AddPartitionsToTxnResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}

func (r *AddPartitionsToTxnResponse) throttleTime() time.Duration {
return r.ThrottleTime
}

// PartitionError is a partition error type
type PartitionError struct {
Partition int32
Expand Down
4 changes: 4 additions & 0 deletions alter_client_quotas_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,7 @@ func (a *AlterClientQuotasResponse) headerVersion() int16 {
func (a *AlterClientQuotasResponse) requiredVersion() KafkaVersion {
return V2_6_0_0
}

func (r *AlterClientQuotasResponse) throttleTime() time.Duration {
return r.ThrottleTime
}
4 changes: 4 additions & 0 deletions alter_configs_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,7 @@ func (a *AlterConfigsResponse) headerVersion() int16 {
func (a *AlterConfigsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}

func (r *AlterConfigsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}
6 changes: 6 additions & 0 deletions alter_partition_reassignments_response.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package sarama

import "time"

type alterPartitionReassignmentsErrorBlock struct {
errorCode KError
errorMessage *string
Expand Down Expand Up @@ -155,3 +157,7 @@ func (r *AlterPartitionReassignmentsResponse) headerVersion() int16 {
func (r *AlterPartitionReassignmentsResponse) requiredVersion() KafkaVersion {
return V2_4_0_0
}

func (r *AlterPartitionReassignmentsResponse) throttleTime() time.Duration {
return time.Duration(r.ThrottleTimeMs) * time.Millisecond
}
4 changes: 4 additions & 0 deletions alter_user_scram_credentials_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,7 @@ func (r *AlterUserScramCredentialsResponse) headerVersion() int16 {
func (r *AlterUserScramCredentialsResponse) requiredVersion() KafkaVersion {
return V2_7_0_0
}

func (r *AlterUserScramCredentialsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}
6 changes: 6 additions & 0 deletions api_versions_response.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package sarama

import "time"

// ApiVersionsResponseKey contains the APIs supported by the broker.
type ApiVersionsResponseKey struct {
// Version defines the protocol version to use for encode and decode
Expand Down Expand Up @@ -154,3 +156,7 @@ func (r *ApiVersionsResponse) requiredVersion() KafkaVersion {
return V0_10_0_0
}
}

func (r *ApiVersionsResponse) throttleTime() time.Duration {
return time.Duration(r.ThrottleTimeMs) * time.Millisecond
}
41 changes: 28 additions & 13 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ func (b *Broker) AsyncProduce(request *ProduceRequest, cb ProduceCallback) error
}

// Well-formed response
b.updateThrottleMetric(res.ThrottleTime)
b.updateThrottleMetric(res)
cb(res, nil)
},
}
Expand All @@ -479,7 +479,6 @@ func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
} else {
response = new(ProduceResponse)
err = b.sendAndReceive(request, response)
b.updateThrottleMetric(response.ThrottleTime)
}

if err != nil {
Expand Down Expand Up @@ -944,7 +943,7 @@ func (b *Broker) write(buf []byte) (n int, err error) {
return b.conn.Write(buf)
}

// b.lock must be haled by caller
// b.lock must be held by caller
func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersion int16) (*responsePromise, error) {
var promise *responsePromise
if promiseResponse {
Expand Down Expand Up @@ -1042,7 +1041,14 @@ func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
return nil
}

return handleResponsePromise(req, res, promise, b.metricRegistry)
err = handleResponsePromise(req, res, promise, b.metricRegistry)
if err != nil {
return err
}
if res != nil {
b.updateThrottleMetric(res)
}
return nil
}

func handleResponsePromise(req protocolBody, res protocolBody, promise *responsePromise, metricRegistry metrics.Registry) error {
Expand Down Expand Up @@ -1635,15 +1641,24 @@ func (b *Broker) updateProtocolMetrics(rb protocolBody) {
}
}

func (b *Broker) updateThrottleMetric(throttleTime time.Duration) {
if throttleTime != time.Duration(0) {
DebugLogger.Printf(
"producer/broker/%d ProduceResponse throttled %v\n",
b.ID(), throttleTime)
if b.brokerThrottleTime != nil {
throttleTimeInMs := int64(throttleTime / time.Millisecond)
b.brokerThrottleTime.Update(throttleTimeInMs)
}
type throttleSupport interface {
throttleTime() time.Duration
}

func (b *Broker) updateThrottleMetric(resp protocolBody) {
throttledResponse, ok := resp.(throttleSupport)
if !ok {
return
}
throttleTime := throttledResponse.throttleTime()
if throttleTime == time.Duration(0) {
return
}
DebugLogger.Printf(
"broker/%d %T throttled %v\n", b.ID(), resp, throttleTime)
if b.brokerThrottleTime != nil {
throttleTimeInMs := int64(throttleTime / time.Millisecond)
b.brokerThrottleTime.Update(throttleTimeInMs)
}
}

Expand Down
4 changes: 4 additions & 0 deletions create_partitions_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ func (r *CreatePartitionsResponse) requiredVersion() KafkaVersion {
return V1_0_0_0
}

func (r *CreatePartitionsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}

type TopicPartitionError struct {
Err KError
ErrMsg *string
Expand Down
4 changes: 4 additions & 0 deletions create_topics_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ func (c *CreateTopicsResponse) requiredVersion() KafkaVersion {
}
}

func (r *CreateTopicsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}

type TopicError struct {
Err KError
ErrMsg *string
Expand Down
4 changes: 4 additions & 0 deletions delete_groups_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,7 @@ func (r *DeleteGroupsResponse) headerVersion() int16 {
func (r *DeleteGroupsResponse) requiredVersion() KafkaVersion {
return V1_1_0_0
}

func (r *DeleteGroupsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}
4 changes: 4 additions & 0 deletions delete_offsets_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,7 @@ func (r *DeleteOffsetsResponse) headerVersion() int16 {
func (r *DeleteOffsetsResponse) requiredVersion() KafkaVersion {
return V2_4_0_0
}

func (r *DeleteOffsetsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}
4 changes: 4 additions & 0 deletions delete_records_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ func (d *DeleteRecordsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}

func (r *DeleteRecordsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}

type DeleteRecordsResponseTopic struct {
Partitions map[int32]*DeleteRecordsResponsePartition
}
Expand Down
4 changes: 4 additions & 0 deletions delete_topics_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,7 @@ func (d *DeleteTopicsResponse) requiredVersion() KafkaVersion {
return V0_10_1_0
}
}

func (r *DeleteTopicsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}
4 changes: 4 additions & 0 deletions describe_client_quotas_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,3 +233,7 @@ func (d *DescribeClientQuotasResponse) headerVersion() int16 {
func (d *DescribeClientQuotasResponse) requiredVersion() KafkaVersion {
return V2_6_0_0
}

func (r *DescribeClientQuotasResponse) throttleTime() time.Duration {
return r.ThrottleTime
}
4 changes: 4 additions & 0 deletions describe_configs_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ func (r *DescribeConfigsResponse) requiredVersion() KafkaVersion {
}
}

func (r *DescribeConfigsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}

func (r *ResourceResponse) encode(pe packetEncoder, version int16) (err error) {
pe.putInt16(r.ErrorCode)

Expand Down
6 changes: 6 additions & 0 deletions describe_groups_response.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package sarama

import "time"

type DescribeGroupsResponse struct {
// Version defines the protocol version to use for encode and decode
Version int16
Expand Down Expand Up @@ -77,6 +79,10 @@ func (r *DescribeGroupsResponse) requiredVersion() KafkaVersion {
return V0_9_0_0
}

func (r *DescribeGroupsResponse) throttleTime() time.Duration {
return time.Duration(r.ThrottleTimeMs) * time.Millisecond
}

// GroupDescription contains each described group.
type GroupDescription struct {
// Version defines the protocol version to use for encode and decode
Expand Down
4 changes: 4 additions & 0 deletions describe_log_dirs_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ func (r *DescribeLogDirsResponse) requiredVersion() KafkaVersion {
return V1_0_0_0
}

func (r *DescribeLogDirsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}

type DescribeLogDirsResponseDirMetadata struct {
ErrorCode KError

Expand Down
4 changes: 4 additions & 0 deletions describe_user_scram_credentials_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,7 @@ func (r *DescribeUserScramCredentialsResponse) headerVersion() int16 {
func (r *DescribeUserScramCredentialsResponse) requiredVersion() KafkaVersion {
return V2_7_0_0
}

func (r *DescribeUserScramCredentialsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}
4 changes: 4 additions & 0 deletions end_txn_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,7 @@ func (r *EndTxnResponse) headerVersion() int16 {
func (e *EndTxnResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}

func (r *EndTxnResponse) throttleTime() time.Duration {
return r.ThrottleTime
}
4 changes: 4 additions & 0 deletions fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,10 @@ func (r *FetchResponse) requiredVersion() KafkaVersion {
}
}

func (r *FetchResponse) throttleTime() time.Duration {
return r.ThrottleTime
}

func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {
if r.Blocks == nil {
return nil
Expand Down
4 changes: 4 additions & 0 deletions find_coordinator_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,7 @@ func (f *FindCoordinatorResponse) requiredVersion() KafkaVersion {
return V0_8_2_0
}
}

func (r *FindCoordinatorResponse) throttleTime() time.Duration {
return r.ThrottleTime
}
6 changes: 6 additions & 0 deletions heartbeat_response.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package sarama

import "time"

type HeartbeatResponse struct {
Version int16
ThrottleTime int32
Expand Down Expand Up @@ -50,3 +52,7 @@ func (r *HeartbeatResponse) requiredVersion() KafkaVersion {
}
return V0_9_0_0
}

func (r *HeartbeatResponse) throttleTime() time.Duration {
return time.Duration(r.ThrottleTime) * time.Millisecond
}
4 changes: 4 additions & 0 deletions incremental_alter_configs_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,7 @@ func (a *IncrementalAlterConfigsResponse) headerVersion() int16 {
func (a *IncrementalAlterConfigsResponse) requiredVersion() KafkaVersion {
return V2_3_0_0
}

func (r *IncrementalAlterConfigsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}
4 changes: 4 additions & 0 deletions init_producer_id_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,7 @@ func (i *InitProducerIDResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
}

func (r *InitProducerIDResponse) throttleTime() time.Duration {
return r.ThrottleTime
}
6 changes: 6 additions & 0 deletions join_group_response.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package sarama

import "time"

type JoinGroupResponse struct {
Version int16
ThrottleTime int32
Expand Down Expand Up @@ -157,3 +159,7 @@ func (r *JoinGroupResponse) requiredVersion() KafkaVersion {
return V0_9_0_0
}
}

func (r *JoinGroupResponse) throttleTime() time.Duration {
return time.Duration(r.ThrottleTime) * time.Millisecond
}
6 changes: 6 additions & 0 deletions leave_group_response.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package sarama

import "time"

type MemberResponse struct {
MemberId string
GroupInstanceId *string
Expand Down Expand Up @@ -90,3 +92,7 @@ func (r *LeaveGroupResponse) requiredVersion() KafkaVersion {
}
return V0_9_0_0
}

func (r *LeaveGroupResponse) throttleTime() time.Duration {
return time.Duration(r.ThrottleTime) * time.Millisecond
}
Loading

0 comments on commit f07b129

Please sign in to comment.