Skip to content

Commit

Permalink
Correct health streamer behavior
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Jul 22, 2023
1 parent d5499e4 commit ca220b0
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 11 deletions.
7 changes: 4 additions & 3 deletions go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package discovery

import (
"fmt"
"io"
"math/rand"
"sort"
"strings"
Expand Down Expand Up @@ -431,13 +432,13 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
// Ensure that the tablet is healthy and serving.
if err := conn.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error {
if shr.RealtimeStats.HealthError == "" && shr.Serving {
return nil
return io.EOF // End the stream
}
return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not serving")
}); err == nil {
_ = conn.Close(ctx)
}); err == nil || err == io.EOF {
tablets = append(tablets, tabletInfo)
}
_ = conn.Close(ctx)
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package vreplication
import (
"context"
"fmt"
"io"
"strconv"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -336,13 +337,13 @@ func (ct *controller) sourceTabletIsUnhealthy() bool {
// Ensure that the tablet is healthy and serving.
if err := conn.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error {
if shr.RealtimeStats.HealthError == "" && shr.Serving {
return nil
return io.EOF // End the stream
}
return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not serving")
}); err == nil {
_ = conn.Close(ctx)
}); err == nil || err == io.EOF {
return true
}
_ = conn.Close(ctx)
}
return false
}
Expand Down
1 change: 0 additions & 1 deletion go/vt/vttablet/tabletserver/health_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ func (hs *healthStreamer) Stream(ctx context.Context, callback func(*querypb.Str
}
return err
}
return nil
}
}
}
Expand Down
1 change: 0 additions & 1 deletion go/vt/wrangler/fake_tablet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,6 @@ func (ft *fakeTablet) StreamHealth(ctx context.Context, callback func(*querypb.S
Target: &querypb.Target{
Keyspace: ft.Tablet.Keyspace,
Shard: ft.Tablet.Shard,
Cell: ft.Tablet.Alias.Cell,
TabletType: ft.Tablet.Type,
},
RealtimeStats: &querypb.RealtimeStats{},
Expand Down
6 changes: 3 additions & 3 deletions go/vt/wrangler/traffic_switcher_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards,
}
return nil, nil
})
tabletconntest.SetProtocol("go.vt.traffic_switcher_env_test", dialerName)
tabletconntest.SetProtocol("go.vt.wrangler.traffic_switcher_env_test", dialerName)

vs := &vschemapb.Keyspace{
Sharded: true,
Expand Down Expand Up @@ -336,7 +336,7 @@ func newTestTablePartialMigrater(ctx context.Context, t *testing.T, shards, shar
}
return nil, nil
})
tabletconntest.SetProtocol("go.vt.traffic_switcher_env_test", dialerName)
tabletconntest.SetProtocol("go.vt.wrangler.traffic_switcher_env_test", dialerName)

vs := &vschemapb.Keyspace{
Sharded: true,
Expand Down Expand Up @@ -494,7 +494,7 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe
}
return nil, nil
})
tabletconntest.SetProtocol("go.vt.traffic_switcher_env_test", dialerName)
tabletconntest.SetProtocol("go.vt.wrangler.traffic_switcher_env_test", dialerName)

vs := &vschemapb.Keyspace{
Sharded: true,
Expand Down

0 comments on commit ca220b0

Please sign in to comment.