Skip to content

Commit

Permalink
Fix the channle consumed bug
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Jan 30, 2024
1 parent 335d272 commit ade43b6
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 31 deletions.
7 changes: 6 additions & 1 deletion pkg/utils/etcdutil/health_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,11 @@ func (checker *healthChecker) pickEps(probeCh <-chan healthProbe) []string {
if count == 0 {
return pickedEps
}
// Consume the `probeCh` to build a reusable slice.
probes := make([]healthProbe, 0, count)
for probe := range probeCh {
probes = append(probes, probe)
}
// Take the default value as an example, if we have 3 endpoints with latency like:
// - A: 175ms
// - B: 50ms
Expand All @@ -244,7 +249,7 @@ func (checker *healthChecker) pickEps(probeCh <-chan healthProbe) []string {
factor := int(DefaultRequestTimeout / DefaultSlowRequestTime)
for i := 0; i < factor; i++ {
minLatency, maxLatency := DefaultSlowRequestTime*time.Duration(i), DefaultSlowRequestTime*time.Duration(i+1)
for probe := range probeCh {
for _, probe := range probes {
if minLatency <= probe.took && probe.took < maxLatency {
log.Debug("pick healthy etcd endpoint within acceptable latency range",
zap.Duration("min-latency", minLatency),
Expand Down
183 changes: 153 additions & 30 deletions pkg/utils/etcdutil/health_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,45 @@ import (
"github.com/stretchr/testify/require"
)

type testCase struct {
healthProbes []healthProbe
expectedEvictedEps map[string]int
expectedPickedEps []string
}

func check(re *require.Assertions, testCases []*testCase) {
checker := &healthChecker{}
lastEps := []string{}
for idx, tc := range testCases {
// Send the health probes to the channel.
probeCh := make(chan healthProbe, len(tc.healthProbes))
for _, probe := range tc.healthProbes {
probeCh <- probe
}
close(probeCh)
// Pick and filter the endpoints.
pickedEps := checker.pickEps(probeCh)
checker.updateEvictedEps(lastEps, pickedEps)
pickedEps = checker.filterEps(pickedEps)
// Check the evicted states after finishing picking.
count := 0
checker.evictedEps.Range(func(key, value interface{}) bool {
count++
ep := key.(string)
times := value.(int)
re.Equal(tc.expectedEvictedEps[ep], times, "case %d ep %s", idx, ep)
return true
})
re.Len(tc.expectedEvictedEps, count, "case %d", idx)
re.Equal(tc.expectedPickedEps, pickedEps, "case %d", idx)
lastEps = pickedEps
}
}

// Test the endpoint picking and evicting logic.
func TestPickEps(t *testing.T) {
re := require.New(t)
testCases := []struct {
healthProbes []healthProbe
expectedEvictedEps map[string]int
expectedPickedEps []string
}{
testCases := []*testCase{
// {} -> {A, B}
{
[]healthProbe{
Expand Down Expand Up @@ -173,30 +204,122 @@ func TestPickEps(t *testing.T) {
[]string{},
},
}
checker := &healthChecker{}
lastEps := []string{}
for idx, tc := range testCases {
// Send the health probes to the channel.
probeCh := make(chan healthProbe, len(tc.healthProbes))
for _, probe := range tc.healthProbes {
probeCh <- probe
}
close(probeCh)
// Pick and filter the endpoints.
pickedEps := checker.pickEps(probeCh)
checker.updateEvictedEps(lastEps, pickedEps)
pickedEps = checker.filterEps(pickedEps)
// Check the states after finishing picking.
count := 0
checker.evictedEps.Range(func(key, value interface{}) bool {
count++
ep := key.(string)
times := value.(int)
re.Equal(tc.expectedEvictedEps[ep], times, "case %d ep %s", idx, ep)
return true
})
re.Len(tc.expectedEvictedEps, count, "case %d", idx)
re.Equal(tc.expectedPickedEps, pickedEps, "case %d", idx)
lastEps = pickedEps
check(re, testCases)
}

func TestLatencyPick(t *testing.T) {
re := require.New(t)
testCases := []*testCase{
// {} -> {A, B}
{
[]healthProbe{
{
ep: "A",
took: time.Millisecond,
},
{
ep: "B",
took: time.Millisecond,
},
},
map[string]int{},
[]string{"A", "B"},
},
// {A, B} -> {A, B, C}
{
[]healthProbe{
{
ep: "A",
took: time.Millisecond,
},
{
ep: "B",
took: time.Millisecond,
},
{
ep: "C",
took: time.Second,
},
},
map[string]int{},
[]string{"A", "B"},
},
// {A, B} -> {A, B, C}
{
[]healthProbe{
{
ep: "A",
took: time.Second,
},
{
ep: "B",
took: time.Second,
},
{
ep: "C",
took: 2 * time.Second,
},
},
map[string]int{},
[]string{"A", "B"},
},
// {A, B} -> {A, B, C}
{
[]healthProbe{
{
ep: "A",
took: time.Second,
},
{
ep: "B",
took: 2 * time.Second,
},
{
ep: "C",
took: 3 * time.Second,
},
},
map[string]int{"B": 0},
[]string{"A"},
},
// {A} -> {A, B, C}
{
[]healthProbe{
{
ep: "A",
took: time.Second,
},
{
ep: "B",
took: time.Second,
},
{
ep: "C",
took: time.Millisecond,
},
},
map[string]int{"A": 0, "B": 0},
[]string{"C"},
},
// {C} -> {A, B, C}
{
[]healthProbe{
{
ep: "A",
took: time.Millisecond,
},
{
ep: "B",
took: time.Millisecond,
},
{
ep: "C",
took: time.Second,
},
},
map[string]int{"A": 1, "B": 1, "C": 0},
[]string{},
},
}
check(re, testCases)
}

0 comments on commit ade43b6

Please sign in to comment.