Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(proto): implement and use MetadataRequest v7 #2388

Merged
merged 1 commit into from
Nov 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 3 additions & 20 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,17 +280,7 @@ func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetada
return nil, err
}

request := &MetadataRequest{
Topics: topics,
AllowAutoTopicCreation: false,
}

if ca.conf.Version.IsAtLeast(V1_0_0_0) {
request.Version = 5
} else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
request.Version = 4
}

request := NewMetadataRequest(ca.conf.Version, topics)
response, err := controller.GetMetadata(request)
if err != nil {
return nil, err
Expand All @@ -304,14 +294,7 @@ func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32
return nil, int32(0), err
}

request := &MetadataRequest{
Topics: []string{},
}

if ca.conf.Version.IsAtLeast(V0_10_0_0) {
request.Version = 1
}

request := NewMetadataRequest(ca.conf.Version, nil)
response, err := controller.GetMetadata(request)
if err != nil {
return nil, int32(0), err
Expand Down Expand Up @@ -352,7 +335,7 @@ func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
}
_ = b.Open(ca.client.Config())

metadataReq := &MetadataRequest{}
metadataReq := NewMetadataRequest(ca.conf.Version, nil)
metadataResp, err := b.GetMetadata(metadataReq)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1711,7 +1711,7 @@ func TestTxnProduceBumpEpoch(t *testing.T) {
config.ApiVersionsRequest = false

metadataLeader := new(MetadataResponse)
metadataLeader.Version = 5
metadataLeader.Version = 7
metadataLeader.ControllerID = broker.brokerID
metadataLeader.AddBroker(broker.Addr(), broker.BrokerID())
metadataLeader.AddTopic("test-topic", ErrNoError)
Expand Down
9 changes: 2 additions & 7 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -989,13 +989,8 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,
DebugLogger.Printf("client/metadata fetching metadata for all topics from broker %s\n", broker.addr)
}

req := &MetadataRequest{Topics: topics, AllowAutoTopicCreation: allowAutoTopicCreation}
if client.conf.Version.IsAtLeast(V1_0_0_0) {
req.Version = 5
} else if client.conf.Version.IsAtLeast(V0_10_0_0) {
req.Version = 1
}

req := NewMetadataRequest(client.conf.Version, topics)
req.AllowAutoTopicCreation = allowAutoTopicCreation
t := atomic.LoadInt64(&client.updateMetaDataMs)
if !atomic.CompareAndSwapInt64(&client.updateMetaDataMs, t, time.Now().UnixNano()/int64(time.Millisecond)) {
return nil
Expand Down
43 changes: 33 additions & 10 deletions metadata_request.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,30 @@
package sarama

type MetadataRequest struct {
Version int16
Topics []string
// Version defines the protocol version to use for encode and decode
Version int16
// Topics contains the topics to fetch metadata for.
Topics []string
// AllowAutoTopicCreation contains a If this is true, the broker may auto-create topics that we requested which do not already exist, if it is configured to do so.
AllowAutoTopicCreation bool
}

func (r *MetadataRequest) encode(pe packetEncoder) error {
if r.Version < 0 || r.Version > 5 {
func NewMetadataRequest(version KafkaVersion, topics []string) *MetadataRequest {
m := &MetadataRequest{Topics: topics}
if version.IsAtLeast(V2_1_0_0) {
m.Version = 7
} else if version.IsAtLeast(V2_0_0_0) {
m.Version = 6
} else if version.IsAtLeast(V1_0_0_0) {
m.Version = 5
} else if version.IsAtLeast(V0_10_0_0) {
m.Version = 1
}
return m
}

func (r *MetadataRequest) encode(pe packetEncoder) (err error) {
if r.Version < 0 || r.Version > 12 {
return PacketEncodingError{"invalid or unsupported MetadataRequest version field"}
}
if r.Version == 0 || len(r.Topics) > 0 {
Expand All @@ -25,13 +42,15 @@ func (r *MetadataRequest) encode(pe packetEncoder) error {
} else {
pe.putInt32(-1)
}
if r.Version > 3 {

if r.Version >= 4 {
pe.putBool(r.AllowAutoTopicCreation)
}

return nil
}

func (r *MetadataRequest) decode(pd packetDecoder, version int16) error {
func (r *MetadataRequest) decode(pd packetDecoder, version int16) (err error) {
r.Version = version
size, err := pd.getInt32()
if err != nil {
Expand All @@ -47,13 +66,13 @@ func (r *MetadataRequest) decode(pd packetDecoder, version int16) error {
r.Topics[i] = topic
}
}
if r.Version > 3 {
autoCreation, err := pd.getBool()
if err != nil {

if r.Version >= 4 {
if r.AllowAutoTopicCreation, err = pd.getBool(); err != nil {
return err
}
r.AllowAutoTopicCreation = autoCreation
}

return nil
}

Expand All @@ -79,6 +98,10 @@ func (r *MetadataRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
case 5:
return V1_0_0_0
case 6:
return V2_0_0_0
case 7:
return V2_1_0_0
default:
return MinVersion
}
Expand Down
Loading