Skip to content

Commit

Permalink
Improve error handling in vstream_manager
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Jan 16, 2025
1 parent 71ccd6d commit 50513d2
Showing 1 changed file with 49 additions and 35 deletions.
84 changes: 49 additions & 35 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,15 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta
filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags, send func(events []*binlogdatapb.VEvent) error) error {
vgtid, filter, flags, err := vsm.resolveParams(ctx, tabletType, vgtid, filter, flags)
if err != nil {
return err
return vterrors.Wrap(err, "failed to resolve vstream parameters")
}
ts, err := vsm.toposerv.GetTopoServer()
if err != nil {
return err
return vterrors.Wrap(err, "failed to get topology server")
}
if ts == nil {
log.Errorf("unable to get topo server in VStream()")
return fmt.Errorf("unable to get topo server")
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "unable to get topoology server")
}
vs := &vstream{
vgtid: vgtid,
Expand Down Expand Up @@ -215,7 +215,7 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat
flags = &vtgatepb.VStreamFlags{}
}
if vgtid == nil || len(vgtid.ShardGtids) == 0 {
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vgtid must have at least one value with a starting position")
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vgtid must have at least one value with a starting position in ShardGtids")
}
// To fetch from all keyspaces, the input must contain a single ShardGtid
// that has an empty keyspace, and the Gtid must be "current".
Expand All @@ -228,7 +228,7 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat
newvgtid := &binlogdatapb.VGtid{}
keyspaces, err := vsm.toposerv.GetSrvKeyspaceNames(ctx, vsm.cell, false)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, vterrors.Wrapf(err, "failed to get keyspace names in cell %s", vsm.cell)
}

if isEmpty {
Expand All @@ -244,7 +244,7 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat
} else {
re, err := regexp.Compile(strings.Trim(inputKeyspace, "/"))
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, vterrors.Wrapf(err, "failed to compile regexp using %s", inputKeyspace)
}
for _, keyspace := range keyspaces {
if re.MatchString(keyspace) {
Expand All @@ -262,12 +262,13 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat
for _, sgtid := range vgtid.ShardGtids {
if sgtid.Shard == "" {
if sgtid.Gtid != "current" && sgtid.Gtid != "" {
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "if shards are unspecified, the Gtid value must be 'current' or empty; got: %v", vgtid)
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "if shards are unspecified, the Gtid value must be 'current' or empty; got: %+v",
vgtid)
}
// TODO(sougou): this should work with the new Migrate workflow
_, _, allShards, err := vsm.resolver.GetKeyspaceShards(ctx, sgtid.Keyspace, tabletType)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, vterrors.Wrapf(err, "failed to get shards in keyspace %s", sgtid.Keyspace)
}
for _, shard := range allShards {
newvgtid.ShardGtids = append(newvgtid.ShardGtids, &binlogdatapb.ShardGtid{
Expand Down Expand Up @@ -329,23 +330,23 @@ func (vs *vstream) sendEvents(ctx context.Context) {
send := func(evs []*binlogdatapb.VEvent) error {
if err := vs.send(evs); err != nil {
vs.once.Do(func() {
vs.setError(err)
vs.setError(err, "error sending events")
})
return err
return vterrors.Wrap(err, "error sending events")
}
return nil
}
for {
select {
case <-ctx.Done():
vs.once.Do(func() {
vs.setError(fmt.Errorf("context canceled"))
vs.setError(ctx.Err(), "context ended while sending events")
})
return
case evs := <-vs.eventCh:
if err := send(evs); err != nil {
vs.once.Do(func() {
vs.setError(err)
vs.setError(err, "error sending events")
})
return
}
Expand All @@ -359,7 +360,7 @@ func (vs *vstream) sendEvents(ctx context.Context) {
}}
if err := send(evs); err != nil {
vs.once.Do(func() {
vs.setError(err)
vs.setError(err, "error sending heartbeat")
})
return
}
Expand All @@ -378,7 +379,7 @@ func (vs *vstream) startOneStream(ctx context.Context, sgtid *binlogdatapb.Shard
if err != nil {
log.Errorf("Error in vstream for %+v: %s", sgtid, err)
vs.once.Do(func() {
vs.setError(err)
vs.setError(err, fmt.Sprintf("error starting stream from shard GTID %+v", sgtid))
vs.cancel()
})
}
Expand Down Expand Up @@ -464,10 +465,12 @@ func (vs *vstream) alignStreams(ctx context.Context, event *binlogdatapb.VEvent,
}
select {
case <-ctx.Done():
return ctx.Err()
return vterrors.Wrapf(ctx.Err(), "context ended while waiting for skew to reduce for stream %s from %s/%s",
streamID, keyspace, shard)
case <-time.After(time.Duration(vs.skewTimeoutSeconds) * time.Second):
log.Errorf("timed out while waiting for skew to reduce: %s", streamID)
return fmt.Errorf("timed out while waiting for skew to reduce: %s", streamID)
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "timed out while waiting for skew to reduce for stream %s from %s/%s",
streamID, keyspace, shard)
case <-vs.skewCh:
// once skew is fixed the channel is closed and all waiting streams "wake up"
}
Expand Down Expand Up @@ -500,7 +503,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
for {
select {
case <-ctx.Done():
return ctx.Err()
return vterrors.Wrapf(ctx.Err(), "context ended while streaming from %s/%s", sgtid.Keyspace, sgtid.Shard)
case <-journalDone:
// Unreachable.
// This can happen if a server misbehaves and does not end
Expand Down Expand Up @@ -556,7 +559,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
tabletConn, err := vs.vsm.resolver.GetGateway().QueryServiceByAlias(ctx, tablet.Alias, target)
if err != nil {
log.Errorf(err.Error())
return err
return vterrors.Wrapf(err, "failed to get tablet connection to %s", topoproto.TabletAliasString(tablet.Alias))
}

errCh := make(chan error, 1)
Expand All @@ -565,7 +568,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
var err error
switch {
case ctx.Err() != nil:
err = fmt.Errorf("context has ended")
err = vterrors.Wrapf(ctx.Err(), "context ended while streaming tablet health from %s", topoproto.TabletAliasString(tablet.Alias))
case shr == nil || shr.RealtimeStats == nil || shr.Target == nil:
err = fmt.Errorf("health check failed on %s", topoproto.TabletAliasString(tablet.Alias))
case vs.tabletType != shr.Target.TabletType:
Expand All @@ -580,6 +583,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}
if err != nil {
log.Warningf("Tablet state changed: %s, attempting to restart", err)
err = vterrors.Wrapf(err, "error streaming tablet health from %s", topoproto.TabletAliasString(tablet.Alias))
errCh <- err
return err
}
Expand All @@ -604,7 +608,8 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
Options: options,
}
var vstreamCreatedOnce sync.Once
log.Infof("Starting to vstream from %s, with req %+v", topoproto.TabletAliasString(tablet.Alias), req)
tabletAliasString := topoproto.TabletAliasString(tablet.Alias)
log.Infof("Starting to vstream from %s, with req %+v", tabletAliasString, req)
err = tabletConn.VStream(ctx, req, func(events []*binlogdatapb.VEvent) error {
// We received a valid event. Reset error count.
errCount = 0
Expand All @@ -617,9 +622,11 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha

select {
case <-ctx.Done():
return ctx.Err()
return vterrors.Wrapf(ctx.Err(), "context ended while streaming from tablet %s in %s/%s",
tabletAliasString, sgtid.Keyspace, sgtid.Shard)
case streamErr := <-errCh:
return vterrors.New(vtrpcpb.Code_UNAVAILABLE, streamErr.Error())
return vterrors.Wrapf(streamErr, "error streaming from tablet %s in %s/%s",
tabletAliasString, sgtid.Keyspace, sgtid.Shard)
case <-journalDone:
// Unreachable.
// This can happen if a server misbehaves and does not end
Expand All @@ -628,6 +635,9 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
default:
}

aligningStreamsErr := fmt.Sprintf("error aligning streams across %s/%s", sgtid.Keyspace, sgtid.Shard)
sendingEventsErr := fmt.Sprintf("error sending event batch to tablet %s", tabletAliasString)

sendevents := make([]*binlogdatapb.VEvent, 0, len(events))
for i, event := range events {
switch event.Type {
Expand All @@ -648,11 +658,11 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
eventss = append(eventss, sendevents)

if err := vs.alignStreams(ctx, event, sgtid.Keyspace, sgtid.Shard); err != nil {
return err
return vterrors.Wrap(err, aligningStreamsErr)
}

if err := vs.sendAll(ctx, sgtid, eventss); err != nil {
return err
return vterrors.Wrap(err, sendingEventsErr)
}
eventss = nil
sendevents = nil
Expand All @@ -664,11 +674,11 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
eventss = append(eventss, sendevents)

if err := vs.alignStreams(ctx, event, sgtid.Keyspace, sgtid.Shard); err != nil {
return err
return vterrors.Wrap(err, aligningStreamsErr)
}

if err := vs.sendAll(ctx, sgtid, eventss); err != nil {
return err
return vterrors.Wrap(err, sendingEventsErr)
}
eventss = nil
sendevents = nil
Expand All @@ -677,7 +687,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
// Otherwise they can accumulate indefinitely if there are no real events.
// TODO(sougou): figure out a model for this.
if err := vs.alignStreams(ctx, event, sgtid.Keyspace, sgtid.Shard); err != nil {
return err
return vterrors.Wrap(err, aligningStreamsErr)
}
case binlogdatapb.VEventType_JOURNAL:
journal := event.Journal
Expand All @@ -698,14 +708,15 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}
eventss = append(eventss, sendevents)
if err := vs.sendAll(ctx, sgtid, eventss); err != nil {
return err
return vterrors.Wrap(err, sendingEventsErr)
}
eventss = nil
sendevents = nil
}
je, err := vs.getJournalEvent(ctx, sgtid, journal)
if err != nil {
return err
return vterrors.Wrapf(err, "error getting journal event for shard GTID %+v on tablet %s",
sgtid, tabletAliasString)
}
if je != nil {
var endTimer *time.Timer
Expand All @@ -725,7 +736,8 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
journalDone = je.done
select {
case <-ctx.Done():
return ctx.Err()
return vterrors.Wrapf(ctx.Err(), "context ended while waiting for journal event for shard GTID %+v on tablet %s",
sgtid, tabletAliasString)
case <-journalDone:
if endTimer != nil {
<-endTimer.C
Expand All @@ -752,13 +764,14 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}
if err == nil {
// Unreachable.
err = vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "vstream ended unexpectedly")
err = vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "vstream ended unexpectedly on tablet %s", tabletAliasString)
}

retry, ignoreTablet := vs.shouldRetry(err)
if !retry {
log.Errorf("vstream for %s/%s error: %v", sgtid.Keyspace, sgtid.Shard, err)
return err
return vterrors.Wrapf(err, "error in vstream for %s/%s on tablet %s",
sgtid.Keyspace, sgtid.Shard, tabletAliasString)
}
if ignoreTablet {
ignoreTablets = append(ignoreTablets, tablet.GetAlias())
Expand All @@ -768,7 +781,8 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
// Retry, at most, 3 times if the error can be retried.
if errCount >= 3 {
log.Errorf("vstream for %s/%s had three consecutive failures: %v", sgtid.Keyspace, sgtid.Shard, err)
return err
return vterrors.Wrapf(err, "persistent error in vstream for %s/%s on tablet %s; giving up",
sgtid.Keyspace, sgtid.Shard, tabletAliasString)
}
log.Infof("vstream for %s/%s error, retrying: %v", sgtid.Keyspace, sgtid.Shard, err)
}
Expand Down Expand Up @@ -884,10 +898,10 @@ func (vs *vstream) getError() error {
return vs.err
}

func (vs *vstream) setError(err error) {
func (vs *vstream) setError(err error, msg string) {
vs.errMu.Lock()
defer vs.errMu.Unlock()
vs.err = err
vs.err = vterrors.Wrap(err, msg)
}

// getJournalEvent returns a journalEvent. The caller has to wait on its done channel.
Expand Down

0 comments on commit 50513d2

Please sign in to comment.