Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vttablet/tabletmanager: Update the tablet_externally_reparented_timestamp every time the TabletExternallyReparented RPC is called. #3009

Merged
merged 1 commit into from
Jul 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions go/vt/proto/query/query.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 12 additions & 3 deletions go/vt/vtgate/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"fmt"
"strings"
"sync"
"time"

log "github.com/golang/glog"
"golang.org/x/net/context"
Expand Down Expand Up @@ -69,12 +70,15 @@ const (
// There should be exactly one instance of this buffer. For each failover, an
// instance of "ShardBuffer" will be created.
type Buffer struct {
// Immutable configuration fields (parsed from command line flags).
// Immutable configuration fields.
// Except for "now", they are parsed from command line flags.
// keyspaces has the same purpose as "shards" but applies to a whole keyspace.
keyspaces map[string]bool
// shards is a set of keyspace/shard entries to which buffering is limited.
// If empty (and *enabled==true), buffering is enabled for all shards.
shards map[string]bool
// now returns the current time. Overriden in tests.
now func() time.Time

// bufferSizeSema limits how many requests can be buffered
// ("-buffer_size") and is shared by all shardBuffer instances.
Expand All @@ -97,6 +101,10 @@ type Buffer struct {

// New creates a new Buffer object.
func New() *Buffer {
return newWithNow(time.Now)
}

func newWithNow(now func() time.Time) *Buffer {
if err := verifyFlags(); err != nil {
log.Fatalf("Invalid buffer configuration: %v", err)
}
Expand Down Expand Up @@ -139,6 +147,7 @@ func New() *Buffer {
return &Buffer{
keyspaces: keyspaces,
shards: shards,
now: now,
bufferSizeSema: sync2.NewSemaphore(*size, 0),
buffers: make(map[string]*shardBuffer),
}
Expand Down Expand Up @@ -224,7 +233,7 @@ func (b *Buffer) StatsUpdate(ts *discovery.TabletStats) {
// Buffer is shut down. Ignore all calls.
return
}
sb.recordExternallyReparentedTimestamp(timestamp)
sb.recordExternallyReparentedTimestamp(timestamp, ts.Tablet.Alias)
}

// causedByFailover returns true if "err" was supposedly caused by a failover.
Expand Down Expand Up @@ -279,7 +288,7 @@ func (b *Buffer) getOrCreateBuffer(keyspace, shard string) *shardBuffer {
// Look it up again because it could have been created in the meantime.
sb, ok = b.buffers[key]
if !ok {
sb = newShardBuffer(b.mode(keyspace, shard), keyspace, shard, b.bufferSizeSema)
sb = newShardBuffer(b.mode(keyspace, shard), keyspace, shard, b.now, b.bufferSizeSema)
b.buffers[key] = sb
}
return sb
Expand Down
127 changes: 115 additions & 12 deletions go/vt/vtgate/buffer/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,21 @@ var (

statsKeyJoinedLastReparentTooRecent = statsKeyJoined + "." + string(skippedLastReparentTooRecent)
statsKeyJoinedLastFailoverTooRecent = statsKeyJoined + "." + string(skippedLastFailoverTooRecent)

oldMaster = &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{Cell: "cell1", Uid: 100},
Keyspace: keyspace,
Shard: shard,
Type: topodatapb.TabletType_MASTER,
PortMap: map[string]int32{"vt": int32(100)},
}
newMaster = &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{Cell: "cell1", Uid: 101},
Keyspace: keyspace,
Shard: shard,
Type: topodatapb.TabletType_MASTER,
PortMap: map[string]int32{"vt": int32(101)},
}
)

func TestBuffer(t *testing.T) {
Expand All @@ -70,7 +85,19 @@ func TestBuffer(t *testing.T) {
defer resetFlagsForTesting()

// Create the buffer.
b := New()
now := time.Now()
b := newWithNow(func() time.Time { return now })

// Simulate that the current master reports its ExternallyReparentedTimestamp.
// vtgate sees this at startup. Additional periodic updates will be sent out
// after this. If the TabletExternallyReparented RPC is called regularly by
// an external failover tool, the timestamp will be increased (even though
// the master did not change.)
b.StatsUpdate(&discovery.TabletStats{
Tablet: oldMaster,
Target: &querypb.Target{Keyspace: keyspace, Shard: shard, TabletType: topodatapb.TabletType_MASTER},
TabletExternallyReparentedTimestamp: now.Unix(),
})

// First request with failover error starts buffering.
stopped := issueRequest(context.Background(), t, b, failoverErr)
Expand All @@ -95,9 +122,11 @@ func TestBuffer(t *testing.T) {
}

// Mimic the failover end.
now = now.Add(1 * time.Second)
b.StatsUpdate(&discovery.TabletStats{
Tablet: newMaster,
Target: &querypb.Target{Keyspace: keyspace, Shard: shard, TabletType: topodatapb.TabletType_MASTER},
TabletExternallyReparentedTimestamp: 1, // Use any value > 0.
TabletExternallyReparentedTimestamp: now.Unix(),
})

// Check that the drain is successful.
Expand Down Expand Up @@ -140,8 +169,8 @@ func TestBuffer(t *testing.T) {
t.Fatalf("skipped request was not tracked: got = %v, want = %v", got, want)
}

// Second failover is buffered if we reduce the limit.
flag.Set("buffer_min_time_between_failovers", "0s")
// Second failover is buffered if enough time has passed.
now = now.Add(*minTimeBetweenFailovers)
stopped4 := issueRequest(context.Background(), t, b, failoverErr)
if err := waitForRequestsInFlight(b, 1); err != nil {
t.Fatal(err)
Expand All @@ -156,8 +185,9 @@ func TestBuffer(t *testing.T) {
}
// Stop buffering.
b.StatsUpdate(&discovery.TabletStats{
Tablet: oldMaster,
Target: &querypb.Target{Keyspace: keyspace, Shard: shard, TabletType: topodatapb.TabletType_MASTER},
TabletExternallyReparentedTimestamp: 2, // Must be >1.
TabletExternallyReparentedTimestamp: now.Unix(),
})
if err := <-stopped4; err != nil {
t.Fatalf("request should have been buffered and not returned an error: %v", err)
Expand Down Expand Up @@ -292,6 +322,7 @@ func TestDryRun(t *testing.T) {

// End of failover is tracked as well.
b.StatsUpdate(&discovery.TabletStats{
Tablet: newMaster,
Target: &querypb.Target{Keyspace: keyspace, Shard: shard, TabletType: topodatapb.TabletType_MASTER},
TabletExternallyReparentedTimestamp: 1, // Use any value > 0.
})
Expand Down Expand Up @@ -326,30 +357,35 @@ func TestPassthrough(t *testing.T) {
}
}

// TestPassThroughLastReparentTooRecent tests that buffering is skipped if
// we see the failover end faster than the beginning.
func TestPassThroughLastReparentTooRecent(t *testing.T) {
// TestLastReparentTooRecent_BufferingSkipped tests that buffering is skipped if
// we see the reparent (end) *before* any request failures due to it.
// We must not start buffering because we already observed the trigger for
// stopping buffering (the reparent) and may not see it again.
func TestLastReparentTooRecent_BufferingSkipped(t *testing.T) {
resetVariables()

flag.Set("enable_buffer", "true")
// Enable the buffer (no explicit whitelist i.e. it applies to everything).
defer resetFlagsForTesting()
b := New()
now := time.Now()
b := newWithNow(func() time.Time { return now })

// Simulate that the old master notified us about its reparented timestamp
// very recently (time.Now()).
// vtgate should see this immediately after the start.
nowSeconds := time.Now().Unix()
b.StatsUpdate(&discovery.TabletStats{
Tablet: oldMaster,
Target: &querypb.Target{Keyspace: keyspace, Shard: shard, TabletType: topodatapb.TabletType_MASTER},
TabletExternallyReparentedTimestamp: nowSeconds,
TabletExternallyReparentedTimestamp: now.Unix(),
})

// Failover to new master. Its end is detected faster than the beginning.
// Do not start buffering.
now = now.Add(1 * time.Second)
b.StatsUpdate(&discovery.TabletStats{
Tablet: newMaster,
Target: &querypb.Target{Keyspace: keyspace, Shard: shard, TabletType: topodatapb.TabletType_MASTER},
TabletExternallyReparentedTimestamp: nowSeconds + 1,
TabletExternallyReparentedTimestamp: now.Unix(),
})

if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, failoverErr); err != nil || retryDone != nil {
Expand All @@ -366,6 +402,68 @@ func TestPassThroughLastReparentTooRecent(t *testing.T) {
}
}

// TestLastReparentTooRecent_Buffering explicitly tests that the "too recent"
// skipping of the buffering does NOT get triggered because enough time has
// elapsed since the last seen reparent.
func TestLastReparentTooRecent_Buffering(t *testing.T) {
resetVariables()

flag.Set("enable_buffer", "true")
// Enable the buffer (no explicit whitelist i.e. it applies to everything).
defer resetFlagsForTesting()
now := time.Now()
b := newWithNow(func() time.Time { return now })

// Simulate that the old master notified us about its reparented timestamp
// very recently (time.Now()).
// vtgate should see this immediately after the start.
b.StatsUpdate(&discovery.TabletStats{
Tablet: oldMaster,
Target: &querypb.Target{Keyspace: keyspace, Shard: shard, TabletType: topodatapb.TabletType_MASTER},
TabletExternallyReparentedTimestamp: now.Unix(),
})

// Failover to new master. Do not issue any requests before or after i.e.
// there was 0 QPS traffic and no buffering was started.
now = now.Add(1 * time.Second)
b.StatsUpdate(&discovery.TabletStats{
Tablet: newMaster,
Target: &querypb.Target{Keyspace: keyspace, Shard: shard, TabletType: topodatapb.TabletType_MASTER},
TabletExternallyReparentedTimestamp: now.Unix(),
})

// After we're past the --buffer_min_time_between_failovers threshold, go
// through a failover with non-zero QPS.
now = now.Add(*minTimeBetweenFailovers)
// We're seeing errors first.
stopped := issueRequest(context.Background(), t, b, failoverErr)
if err := waitForRequestsInFlight(b, 1); err != nil {
t.Fatal(err)
}
// And then the failover end.
b.StatsUpdate(&discovery.TabletStats{
Tablet: newMaster,
Target: &querypb.Target{Keyspace: keyspace, Shard: shard, TabletType: topodatapb.TabletType_MASTER},
TabletExternallyReparentedTimestamp: now.Unix(),
})

// Check that the drain is successful.
if err := <-stopped; err != nil {
t.Fatalf("request should have been buffered and not returned an error: %v", err)
}
// Drain will reset the state to "idle" eventually.
if err := waitForState(b, stateIdle); err != nil {
t.Fatal(err)
}

if got, want := requestsSkipped.Counts()[statsKeyJoinedLastReparentTooRecent], int64(0); got != want {
t.Fatalf("request should not have been skipped: got = %v, want = %v", got, want)
}
if got, want := requestsBuffered.Counts()[statsKeyJoined], int64(1); got != want {
t.Fatalf("request should have been tracked as buffered: got = %v, want = %v", got, want)
}
}

// TestPassthroughDuringDrain tests the behavior of requests while the buffer is
// in the drain phase: They should not be buffered and passed through instead.
func TestPassthroughDuringDrain(t *testing.T) {
Expand All @@ -383,6 +481,7 @@ func TestPassthroughDuringDrain(t *testing.T) {

// Stop buffering and trigger drain.
b.StatsUpdate(&discovery.TabletStats{
Tablet: newMaster,
Target: &querypb.Target{Keyspace: keyspace, Shard: shard, TabletType: topodatapb.TabletType_MASTER},
TabletExternallyReparentedTimestamp: 1, // Use any value > 0.
})
Expand Down Expand Up @@ -497,6 +596,7 @@ func testRequestCanceled(t *testing.T, explicitEnd bool) {

if explicitEnd {
b.StatsUpdate(&discovery.TabletStats{
Tablet: newMaster,
Target: &querypb.Target{Keyspace: keyspace, Shard: shard, TabletType: topodatapb.TabletType_MASTER},
TabletExternallyReparentedTimestamp: 1, // Use any value > 0.
})
Expand All @@ -515,6 +615,7 @@ func testRequestCanceled(t *testing.T, explicitEnd bool) {
// shortly after. In that case, the buffer should ignore it.
if !explicitEnd {
b.StatsUpdate(&discovery.TabletStats{
Tablet: newMaster,
Target: &querypb.Target{Keyspace: keyspace, Shard: shard, TabletType: topodatapb.TabletType_MASTER},
TabletExternallyReparentedTimestamp: 1, // Use any value > 0.
})
Expand Down Expand Up @@ -560,6 +661,7 @@ func TestEviction(t *testing.T) {

// End of failover. Stop buffering.
b.StatsUpdate(&discovery.TabletStats{
Tablet: newMaster,
Target: &querypb.Target{Keyspace: keyspace, Shard: shard, TabletType: topodatapb.TabletType_MASTER},
TabletExternallyReparentedTimestamp: 1, // Use any value > 0.
})
Expand Down Expand Up @@ -642,6 +744,7 @@ func TestEvictionNotPossible(t *testing.T) {

// End of failover. Stop buffering.
b.StatsUpdate(&discovery.TabletStats{
Tablet: newMaster,
Target: &querypb.Target{Keyspace: keyspace, Shard: shard, TabletType: topodatapb.TabletType_MASTER},
TabletExternallyReparentedTimestamp: 1, // Use any value > 0.
})
Expand Down
Loading