Skip to content

Commit

Permalink
Merge pull request #2520 from hindessm/mrh/admin-retry-logic
Browse files Browse the repository at this point in the history
Add some retry logic to more admin client functions
  • Loading branch information
dnwe authored Jul 25, 2023
2 parents 80899bf + aad8cf3 commit e07f521
Showing 1 changed file with 39 additions and 19 deletions.
58 changes: 39 additions & 19 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,27 +273,40 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO
}

func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
controller, err := ca.Controller()
if err != nil {
return nil, err
}

request := NewMetadataRequest(ca.conf.Version, topics)
response, err := controller.GetMetadata(request)
var response *MetadataResponse
err = ca.retryOnError(isErrNoController, func() error {
controller, err := ca.Controller()
if err != nil {
return err
}
request := NewMetadataRequest(ca.conf.Version, topics)
response, err = controller.GetMetadata(request)
if isErrNoController(err) {
_, _ = ca.refreshController()
}
return err
})
if err != nil {
return nil, err
}
return response.Topics, nil
}

func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) {
controller, err := ca.Controller()
if err != nil {
return nil, int32(0), err
}
var response *MetadataResponse
err = ca.retryOnError(isErrNoController, func() error {
controller, err := ca.Controller()
if err != nil {
return err
}

request := NewMetadataRequest(ca.conf.Version, nil)
response, err := controller.GetMetadata(request)
request := NewMetadataRequest(ca.conf.Version, nil)
response, err = controller.GetMetadata(request)
if isErrNoController(err) {
_, _ = ca.refreshController()
}
return err
})
if err != nil {
return nil, int32(0), err
}
Expand Down Expand Up @@ -543,13 +556,20 @@ func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []in

request.AddBlock(topic, partitions)

b, err := ca.Controller()
if err != nil {
return nil, err
}
_ = b.Open(ca.client.Config())
var rsp *ListPartitionReassignmentsResponse
err = ca.retryOnError(isErrNoController, func() error {
b, err := ca.Controller()
if err != nil {
return err
}
_ = b.Open(ca.client.Config())

rsp, err := b.ListPartitionReassignments(request)
rsp, err = b.ListPartitionReassignments(request)
if isErrNoController(err) {
_, _ = ca.refreshController()
}
return err
})

if err == nil && rsp != nil {
return rsp.TopicStatus, nil
Expand Down

0 comments on commit e07f521

Please sign in to comment.