-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
local backend: fix worker err overriden by job generation err #48185
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1201,6 +1201,12 @@ func (local *Backend) generateAndSendJob( | |
} | ||
|
||
failpoint.Inject("beforeGenerateJob", nil) | ||
failpoint.Inject("sendDummyJob", func(_ failpoint.Value) { | ||
// this is used to trigger worker failure, used together | ||
// with WriteToTiKVNotEnoughDiskSpace | ||
jobToWorkerCh <- ®ionJob{} | ||
time.Sleep(5 * time.Second) | ||
}) | ||
jobs, err := local.generateJobForRange(egCtx, p.Data, p.Range, regionSplitSize, regionSplitKeys) | ||
if err != nil { | ||
if common.IsContextCanceledError(err) { | ||
|
@@ -1679,29 +1685,30 @@ func (local *Backend) doImport(ctx context.Context, engine common.Engine, region | |
|
||
failpoint.Label("afterStartWorker") | ||
|
||
err := local.prepareAndSendJob( | ||
workerCtx, | ||
engine, | ||
regionRanges, | ||
regionSplitSize, | ||
regionSplitKeys, | ||
jobToWorkerCh, | ||
&jobWg, | ||
) | ||
if err != nil { | ||
firstErr.Set(err) | ||
workGroup.Go(func() error { | ||
err := local.prepareAndSendJob( | ||
workerCtx, | ||
engine, | ||
regionRanges, | ||
regionSplitSize, | ||
regionSplitKeys, | ||
jobToWorkerCh, | ||
&jobWg, | ||
) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
jobWg.Wait() | ||
workerCancel() | ||
err2 := workGroup.Wait() | ||
if !common.IsContextCanceledError(err2) { | ||
log.FromContext(ctx).Error("worker meets error", zap.Error(err2)) | ||
return nil | ||
}) | ||
if err := workGroup.Wait(); err != nil { | ||
if !common.IsContextCanceledError(err) { | ||
log.FromContext(ctx).Error("do import meets error", zap.Error(err)) | ||
} | ||
return firstErr.Get() | ||
firstErr.Set(err) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. seems we don't need firstErr, because error group can maintain first error. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there is a routine above managing job wait and retry, we need it for now, below too |
||
} | ||
|
||
jobWg.Wait() | ||
workerCancel() | ||
firstErr.Set(workGroup.Wait()) | ||
firstErr.Set(ctx.Err()) | ||
return firstErr.Get() | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -252,6 +252,13 @@ func (c *testSplitClient) GetOperator(ctx context.Context, regionID uint64) (*pd | |
func (c *testSplitClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*split.RegionInfo, error) { | ||
c.mu.Lock() | ||
defer c.mu.Unlock() | ||
|
||
select { | ||
case <-ctx.Done(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if err := ctx.Err(); err != nil { ... } |
||
return nil, ctx.Err() | ||
default: | ||
} | ||
|
||
if c.hook != nil { | ||
key, endKey, limit = c.hook.BeforeScanRegions(ctx, key, endKey, limit) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also we don't need workerCancel? error group will automatically cancel the derived context when wg.Go returns errorI just remember workerCancel is used to notify workers that all job is finished when no error happens
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also used to cancel worker when retry count of some job exhausted