Skip to content

Commit

Permalink
Merge branch 'slack-15.0' into v16-gprc-protobuf-slack-15.0
Browse files Browse the repository at this point in the history
  • Loading branch information
timvaillancourt authored Sep 12, 2024
2 parents b1b3aeb + bd70d86 commit 4f3bd5a
Show file tree
Hide file tree
Showing 32 changed files with 936 additions and 623 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/unit_race.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ jobs:
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.unit_tests == 'true'
uses: nick-fields/retry@v2
with:
timeout_minutes: 30
timeout_minutes: 45
max_attempts: 3
retry_on: error
command: |
Expand Down
Binary file removed go/cmd/vtgateproxy/vtgateproxy
Binary file not shown.
6 changes: 3 additions & 3 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
Usage of vtgate:
--allowed_tablet_types strings Specifies the tablet types this vtgate is allowed to route queries to. Should be provided as a comma-separated set of tablet types.
--alsologtostderr log to standard error as well as files
--balancer_enabled Whether to enable the tablet balancer to evenly spread query load
--balancer_keyspaces strings When in balanced mode, a comma-separated list of keyspaces for which to use the balancer (optional)
--balancer_vtgate_cells strings When in balanced mode, a comma-separated list of cells that contain vtgates (required)
--balancer-keyspaces strings When in balanced mode, a comma-separated list of keyspaces for which to use the balancer (optional)
--balancer-vtgate-cells strings When in balanced mode, a comma-separated list of cells that contain vtgates (required)
--buffer_drain_concurrency int Maximum number of requests retried simultaneously. More concurrency will increase the load on the PRIMARY vttablet when draining the buffer. (default 1)
--buffer_implementation string Allowed values: healthcheck (legacy implementation), keyspace_events (default) (default "keyspace_events")
--buffer_keyspace_shards string If not empty, limit buffering to these entries (comma separated). Entry format: keyspace or keyspace/shard. Requires --enable_buffer=true.
Expand All @@ -23,6 +22,7 @@ Usage of vtgate:
--discovery_high_replication_lag_minimum_serving duration Threshold above which replication lag is considered too high when applying the min_number_serving_vttablets flag. (default 2h0m0s)
--discovery_low_replication_lag duration Threshold below which replication lag is considered low enough to be healthy. (default 30s)
--emit_stats If set, emit stats to push-based monitoring and stats backends
--enable-balancer Enable the tablet balancer to evenly spread query load for a given tablet type
--enable-partial-keyspace-migration (Experimental) Follow shard routing rules: enable only while migrating a keyspace shard by shard. See documentation on Partial MoveTables for more. (default false)
--enable_buffer Enable buffering (stalling) of primary traffic during failovers.
--enable_buffer_dry_run Detect and log failover events, but do not actually buffer requests.
Expand Down
782 changes: 391 additions & 391 deletions go/flags/endtoend/vttablet.txt

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions go/flagutil/flagutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,24 @@ func (v *DurationOrIntVar) Type() string { return "duration" }

// Value returns the underlying Duration value passed to the flag.
func (v *DurationOrIntVar) Value() time.Duration { return v.val }

type DurationOrSecondsFloatFlag float64

func (set *DurationOrSecondsFloatFlag) Set(s string) error {
if dur, err := time.ParseDuration(s); err == nil {
*set = DurationOrSecondsFloatFlag(dur.Seconds())
} else {
f, err := strconv.ParseFloat(s, 64)
if err != nil {
return err
}
*set = DurationOrSecondsFloatFlag(f)
}
return nil
}

func (set *DurationOrSecondsFloatFlag) String() string {
return strconv.FormatFloat(float64(*set), 'f', -1, 64)
}

func (set *DurationOrSecondsFloatFlag) Type() string { return "DurationOrSecondsFloat" }
46 changes: 46 additions & 0 deletions go/flagutil/flagutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,49 @@ func TestDurationOrIntVar(t *testing.T) {
assert.Equal(t, tt.want, flag.Value())
}
}

func TestDurationOrSecondsFloatFlag(t *testing.T) {
testCases := []struct {
Set string
Expected float64
ExpectedErr string
}{
{
Set: "1",
Expected: 1,
},
{
Set: "0.5",
Expected: 0.5,
},
{
Set: "1800",
Expected: 1800,
},
{
Set: "50ms",
Expected: 0.05,
},
{
Set: "42m",
Expected: 2520,
},
{
Set: "wont-parse",
ExpectedErr: `strconv.ParseFloat: parsing "wont-parse": invalid syntax`,
},
}

for _, testCase := range testCases {
testCase := testCase
t.Run(testCase.Set, func(t *testing.T) {
t.Parallel()
var f DurationOrSecondsFloatFlag
err := f.Set(testCase.Set)
if testCase.ExpectedErr != "" {
assert.ErrorContains(t, err, testCase.ExpectedErr)
}
assert.Equal(t, testCase.Expected, float64(f))
})
}
}
4 changes: 4 additions & 0 deletions go/mysql/fakesqldb/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,11 +613,15 @@ func (db *DB) GetQueryCalledNum(query string) int {

// QueryLog returns the query log in a semicomma separated string
func (db *DB) QueryLog() string {
db.mu.Lock()
defer db.mu.Unlock()
return strings.Join(db.querylog, ";")
}

// ResetQueryLog resets the query log
func (db *DB) ResetQueryLog() {
db.mu.Lock()
defer db.mu.Unlock()
db.querylog = nil
}

Expand Down
52 changes: 32 additions & 20 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"hash/crc32"
"html/template"
Expand Down Expand Up @@ -98,6 +99,9 @@ var (

// How much to sleep between each check.
waitAvailableTabletInterval = 100 * time.Millisecond

// errKeyspacesToWatchAndTabletFilters is an error for cases where incompatible filters are defined.
errKeyspacesToWatchAndTabletFilters = errors.New("only one of --keyspaces_to_watch and --tablet_filters may be specified at a time")
)

// See the documentation for NewHealthCheck below for an explanation of these parameters.
Expand Down Expand Up @@ -296,6 +300,27 @@ type HealthCheckImpl struct {
healthCheckDialSem *semaphore.Weighted
}

// NewVTGateHealthCheckFilters returns healthcheck filters for vtgate.
func NewVTGateHealthCheckFilters() (filters TabletFilters, err error) {
if len(tabletFilters) > 0 {
if len(KeyspacesToWatch) > 0 {
return nil, errKeyspacesToWatchAndTabletFilters
}

fbs, err := NewFilterByShard(tabletFilters)
if err != nil {
return nil, fmt.Errorf("failed to parse tablet_filters value %q: %v", strings.Join(tabletFilters, ","), err)
}
filters = append(filters, fbs)
} else if len(KeyspacesToWatch) > 0 {
filters = append(filters, NewFilterByKeyspace(KeyspacesToWatch))
}
if len(tabletFilterTags) > 0 {
filters = append(filters, NewFilterByTabletTags(tabletFilterTags))
}
return filters, nil
}

// NewHealthCheck creates a new HealthCheck object.
// Parameters:
// retryDelay.
Expand All @@ -317,10 +342,14 @@ type HealthCheckImpl struct {
//
// The localCell for this healthcheck
//
// callback.
// cellsToWatch.
//
// A function to call when there is a primary change. Used to notify vtgate's buffer to stop buffering.
func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string) *HealthCheckImpl {
// Is a list of cells to watch for tablets.
//
// filters.
//
// Is one or more filters to apply when determining what tablets we want to stream healthchecks from.
func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string, filters TabletFilter) *HealthCheckImpl {
log.Infof("loading tablets for cells: %v", cellsToWatch)

hc := &HealthCheckImpl{
Expand All @@ -342,27 +371,10 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
}

for _, c := range cells {
var filters TabletFilters
log.Infof("Setting up healthcheck for cell: %v", c)
if c == "" {
continue
}
if len(tabletFilters) > 0 {
if len(KeyspacesToWatch) > 0 {
log.Exitf("Only one of -keyspaces_to_watch and -tablet_filters may be specified at a time")
}

fbs, err := NewFilterByShard(tabletFilters)
if err != nil {
log.Exitf("Cannot parse tablet_filters parameter: %v", err)
}
filters = append(filters, fbs)
} else if len(KeyspacesToWatch) > 0 {
filters = append(filters, NewFilterByKeyspace(KeyspacesToWatch))
}
if len(tabletFilterTags) > 0 {
filters = append(filters, NewFilterByTabletTags(tabletFilterTags))
}
topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topoReadConcurrency))
}

Expand Down
79 changes: 75 additions & 4 deletions go/vt/discovery/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,77 @@ func init() {
refreshInterval = time.Minute
}

func TestNewVTGateHealthCheckFilters(t *testing.T) {
defer func() {
KeyspacesToWatch = nil
tabletFilters = nil
tabletFilterTags = nil
}()

testCases := []struct {
name string
keyspacesToWatch []string
tabletFilters []string
tabletFilterTags map[string]string
expectedError string
expectedFilterTypes []any
}{
{
name: "noFilters",
},
{
name: "tabletFilters",
tabletFilters: []string{"ks1|-80"},
expectedFilterTypes: []any{&FilterByShard{}},
},
{
name: "keyspacesToWatch",
keyspacesToWatch: []string{"ks1"},
expectedFilterTypes: []any{&FilterByKeyspace{}},
},
{
name: "tabletFiltersAndTags",
tabletFilters: []string{"ks1|-80"},
tabletFilterTags: map[string]string{"test": "true"},
expectedFilterTypes: []any{&FilterByShard{}, &FilterByTabletTags{}},
},
{
name: "keyspacesToWatchAndTags",
tabletFilterTags: map[string]string{"test": "true"},
keyspacesToWatch: []string{"ks1"},
expectedFilterTypes: []any{&FilterByKeyspace{}, &FilterByTabletTags{}},
},
{
name: "failKeyspacesToWatchAndFilters",
tabletFilters: []string{"ks1|-80"},
keyspacesToWatch: []string{"ks1"},
expectedError: errKeyspacesToWatchAndTabletFilters.Error(),
},
{
name: "failInvalidTabletFilters",
tabletFilters: []string{"shouldfail!@#!"},
expectedError: "failed to parse tablet_filters value \"shouldfail!@#!\": invalid FilterByShard parameter: shouldfail!@#!",
},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
KeyspacesToWatch = testCase.keyspacesToWatch
tabletFilters = testCase.tabletFilters
tabletFilterTags = testCase.tabletFilterTags

filters, err := NewVTGateHealthCheckFilters()
if testCase.expectedError != "" {
assert.EqualError(t, err, testCase.expectedError)
}
assert.Len(t, filters, len(testCase.expectedFilterTypes))
for i, filter := range filters {
assert.IsType(t, testCase.expectedFilterTypes[i], filter)
}
})
}
}

func TestHealthCheck(t *testing.T) {
// reset error counters
hcErrorCounters.ResetAll()
Expand Down Expand Up @@ -943,7 +1014,7 @@ func TestGetHealthyTablets(t *testing.T) {

func TestPrimaryInOtherCell(t *testing.T) {
ts := memorytopo.NewServer("cell1", "cell2")
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2")
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2", nil)
defer hc.Close()

// add a tablet as primary in different cell
Expand Down Expand Up @@ -1000,7 +1071,7 @@ func TestPrimaryInOtherCell(t *testing.T) {

func TestReplicaInOtherCell(t *testing.T) {
ts := memorytopo.NewServer("cell1", "cell2")
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2")
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2", nil)
defer hc.Close()

// add a tablet as replica
Expand Down Expand Up @@ -1102,7 +1173,7 @@ func TestReplicaInOtherCell(t *testing.T) {

func TestCellAliases(t *testing.T) {
ts := memorytopo.NewServer("cell1", "cell2")
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2")
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2", nil)
defer hc.Close()

cellsAlias := &topodatapb.CellsAlias{
Expand Down Expand Up @@ -1248,7 +1319,7 @@ func tabletDialer(tablet *topodatapb.Tablet, _ grpcclient.FailFast) (queryservic
}

func createTestHc(ts *topo.Server) *HealthCheckImpl {
return NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell", "")
return NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell", "", nil)
}

type fakeConn struct {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/discovery/keyspace_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestSrvKeyspaceWithNilNewKeyspace(t *testing.T) {
factory.AddCell(cell)
ts := faketopo.NewFakeTopoServer(factory)
ts2 := &fakeTopoServer{}
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, cell, "")
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, cell, "", nil)
defer hc.Close()
kew := NewKeyspaceEventWatcher(context.Background(), ts2, hc, cell)
kss := &keyspaceState{
Expand Down Expand Up @@ -82,7 +82,7 @@ func TestKeyspaceEventTypes(t *testing.T) {
factory.AddCell(cell)
ts := faketopo.NewFakeTopoServer(factory)
ts2 := &fakeTopoServer{}
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, cell, "")
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, cell, "", nil)
defer hc.Close()
kew := NewKeyspaceEventWatcher(context.Background(), ts2, hc, cell)

Expand Down
6 changes: 3 additions & 3 deletions go/vt/throttler/demo/throttler_demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ type replica struct {

// throttler is used to enforce the maximum rate at which replica applies
// transactions. It must not be confused with the client's throttler.
throttler *throttler.Throttler
throttler throttler.Throttler
lastHealthUpdate time.Time
lagUpdateInterval time.Duration

Expand Down Expand Up @@ -224,7 +224,7 @@ type client struct {
primary *primary

healthCheck discovery.HealthCheck
throttler *throttler.Throttler
throttler throttler.Throttler

stopChan chan struct{}
wg sync.WaitGroup
Expand All @@ -237,7 +237,7 @@ func newClient(primary *primary, replica *replica, ts *topo.Server) *client {
log.Fatal(err)
}

healthCheck := discovery.NewHealthCheck(context.Background(), 5*time.Second, 1*time.Minute, ts, "cell1", "")
healthCheck := discovery.NewHealthCheck(context.Background(), 5*time.Second, 1*time.Minute, ts, "cell1", "", nil)
c := &client{
primary: primary,
healthCheck: healthCheck,
Expand Down
Loading

0 comments on commit 4f3bd5a

Please sign in to comment.