From c7ce32f533701c492ba8c19b02edab263eb2f79a Mon Sep 17 00:00:00 2001 From: Mark Hindess Date: Sun, 23 Jul 2023 15:30:02 +0100 Subject: [PATCH 1/3] fix: add retry logic to DescribeTopics Signed-off-by: Mark Hindess --- admin.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/admin.go b/admin.go index 03b5438be..c2a04ab8a 100644 --- a/admin.go +++ b/admin.go @@ -273,13 +273,19 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO } func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) { - controller, err := ca.Controller() - if err != nil { - return nil, err - } - - request := NewMetadataRequest(ca.conf.Version, topics) - response, err := controller.GetMetadata(request) + var response *MetadataResponse + err = ca.retryOnError(isErrNoController, func() error { + controller, err := ca.Controller() + if err != nil { + return err + } + request := NewMetadataRequest(ca.conf.Version, topics) + response, err = controller.GetMetadata(request) + if isErrNoController(err) { + _, _ = ca.refreshController() + } + return err + }) if err != nil { return nil, err } From 66ef5a94989f39f402038519eba144346928134e Mon Sep 17 00:00:00 2001 From: Mark Hindess Date: Sun, 23 Jul 2023 15:30:23 +0100 Subject: [PATCH 2/3] fix: add retry logic to DescribeCluster Signed-off-by: Mark Hindess --- admin.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/admin.go b/admin.go index c2a04ab8a..ee4cd4ca6 100644 --- a/admin.go +++ b/admin.go @@ -293,13 +293,20 @@ func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetada } func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) { - controller, err := ca.Controller() - if err != nil { - return nil, int32(0), err - } + var response *MetadataResponse + err = ca.retryOnError(isErrNoController, func() error { + controller, err := ca.Controller() + if err != nil { + return err + } - request := NewMetadataRequest(ca.conf.Version, nil) - response, err := controller.GetMetadata(request) + request := NewMetadataRequest(ca.conf.Version, nil) + response, err = controller.GetMetadata(request) + if isErrNoController(err) { + _, _ = ca.refreshController() + } + return err + }) if err != nil { return nil, int32(0), err } From aad8cf3bd2527d03e716d5319b4d42eff74351c3 Mon Sep 17 00:00:00 2001 From: Mark Hindess Date: Sun, 23 Jul 2023 15:25:09 +0100 Subject: [PATCH 3/3] fix: add retry logic to ListPartitionReassignments Signed-off-by: Mark Hindess --- admin.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/admin.go b/admin.go index ee4cd4ca6..29eeca1c6 100644 --- a/admin.go +++ b/admin.go @@ -556,13 +556,20 @@ func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []in request.AddBlock(topic, partitions) - b, err := ca.Controller() - if err != nil { - return nil, err - } - _ = b.Open(ca.client.Config()) + var rsp *ListPartitionReassignmentsResponse + err = ca.retryOnError(isErrNoController, func() error { + b, err := ca.Controller() + if err != nil { + return err + } + _ = b.Open(ca.client.Config()) - rsp, err := b.ListPartitionReassignments(request) + rsp, err = b.ListPartitionReassignments(request) + if isErrNoController(err) { + _, _ = ca.refreshController() + } + return err + }) if err == nil && rsp != nil { return rsp.TopicStatus, nil