Skip to content

Commit

Permalink
chore(proto): doc CreateTopics/JoinGroup fields
Browse files Browse the repository at this point in the history
Signed-off-by: Dominic Evans <dominic.evans@uk.ibm.com>
  • Loading branch information
dnwe committed Aug 29, 2023
1 parent 503ade3 commit a4eafb4
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 24 deletions.
21 changes: 17 additions & 4 deletions create_topics_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ import (
)

type CreateTopicsRequest struct {
// Version defines the protocol version to use for encode and decode
Version int16

// TopicDetails contains the topics to create.
TopicDetails map[string]*TopicDetail
Timeout time.Duration
// Timeout contains how long to wait before timing out the request.
Timeout time.Duration
// ValidateOnly if true, check that the topics can be created as specified,
// but don't create anything.
ValidateOnly bool
}

Expand Down Expand Up @@ -103,10 +107,19 @@ func (c *CreateTopicsRequest) requiredVersion() KafkaVersion {
}

type TopicDetail struct {
NumPartitions int32
// NumPartitions contains the number of partitions to create in the topic, or
// -1 if we are either specifying a manual partition assignment or using the
// default partitions.
NumPartitions int32
// ReplicationFactor contains the number of replicas to create for each
// partition in the topic, or -1 if we are either specifying a manual
// partition assignment or using the default replication factor.
ReplicationFactor int16
// ReplicaAssignment contains the manual partition assignment, or the empty
// array if we are using automatic assignment.
ReplicaAssignment map[int32][]int32
ConfigEntries map[string]*string
// ConfigEntries contains the custom topic configurations to set.
ConfigEntries map[string]*string
}

func (t *TopicDetail) encode(pe packetEncoder) error {
Expand Down
8 changes: 6 additions & 2 deletions create_topics_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ import (
)

type CreateTopicsResponse struct {
Version int16
// Version defines the protocol version to use for encode and decode
Version int16
// ThrottleTime contains the duration for which the request was throttled due
// to a quota violation, or zero if the request did not violate any quota.
ThrottleTime time.Duration
TopicErrors map[string]*TopicError
// TopicErrors contains a map of any errors for the topics we tried to create.
TopicErrors map[string]*TopicError
}

func (c *CreateTopicsResponse) encode(pe packetEncoder) error {
Expand Down
36 changes: 27 additions & 9 deletions join_group_request.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package sarama

type GroupProtocol struct {
Name string
// Name contains the protocol name.
Name string
// Metadata contains the protocol metadata.
Metadata []byte
}

Expand All @@ -25,14 +27,30 @@ func (p *GroupProtocol) encode(pe packetEncoder) (err error) {
}

type JoinGroupRequest struct {
Version int16
GroupId string
SessionTimeout int32
RebalanceTimeout int32
MemberId string
GroupInstanceId *string
ProtocolType string
GroupProtocols map[string][]byte // deprecated; use OrderedGroupProtocols
// Version defines the protocol version to use for encode and decode
Version int16
// GroupId contains the group identifier.
GroupId string
// SessionTimeout specifies that the coordinator should consider the consumer
// dead if it receives no heartbeat after this timeout in milliseconds.
SessionTimeout int32
// RebalanceTimeout contains the maximum time in milliseconds that the
// coordinator will wait for each member to rejoin when rebalancing the
// group.
RebalanceTimeout int32
// MemberId contains the member id assigned by the group coordinator.
MemberId string
// GroupInstanceId contains the unique identifier of the consumer instance
// provided by end user.
GroupInstanceId *string
// ProtocolType contains the unique name the for class of protocols
// implemented by the group we want to join.
ProtocolType string
// GroupProtocols contains the list of protocols that the member supports.
// deprecated; use OrderedGroupProtocols
GroupProtocols map[string][]byte
// OrderedGroupProtocols contains an ordered list of protocols that the member
// supports.
OrderedGroupProtocols []*GroupProtocol
}

Expand Down
31 changes: 22 additions & 9 deletions join_group_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,33 @@ package sarama
import "time"

type JoinGroupResponse struct {
Version int16
ThrottleTime int32
Err KError
GenerationId int32
// Version defines the protocol version to use for encode and decode
Version int16
// ThrottleTime contains the duration for which the request was throttled due
// to a quota violation, or zero if the request did not violate any quota.
ThrottleTime int32
// Err contains the error code, or 0 if there was no error.
Err KError
// GenerationId contains the generation ID of the group.
GenerationId int32
// GroupProtocol contains the group protocol selected by the coordinator.
GroupProtocol string
LeaderId string
MemberId string
Members []GroupMember
// LeaderId contains the leader of the group.
LeaderId string
// MemberId contains the member ID assigned by the group coordinator.
MemberId string
// Members contains the per-group-member information.
Members []GroupMember
}

type GroupMember struct {
MemberId string
// MemberId contains the group member ID.
MemberId string
// GroupInstanceId contains the unique identifier of the consumer instance
// provided by end user.
GroupInstanceId *string
Metadata []byte
// Metadata contains the group member metadata.
Metadata []byte
}

func (r *JoinGroupResponse) GetMembers() (map[string]ConsumerGroupMemberMetadata, error) {
Expand Down

0 comments on commit a4eafb4

Please sign in to comment.