diff --git a/acl_create_response.go b/acl_create_response.go index 21d6c340c..ecfe119ce 100644 --- a/acl_create_response.go +++ b/acl_create_response.go @@ -63,6 +63,10 @@ func (c *CreateAclsResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } +func (r *CreateAclsResponse) throttleTime() time.Duration { + return r.ThrottleTime +} + // AclCreationResponse is an acl creation response type type AclCreationResponse struct { Err KError diff --git a/acl_delete_response.go b/acl_delete_response.go index cd33749d5..6f9b49c4a 100644 --- a/acl_delete_response.go +++ b/acl_delete_response.go @@ -64,6 +64,10 @@ func (d *DeleteAclsResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } +func (r *DeleteAclsResponse) throttleTime() time.Duration { + return r.ThrottleTime +} + // FilterResponse is a filter response type type FilterResponse struct { Err KError diff --git a/acl_describe_response.go b/acl_describe_response.go index 3255fd485..0b43b5d8d 100644 --- a/acl_describe_response.go +++ b/acl_describe_response.go @@ -89,3 +89,7 @@ func (d *DescribeAclsResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } } + +func (r *DescribeAclsResponse) throttleTime() time.Duration { + return r.ThrottleTime +} diff --git a/add_offsets_to_txn_response.go b/add_offsets_to_txn_response.go index bb61973d1..35d6644c5 100644 --- a/add_offsets_to_txn_response.go +++ b/add_offsets_to_txn_response.go @@ -47,3 +47,7 @@ func (a *AddOffsetsToTxnResponse) headerVersion() int16 { func (a *AddOffsetsToTxnResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } + +func (r *AddOffsetsToTxnResponse) throttleTime() time.Duration { + return r.ThrottleTime +} diff --git a/add_partitions_to_txn_response.go b/add_partitions_to_txn_response.go index 098956507..4adfaf883 100644 --- a/add_partitions_to_txn_response.go +++ b/add_partitions_to_txn_response.go @@ -87,6 +87,10 @@ func (a *AddPartitionsToTxnResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } +func (r *AddPartitionsToTxnResponse) throttleTime() time.Duration { + return r.ThrottleTime +} + // PartitionError is a partition error type type PartitionError struct { Partition int32 diff --git a/alter_client_quotas_response.go b/alter_client_quotas_response.go index ccd27d5f5..4d68e69ed 100644 --- a/alter_client_quotas_response.go +++ b/alter_client_quotas_response.go @@ -143,3 +143,7 @@ func (a *AlterClientQuotasResponse) headerVersion() int16 { func (a *AlterClientQuotasResponse) requiredVersion() KafkaVersion { return V2_6_0_0 } + +func (r *AlterClientQuotasResponse) throttleTime() time.Duration { + return r.ThrottleTime +} diff --git a/alter_configs_response.go b/alter_configs_response.go index 84cd86c72..15749a8f8 100644 --- a/alter_configs_response.go +++ b/alter_configs_response.go @@ -114,3 +114,7 @@ func (a *AlterConfigsResponse) headerVersion() int16 { func (a *AlterConfigsResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } + +func (r *AlterConfigsResponse) throttleTime() time.Duration { + return r.ThrottleTime +} diff --git a/alter_partition_reassignments_response.go b/alter_partition_reassignments_response.go index b3f9a15fe..0765486ee 100644 --- a/alter_partition_reassignments_response.go +++ b/alter_partition_reassignments_response.go @@ -1,5 +1,7 @@ package sarama +import "time" + type alterPartitionReassignmentsErrorBlock struct { errorCode KError errorMessage *string @@ -155,3 +157,7 @@ func (r *AlterPartitionReassignmentsResponse) headerVersion() int16 { func (r *AlterPartitionReassignmentsResponse) requiredVersion() KafkaVersion { return V2_4_0_0 } + +func (r *AlterPartitionReassignmentsResponse) throttleTime() time.Duration { + return time.Duration(r.ThrottleTimeMs) * time.Millisecond +} diff --git a/alter_user_scram_credentials_response.go b/alter_user_scram_credentials_response.go index 31e167b5e..018483c9e 100644 --- a/alter_user_scram_credentials_response.go +++ b/alter_user_scram_credentials_response.go @@ -92,3 +92,7 @@ func (r *AlterUserScramCredentialsResponse) headerVersion() int16 { func (r *AlterUserScramCredentialsResponse) requiredVersion() KafkaVersion { return V2_7_0_0 } + +func (r *AlterUserScramCredentialsResponse) throttleTime() time.Duration { + return r.ThrottleTime +} diff --git a/api_versions_response.go b/api_versions_response.go index ade911c59..9643ee1fc 100644 --- a/api_versions_response.go +++ b/api_versions_response.go @@ -1,5 +1,7 @@ package sarama +import "time" + // ApiVersionsResponseKey contains the APIs supported by the broker. type ApiVersionsResponseKey struct { // Version defines the protocol version to use for encode and decode @@ -154,3 +156,7 @@ func (r *ApiVersionsResponse) requiredVersion() KafkaVersion { return V0_10_0_0 } } + +func (r *ApiVersionsResponse) throttleTime() time.Duration { + return time.Duration(r.ThrottleTimeMs) * time.Millisecond +} diff --git a/broker.go b/broker.go index 7ed987fe3..13c50483c 100644 --- a/broker.go +++ b/broker.go @@ -456,7 +456,7 @@ func (b *Broker) AsyncProduce(request *ProduceRequest, cb ProduceCallback) error } // Well-formed response - b.updateThrottleMetric(res.ThrottleTime) + b.updateThrottleMetric(res) cb(res, nil) }, } @@ -479,7 +479,6 @@ func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) { } else { response = new(ProduceResponse) err = b.sendAndReceive(request, response) - b.updateThrottleMetric(response.ThrottleTime) } if err != nil { @@ -944,7 +943,7 @@ func (b *Broker) write(buf []byte) (n int, err error) { return b.conn.Write(buf) } -// b.lock must be haled by caller +// b.lock must be held by caller func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersion int16) (*responsePromise, error) { var promise *responsePromise if promiseResponse { @@ -1042,7 +1041,14 @@ func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error { return nil } - return handleResponsePromise(req, res, promise, b.metricRegistry) + err = handleResponsePromise(req, res, promise, b.metricRegistry) + if err != nil { + return err + } + if res != nil { + b.updateThrottleMetric(res) + } + return nil } func handleResponsePromise(req protocolBody, res protocolBody, promise *responsePromise, metricRegistry metrics.Registry) error { @@ -1635,15 +1641,24 @@ func (b *Broker) updateProtocolMetrics(rb protocolBody) { } } -func (b *Broker) updateThrottleMetric(throttleTime time.Duration) { - if throttleTime != time.Duration(0) { - DebugLogger.Printf( - "producer/broker/%d ProduceResponse throttled %v\n", - b.ID(), throttleTime) - if b.brokerThrottleTime != nil { - throttleTimeInMs := int64(throttleTime / time.Millisecond) - b.brokerThrottleTime.Update(throttleTimeInMs) - } +type throttleSupport interface { + throttleTime() time.Duration +} + +func (b *Broker) updateThrottleMetric(resp protocolBody) { + throttledResponse, ok := resp.(throttleSupport) + if !ok { + return + } + throttleTime := throttledResponse.throttleTime() + if throttleTime == time.Duration(0) { + return + } + DebugLogger.Printf( + "broker/%d %T throttled %v\n", b.ID(), resp, throttleTime) + if b.brokerThrottleTime != nil { + throttleTimeInMs := int64(throttleTime / time.Millisecond) + b.brokerThrottleTime.Update(throttleTimeInMs) } } diff --git a/create_partitions_response.go b/create_partitions_response.go index 235787f13..2ac2d11a9 100644 --- a/create_partitions_response.go +++ b/create_partitions_response.go @@ -71,6 +71,10 @@ func (r *CreatePartitionsResponse) requiredVersion() KafkaVersion { return V1_0_0_0 } +func (r *CreatePartitionsResponse) throttleTime() time.Duration { + return r.ThrottleTime +} + type TopicPartitionError struct { Err KError ErrMsg *string diff --git a/create_topics_response.go b/create_topics_response.go index 6b940bff0..f3961b7b8 100644 --- a/create_topics_response.go +++ b/create_topics_response.go @@ -85,6 +85,10 @@ func (c *CreateTopicsResponse) requiredVersion() KafkaVersion { } } +func (r *CreateTopicsResponse) throttleTime() time.Duration { + return r.ThrottleTime +} + type TopicError struct { Err KError ErrMsg *string diff --git a/delete_groups_response.go b/delete_groups_response.go index 5e7b1ed36..13d210fca 100644 --- a/delete_groups_response.go +++ b/delete_groups_response.go @@ -72,3 +72,7 @@ func (r *DeleteGroupsResponse) headerVersion() int16 { func (r *DeleteGroupsResponse) requiredVersion() KafkaVersion { return V1_1_0_0 } + +func (r *DeleteGroupsResponse) throttleTime() time.Duration { + return r.ThrottleTime +} diff --git a/delete_offsets_response.go b/delete_offsets_response.go index d59ae0f8c..4712423c2 100644 --- a/delete_offsets_response.go +++ b/delete_offsets_response.go @@ -110,3 +110,7 @@ func (r *DeleteOffsetsResponse) headerVersion() int16 { func (r *DeleteOffsetsResponse) requiredVersion() KafkaVersion { return V2_4_0_0 } + +func (r *DeleteOffsetsResponse) throttleTime() time.Duration { + return r.ThrottleTime +} diff --git a/delete_records_response.go b/delete_records_response.go index d530b4c7e..f501338ae 100644 --- a/delete_records_response.go +++ b/delete_records_response.go @@ -88,6 +88,10 @@ func (d *DeleteRecordsResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } +func (r *DeleteRecordsResponse) throttleTime() time.Duration { + return r.ThrottleTime +} + type DeleteRecordsResponseTopic struct { Partitions map[int32]*DeleteRecordsResponsePartition } diff --git a/delete_topics_response.go b/delete_topics_response.go index 733961a89..efafba5d4 100644 --- a/delete_topics_response.go +++ b/delete_topics_response.go @@ -80,3 +80,7 @@ func (d *DeleteTopicsResponse) requiredVersion() KafkaVersion { return V0_10_1_0 } } + +func (r *DeleteTopicsResponse) throttleTime() time.Duration { + return r.ThrottleTime +} diff --git a/describe_client_quotas_response.go b/describe_client_quotas_response.go index 555da0c48..2b1336dc7 100644 --- a/describe_client_quotas_response.go +++ b/describe_client_quotas_response.go @@ -233,3 +233,7 @@ func (d *DescribeClientQuotasResponse) headerVersion() int16 { func (d *DescribeClientQuotasResponse) requiredVersion() KafkaVersion { return V2_6_0_0 } + +func (r *DescribeClientQuotasResponse) throttleTime() time.Duration { + return r.ThrottleTime +} diff --git a/describe_configs_response.go b/describe_configs_response.go index 4968f4854..05036fb09 100644 --- a/describe_configs_response.go +++ b/describe_configs_response.go @@ -127,6 +127,10 @@ func (r *DescribeConfigsResponse) requiredVersion() KafkaVersion { } } +func (r *DescribeConfigsResponse) throttleTime() time.Duration { + return r.ThrottleTime +} + func (r *ResourceResponse) encode(pe packetEncoder, version int16) (err error) { pe.putInt16(r.ErrorCode) diff --git a/describe_groups_response.go b/describe_groups_response.go index 12bf93e15..36ccb702f 100644 --- a/describe_groups_response.go +++ b/describe_groups_response.go @@ -1,5 +1,7 @@ package sarama +import "time" + type DescribeGroupsResponse struct { // Version defines the protocol version to use for encode and decode Version int16 @@ -77,6 +79,10 @@ func (r *DescribeGroupsResponse) requiredVersion() KafkaVersion { return V0_9_0_0 } +func (r *DescribeGroupsResponse) throttleTime() time.Duration { + return time.Duration(r.ThrottleTimeMs) * time.Millisecond +} + // GroupDescription contains each described group. type GroupDescription struct { // Version defines the protocol version to use for encode and decode diff --git a/describe_log_dirs_response.go b/describe_log_dirs_response.go index 411da38ad..47ff20ac9 100644 --- a/describe_log_dirs_response.go +++ b/describe_log_dirs_response.go @@ -69,6 +69,10 @@ func (r *DescribeLogDirsResponse) requiredVersion() KafkaVersion { return V1_0_0_0 } +func (r *DescribeLogDirsResponse) throttleTime() time.Duration { + return r.ThrottleTime +} + type DescribeLogDirsResponseDirMetadata struct { ErrorCode KError diff --git a/describe_user_scram_credentials_response.go b/describe_user_scram_credentials_response.go index 2656c2faa..a209208fe 100644 --- a/describe_user_scram_credentials_response.go +++ b/describe_user_scram_credentials_response.go @@ -166,3 +166,7 @@ func (r *DescribeUserScramCredentialsResponse) headerVersion() int16 { func (r *DescribeUserScramCredentialsResponse) requiredVersion() KafkaVersion { return V2_7_0_0 } + +func (r *DescribeUserScramCredentialsResponse) throttleTime() time.Duration { + return r.ThrottleTime +} diff --git a/end_txn_response.go b/end_txn_response.go index dd2a04504..a3a28011b 100644 --- a/end_txn_response.go +++ b/end_txn_response.go @@ -46,3 +46,7 @@ func (r *EndTxnResponse) headerVersion() int16 { func (e *EndTxnResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } + +func (r *EndTxnResponse) throttleTime() time.Duration { + return r.ThrottleTime +} diff --git a/fetch_response.go b/fetch_response.go index 3d449c85e..e1700b7b8 100644 --- a/fetch_response.go +++ b/fetch_response.go @@ -413,6 +413,10 @@ func (r *FetchResponse) requiredVersion() KafkaVersion { } } +func (r *FetchResponse) throttleTime() time.Duration { + return r.ThrottleTime +} + func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock { if r.Blocks == nil { return nil diff --git a/find_coordinator_response.go b/find_coordinator_response.go index 83a648ad4..68cbcbebe 100644 --- a/find_coordinator_response.go +++ b/find_coordinator_response.go @@ -94,3 +94,7 @@ func (f *FindCoordinatorResponse) requiredVersion() KafkaVersion { return V0_8_2_0 } } + +func (r *FindCoordinatorResponse) throttleTime() time.Duration { + return r.ThrottleTime +} diff --git a/heartbeat_response.go b/heartbeat_response.go index 95ef97f47..d0a6a2eff 100644 --- a/heartbeat_response.go +++ b/heartbeat_response.go @@ -1,5 +1,7 @@ package sarama +import "time" + type HeartbeatResponse struct { Version int16 ThrottleTime int32 @@ -50,3 +52,7 @@ func (r *HeartbeatResponse) requiredVersion() KafkaVersion { } return V0_9_0_0 } + +func (r *HeartbeatResponse) throttleTime() time.Duration { + return time.Duration(r.ThrottleTime) * time.Millisecond +} diff --git a/incremental_alter_configs_response.go b/incremental_alter_configs_response.go index 3e8c4500c..2320ed287 100644 --- a/incremental_alter_configs_response.go +++ b/incremental_alter_configs_response.go @@ -64,3 +64,7 @@ func (a *IncrementalAlterConfigsResponse) headerVersion() int16 { func (a *IncrementalAlterConfigsResponse) requiredVersion() KafkaVersion { return V2_3_0_0 } + +func (r *IncrementalAlterConfigsResponse) throttleTime() time.Duration { + return r.ThrottleTime +} diff --git a/init_producer_id_response.go b/init_producer_id_response.go index 006070189..e22580922 100644 --- a/init_producer_id_response.go +++ b/init_producer_id_response.go @@ -83,3 +83,7 @@ func (i *InitProducerIDResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } } + +func (r *InitProducerIDResponse) throttleTime() time.Duration { + return r.ThrottleTime +} diff --git a/join_group_response.go b/join_group_response.go index d8aa1f002..e1fd94c04 100644 --- a/join_group_response.go +++ b/join_group_response.go @@ -1,5 +1,7 @@ package sarama +import "time" + type JoinGroupResponse struct { Version int16 ThrottleTime int32 @@ -157,3 +159,7 @@ func (r *JoinGroupResponse) requiredVersion() KafkaVersion { return V0_9_0_0 } } + +func (r *JoinGroupResponse) throttleTime() time.Duration { + return time.Duration(r.ThrottleTime) * time.Millisecond +} diff --git a/leave_group_response.go b/leave_group_response.go index 18ed357e8..ccc7e5687 100644 --- a/leave_group_response.go +++ b/leave_group_response.go @@ -1,5 +1,7 @@ package sarama +import "time" + type MemberResponse struct { MemberId string GroupInstanceId *string @@ -90,3 +92,7 @@ func (r *LeaveGroupResponse) requiredVersion() KafkaVersion { } return V0_9_0_0 } + +func (r *LeaveGroupResponse) throttleTime() time.Duration { + return time.Duration(r.ThrottleTime) * time.Millisecond +} diff --git a/list_partition_reassignments_response.go b/list_partition_reassignments_response.go index 4baa6a08e..568791fea 100644 --- a/list_partition_reassignments_response.go +++ b/list_partition_reassignments_response.go @@ -1,5 +1,7 @@ package sarama +import "time" + type PartitionReplicaReassignmentsStatus struct { Replicas []int32 AddingReplicas []int32 @@ -167,3 +169,7 @@ func (r *ListPartitionReassignmentsResponse) headerVersion() int16 { func (r *ListPartitionReassignmentsResponse) requiredVersion() KafkaVersion { return V2_4_0_0 } + +func (r *ListPartitionReassignmentsResponse) throttleTime() time.Duration { + return time.Duration(r.ThrottleTimeMs) * time.Millisecond +} diff --git a/metadata_response.go b/metadata_response.go index 10a56877d..a9b979150 100644 --- a/metadata_response.go +++ b/metadata_response.go @@ -1,5 +1,7 @@ package sarama +import "time" + // PartitionMetadata contains each partition in the topic. type PartitionMetadata struct { // Version defines the protocol version to use for encode and decode @@ -292,6 +294,10 @@ func (r *MetadataResponse) requiredVersion() KafkaVersion { } } +func (r *MetadataResponse) throttleTime() time.Duration { + return time.Duration(r.ThrottleTimeMs) * time.Millisecond +} + // testing API func (r *MetadataResponse) AddBroker(addr string, id int32) { diff --git a/offset_commit_response.go b/offset_commit_response.go index 4bed269aa..80828c853 100644 --- a/offset_commit_response.go +++ b/offset_commit_response.go @@ -1,5 +1,7 @@ package sarama +import "time" + type OffsetCommitResponse struct { Version int16 ThrottleTimeMs int32 @@ -114,3 +116,7 @@ func (r *OffsetCommitResponse) requiredVersion() KafkaVersion { return MinVersion } } + +func (r *OffsetCommitResponse) throttleTime() time.Duration { + return time.Duration(r.ThrottleTimeMs) * time.Millisecond +} diff --git a/offset_fetch_response.go b/offset_fetch_response.go index 19449220f..e2e34d7ff 100644 --- a/offset_fetch_response.go +++ b/offset_fetch_response.go @@ -1,5 +1,7 @@ package sarama +import "time" + type OffsetFetchResponseBlock struct { Offset int64 LeaderEpoch int32 @@ -255,6 +257,10 @@ func (r *OffsetFetchResponse) requiredVersion() KafkaVersion { } } +func (r *OffsetFetchResponse) throttleTime() time.Duration { + return time.Duration(r.ThrottleTimeMs) * time.Millisecond +} + func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock { if r.Blocks == nil { return nil diff --git a/offset_response.go b/offset_response.go index ffe84664c..59d158d9d 100644 --- a/offset_response.go +++ b/offset_response.go @@ -1,5 +1,7 @@ package sarama +import "time" + type OffsetResponseBlock struct { Err KError Offsets []int64 // Version 0 @@ -176,6 +178,10 @@ func (r *OffsetResponse) requiredVersion() KafkaVersion { } } +func (r *OffsetResponse) throttleTime() time.Duration { + return time.Duration(r.ThrottleTimeMs) * time.Millisecond +} + // testing API func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64) { diff --git a/produce_response.go b/produce_response.go index edf978790..d7cda5d6b 100644 --- a/produce_response.go +++ b/produce_response.go @@ -179,6 +179,10 @@ func (r *ProduceResponse) requiredVersion() KafkaVersion { return MinVersion } +func (r *ProduceResponse) throttleTime() time.Duration { + return r.ThrottleTime +} + func (r *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock { if r.Blocks == nil { return nil diff --git a/sync_group_response.go b/sync_group_response.go index 41b63b3d0..5c9cc1a6a 100644 --- a/sync_group_response.go +++ b/sync_group_response.go @@ -1,9 +1,11 @@ package sarama +import "time" + type SyncGroupResponse struct { // Version defines the protocol version to use for encode and decode Version int16 - // ThrottleTimeMs contains the duration in milliseconds for which the + // ThrottleTime contains the duration in milliseconds for which the // request was throttled due to a quota violation, or zero if the request // did not violate any quota. ThrottleTime int32 @@ -64,3 +66,7 @@ func (r *SyncGroupResponse) requiredVersion() KafkaVersion { } return V0_9_0_0 } + +func (r *SyncGroupResponse) throttleTime() time.Duration { + return time.Duration(r.ThrottleTime) * time.Millisecond +} diff --git a/txn_offset_commit_response.go b/txn_offset_commit_response.go index 94d8029da..2767f8a68 100644 --- a/txn_offset_commit_response.go +++ b/txn_offset_commit_response.go @@ -85,3 +85,7 @@ func (a *TxnOffsetCommitResponse) headerVersion() int16 { func (a *TxnOffsetCommitResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } + +func (r *TxnOffsetCommitResponse) throttleTime() time.Duration { + return r.ThrottleTime +}