diff --git a/admin.go b/admin.go index 03b5438be..29eeca1c6 100644 --- a/admin.go +++ b/admin.go @@ -273,13 +273,19 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO } func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) { - controller, err := ca.Controller() - if err != nil { - return nil, err - } - - request := NewMetadataRequest(ca.conf.Version, topics) - response, err := controller.GetMetadata(request) + var response *MetadataResponse + err = ca.retryOnError(isErrNoController, func() error { + controller, err := ca.Controller() + if err != nil { + return err + } + request := NewMetadataRequest(ca.conf.Version, topics) + response, err = controller.GetMetadata(request) + if isErrNoController(err) { + _, _ = ca.refreshController() + } + return err + }) if err != nil { return nil, err } @@ -287,13 +293,20 @@ func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetada } func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) { - controller, err := ca.Controller() - if err != nil { - return nil, int32(0), err - } + var response *MetadataResponse + err = ca.retryOnError(isErrNoController, func() error { + controller, err := ca.Controller() + if err != nil { + return err + } - request := NewMetadataRequest(ca.conf.Version, nil) - response, err := controller.GetMetadata(request) + request := NewMetadataRequest(ca.conf.Version, nil) + response, err = controller.GetMetadata(request) + if isErrNoController(err) { + _, _ = ca.refreshController() + } + return err + }) if err != nil { return nil, int32(0), err } @@ -543,13 +556,20 @@ func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []in request.AddBlock(topic, partitions) - b, err := ca.Controller() - if err != nil { - return nil, err - } - _ = b.Open(ca.client.Config()) + var rsp *ListPartitionReassignmentsResponse + err = ca.retryOnError(isErrNoController, func() error { + b, err := ca.Controller() + if err != nil { + return err + } + _ = b.Open(ca.client.Config()) - rsp, err := b.ListPartitionReassignments(request) + rsp, err = b.ListPartitionReassignments(request) + if isErrNoController(err) { + _, _ = ca.refreshController() + } + return err + }) if err == nil && rsp != nil { return rsp.TopicStatus, nil