Skip to content

Commit

Permalink
style: eliminate redundancies and remove misleading annotion
Browse files Browse the repository at this point in the history
1. extract the same logic in Replicas, InSyncReplicas, OfflineReplicas. Improve reusability, maintainability, and consistency
2. remove unnecessary local variable and misleading annotion. see #2923

Signed-off-by: Trino <sujun.trinoooo@gmail.com>
  • Loading branch information
Trinoooo authored and dnwe committed Aug 7, 2024
1 parent 97a9f1d commit be9539d
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 49 deletions.
89 changes: 44 additions & 45 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sarama
import (
"context"
"errors"
"fmt"
"math"
"math/rand"
"net"
Expand Down Expand Up @@ -413,57 +414,42 @@ func (client *client) WritablePartitions(topic string) ([]int32, error) {
return partitions, nil
}

func (client *client) Replicas(topic string, partitionID int32) ([]int32, error) {
if client.Closed() {
return nil, ErrClosedClient
}

metadata := client.cachedMetadata(topic, partitionID)

if metadata == nil {
err := client.RefreshMetadata(topic)
if err != nil {
return nil, err
}
metadata = client.cachedMetadata(topic, partitionID)
}
type replicasType int

func (rt replicasType) String() string {
var str string
switch rt {
case replicasTypeAll:
str = "replicas"
case replicasTypeIsr:
str = "isr"
case replicasTypeOffline:
str = "offline"
default:
str = "unknown"
}
return str
}

if metadata == nil {
return nil, ErrUnknownTopicOrPartition
}
const (
replicasTypeAll replicasType = 1 // corresponds to metadata.Replicas
replicasTypeIsr replicasType = 2 // corresponds to metadata.Isr
replicasTypeOffline replicasType = 3 // corresponds to metadata.Offline
)

if errors.Is(metadata.Err, ErrReplicaNotAvailable) {
return dupInt32Slice(metadata.Replicas), metadata.Err
}
return dupInt32Slice(metadata.Replicas), nil
func (client *client) Replicas(topic string, partitionID int32) ([]int32, error) {
return client.getReplicas(topic, partitionID, replicasTypeAll)
}

func (client *client) InSyncReplicas(topic string, partitionID int32) ([]int32, error) {
if client.Closed() {
return nil, ErrClosedClient
}

metadata := client.cachedMetadata(topic, partitionID)

if metadata == nil {
err := client.RefreshMetadata(topic)
if err != nil {
return nil, err
}
metadata = client.cachedMetadata(topic, partitionID)
}

if metadata == nil {
return nil, ErrUnknownTopicOrPartition
}

if errors.Is(metadata.Err, ErrReplicaNotAvailable) {
return dupInt32Slice(metadata.Isr), metadata.Err
}
return dupInt32Slice(metadata.Isr), nil
return client.getReplicas(topic, partitionID, replicasTypeIsr)
}

func (client *client) OfflineReplicas(topic string, partitionID int32) ([]int32, error) {
return client.getReplicas(topic, partitionID, replicasTypeOffline)
}

func (client *client) getReplicas(topic string, partitionID int32, rt replicasType) ([]int32, error) {
if client.Closed() {
return nil, ErrClosedClient
}
Expand All @@ -482,10 +468,23 @@ func (client *client) OfflineReplicas(topic string, partitionID int32) ([]int32,
return nil, ErrUnknownTopicOrPartition
}

var replicas []int32
switch rt {
case replicasTypeAll:
replicas = metadata.Replicas
case replicasTypeIsr:
replicas = metadata.Isr
case replicasTypeOffline:
replicas = metadata.OfflineReplicas
default:
// help found error during test
panic(fmt.Sprintf("unexpected replicas type: %v", rt))
}

if errors.Is(metadata.Err, ErrReplicaNotAvailable) {
return dupInt32Slice(metadata.OfflineReplicas), metadata.Err
return dupInt32Slice(replicas), metadata.Err
}
return dupInt32Slice(metadata.OfflineReplicas), nil
return dupInt32Slice(replicas), nil
}

func (client *client) Leader(topic string, partitionID int32) (*Broker, error) {
Expand Down
7 changes: 3 additions & 4 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,10 @@ func withRecover(fn func()) {
}

func safeAsyncClose(b *Broker) {
tmp := b // local var prevents clobbering in goroutine
go withRecover(func() {
if connected, _ := tmp.Connected(); connected {
if err := tmp.Close(); err != nil {
Logger.Println("Error closing broker", tmp.ID(), ":", err)
if connected, _ := b.Connected(); connected {
if err := b.Close(); err != nil {
Logger.Println("Error closing broker", b.ID(), ":", err)
}
}
})
Expand Down

0 comments on commit be9539d

Please sign in to comment.