From 8a5d7d9d56b5713ba36eb3666c15c47420eaee9b Mon Sep 17 00:00:00 2001 From: xufei Date: Fri, 13 Oct 2023 18:30:39 +0800 Subject: [PATCH] executor: Fix crash during sort spill (#47581) close pingcap/tidb#47538 --- executor/sort.go | 5 +- util/chunk/row_container.go | 147 +++++++++++++++++-------------- util/chunk/row_container_test.go | 36 ++++++++ 3 files changed, 120 insertions(+), 68 deletions(-) diff --git a/executor/sort.go b/executor/sort.go index 06241993e05f3..cb2c97e68a8e4 100644 --- a/executor/sort.go +++ b/executor/sort.go @@ -234,7 +234,10 @@ func (e *SortExec) fetchRowChunks(ctx context.Context) error { } }) if e.rowChunks.NumRow() > 0 { - e.rowChunks.Sort() + err := e.rowChunks.Sort() + if err != nil { + return err + } e.partitionList = append(e.partitionList, e.rowChunks) } return nil diff --git a/util/chunk/row_container.go b/util/chunk/row_container.go index bf095dbed18db..ca6958c3ab7fa 100644 --- a/util/chunk/row_container.go +++ b/util/chunk/row_container.go @@ -78,6 +78,11 @@ func (m *mutexForRowContainer) RUnlock() { m.rLock.RUnlock() } +type spillHelper interface { + SpillToDisk() + hasEnoughDataToSpill(t *memory.Tracker) bool +} + // RowContainer provides a place for many rows, so many that we might want to spill them into disk. // nolint:structcheck type RowContainer struct { @@ -121,6 +126,14 @@ func (c *RowContainer) ShallowCopyWithNewMutex() *RowContainer { // SpillToDisk spills data to disk. This function may be called in parallel. func (c *RowContainer) SpillToDisk() { + c.spillToDisk(nil) +} + +func (*RowContainer) hasEnoughDataToSpill(_ *memory.Tracker) bool { + return true +} + +func (c *RowContainer) spillToDisk(preSpillError error) { c.m.Lock() defer c.m.Unlock() if c.alreadySpilled() { @@ -153,6 +166,10 @@ func (c *RowContainer) SpillToDisk() { panic("out of disk quota when spilling") } }) + if preSpillError != nil { + c.m.records.spillError = preSpillError + return + } for i := 0; i < n; i++ { chk := c.m.records.inMemory.GetChunk(i) err = c.m.records.inDisk.Add(chk) @@ -313,8 +330,9 @@ func (c *RowContainer) Close() (err error) { func (c *RowContainer) ActionSpill() *SpillDiskAction { if c.actionSpill == nil { c.actionSpill = &SpillDiskAction{ - c: c, - cond: spillStatusCond{sync.NewCond(new(sync.Mutex)), notSpilled}} + c: c, + baseSpillDiskAction: &baseSpillDiskAction{cond: spillStatusCond{sync.NewCond(new(sync.Mutex)), notSpilled}}, + } } return c.actionSpill } @@ -323,23 +341,21 @@ func (c *RowContainer) ActionSpill() *SpillDiskAction { func (c *RowContainer) ActionSpillForTest() *SpillDiskAction { c.actionSpill = &SpillDiskAction{ c: c, - testSyncInputFunc: func() { - c.actionSpill.testWg.Add(1) + baseSpillDiskAction: &baseSpillDiskAction{ + testSyncInputFunc: func() { + c.actionSpill.testWg.Add(1) + }, + testSyncOutputFunc: func() { + c.actionSpill.testWg.Done() + }, + cond: spillStatusCond{sync.NewCond(new(sync.Mutex)), notSpilled}, }, - testSyncOutputFunc: func() { - c.actionSpill.testWg.Done() - }, - cond: spillStatusCond{sync.NewCond(new(sync.Mutex)), notSpilled}, } return c.actionSpill } -// SpillDiskAction implements memory.ActionOnExceed for chunk.List. If -// the memory quota of a query is exceeded, SpillDiskAction.Action is -// triggered. -type SpillDiskAction struct { +type baseSpillDiskAction struct { memory.BaseOOMAction - c *RowContainer m sync.Mutex once sync.Once cond spillStatusCond @@ -350,6 +366,20 @@ type SpillDiskAction struct { testWg sync.WaitGroup } +// SpillDiskAction implements memory.ActionOnExceed for chunk.List. If +// the memory quota of a query is exceeded, SpillDiskAction.Action is +// triggered. +type SpillDiskAction struct { + c *RowContainer + *baseSpillDiskAction +} + +// Action sends a signal to trigger spillToDisk method of RowContainer +// and if it is already triggered before, call its fallbackAction. +func (a *SpillDiskAction) Action(t *memory.Tracker) { + a.action(t, a.c) +} + type spillStatusCond struct { *sync.Cond // status indicates different stages for the Action @@ -367,38 +397,35 @@ const ( spilledYet ) -func (a *SpillDiskAction) setStatus(status spillStatus) { +func (a *baseSpillDiskAction) setStatus(status spillStatus) { a.cond.L.Lock() defer a.cond.L.Unlock() a.cond.status = status } -func (a *SpillDiskAction) getStatus() spillStatus { +func (a *baseSpillDiskAction) getStatus() spillStatus { a.cond.L.Lock() defer a.cond.L.Unlock() return a.cond.status } -// Action sends a signal to trigger spillToDisk method of RowContainer -// and if it is already triggered before, call its fallbackAction. -func (a *SpillDiskAction) Action(t *memory.Tracker) { +func (a *baseSpillDiskAction) action(t *memory.Tracker, spillHelper spillHelper) { a.m.Lock() defer a.m.Unlock() - if a.getStatus() == notSpilled { + if a.getStatus() == notSpilled && spillHelper.hasEnoughDataToSpill(t) { a.once.Do(func() { logutil.BgLogger().Info("memory exceeds quota, spill to disk now.", zap.Int64("consumed", t.BytesConsumed()), zap.Int64("quota", t.GetBytesLimit())) if a.testSyncInputFunc != nil { a.testSyncInputFunc() - c := a.c go func() { - c.SpillToDisk() + spillHelper.SpillToDisk() a.testSyncOutputFunc() }() return } - go a.c.SpillToDisk() + go spillHelper.SpillToDisk() }) return } @@ -418,7 +445,7 @@ func (a *SpillDiskAction) Action(t *memory.Tracker) { } // Reset resets the status for SpillDiskAction. -func (a *SpillDiskAction) Reset() { +func (a *baseSpillDiskAction) Reset() { a.m.Lock() defer a.m.Unlock() a.setStatus(notSpilled) @@ -426,12 +453,12 @@ func (a *SpillDiskAction) Reset() { } // GetPriority get the priority of the Action. -func (*SpillDiskAction) GetPriority() int64 { +func (*baseSpillDiskAction) GetPriority() int64 { return memory.DefSpillPriority } // WaitForTest waits all goroutine have gone. -func (a *SpillDiskAction) WaitForTest() { +func (a *baseSpillDiskAction) WaitForTest() { a.testWg.Wait() } @@ -522,9 +549,15 @@ func (c *SortedRowContainer) keyColumnsLess(i, j int) bool { } // Sort inits pointers and sorts the records. -func (c *SortedRowContainer) Sort() { +func (c *SortedRowContainer) Sort() (ret error) { c.ptrM.Lock() defer c.ptrM.Unlock() + ret = nil + defer func() { + if r := recover(); r != nil { + ret = fmt.Errorf("%v", r) + } + }() if c.ptrM.rowPtrs != nil { return } @@ -539,12 +572,24 @@ func (c *SortedRowContainer) Sort() { c.ptrM.rowPtrs = append(c.ptrM.rowPtrs, RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)}) } } + failpoint.Inject("errorDuringSortRowContainer", func(val failpoint.Value) { + if val.(bool) { + panic("sort meet error") + } + }) sort.Slice(c.ptrM.rowPtrs, c.keyColumnsLess) + return } -func (c *SortedRowContainer) sortAndSpillToDisk() { - c.Sort() - c.RowContainer.SpillToDisk() +// SpillToDisk spills data to disk. This function may be called in parallel. +func (c *SortedRowContainer) SpillToDisk() { + err := c.Sort() + c.RowContainer.spillToDisk(err) +} + +func (c *SortedRowContainer) hasEnoughDataToSpill(t *memory.Tracker) bool { + // Guarantee that each partition size is at least 10% of the threshold, to avoid opening too many files. + return c.GetMemTracker().BytesConsumed() > t.GetBytesLimit()/10 } // Add appends a chunk into the SortedRowContainer. @@ -571,8 +616,8 @@ func (c *SortedRowContainer) GetSortedRow(idx int) (Row, error) { func (c *SortedRowContainer) ActionSpill() *SortAndSpillDiskAction { if c.actionSpill == nil { c.actionSpill = &SortAndSpillDiskAction{ - c: c, - SpillDiskAction: c.RowContainer.ActionSpill(), + c: c, + baseSpillDiskAction: c.RowContainer.ActionSpill().baseSpillDiskAction, } } return c.actionSpill @@ -581,8 +626,8 @@ func (c *SortedRowContainer) ActionSpill() *SortAndSpillDiskAction { // ActionSpillForTest returns a SortAndSpillDiskAction for sorting and spilling over to disk for test. func (c *SortedRowContainer) ActionSpillForTest() *SortAndSpillDiskAction { c.actionSpill = &SortAndSpillDiskAction{ - c: c, - SpillDiskAction: c.RowContainer.ActionSpillForTest(), + c: c, + baseSpillDiskAction: c.RowContainer.ActionSpillForTest().baseSpillDiskAction, } return c.actionSpill } @@ -597,45 +642,13 @@ func (c *SortedRowContainer) GetMemTracker() *memory.Tracker { // triggered. type SortAndSpillDiskAction struct { c *SortedRowContainer - *SpillDiskAction + *baseSpillDiskAction } // Action sends a signal to trigger sortAndSpillToDisk method of RowContainer // and if it is already triggered before, call its fallbackAction. func (a *SortAndSpillDiskAction) Action(t *memory.Tracker) { - a.m.Lock() - defer a.m.Unlock() - // Guarantee that each partition size is at least 10% of the threshold, to avoid opening too many files. - if a.getStatus() == notSpilled && a.c.GetMemTracker().BytesConsumed() > t.GetBytesLimit()/10 { - a.once.Do(func() { - logutil.BgLogger().Info("memory exceeds quota, spill to disk now.", - zap.Int64("consumed", t.BytesConsumed()), zap.Int64("quota", t.GetBytesLimit())) - if a.testSyncInputFunc != nil { - a.testSyncInputFunc() - c := a.c - go func() { - c.sortAndSpillToDisk() - a.testSyncOutputFunc() - }() - return - } - go a.c.sortAndSpillToDisk() - }) - return - } - - a.cond.L.Lock() - for a.cond.status == spilling { - a.cond.Wait() - } - a.cond.L.Unlock() - - if !t.CheckExceed() { - return - } - if fallback := a.GetFallback(); fallback != nil { - fallback.Action(t) - } + a.action(t, a.c) } // WaitForTest waits all goroutine have gone. diff --git a/util/chunk/row_container_test.go b/util/chunk/row_container_test.go index 638a42090bace..381f174dfbb6c 100644 --- a/util/chunk/row_container_test.go +++ b/util/chunk/row_container_test.go @@ -470,6 +470,42 @@ func TestPanicWhenSpillToDisk(t *testing.T) { require.EqualError(t, rc.Add(chk), "out of disk quota when spilling") } +func TestPanicDuringSortedRowContainerSpill(t *testing.T) { + fields := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)} + byItemsDesc := []bool{false} + keyColumns := []int{0} + keyCmpFuncs := []CompareFunc{cmpInt64} + sz := 20 + rc := NewSortedRowContainer(fields, sz, byItemsDesc, keyColumns, keyCmpFuncs) + + chk := NewChunkWithCapacity(fields, sz) + for i := 0; i < sz; i++ { + chk.AppendInt64(0, int64(i)) + } + var tracker *memory.Tracker + var err error + tracker = rc.GetMemTracker() + tracker.SetBytesLimit(chk.MemoryUsage() + int64(8*chk.NumRows()) + 1) + tracker.FallbackOldAndSetNewAction(rc.ActionSpillForTest()) + require.False(t, rc.AlreadySpilledSafeForTest()) + err = rc.Add(chk) + require.NoError(t, err) + rc.actionSpill.WaitForTest() + require.False(t, rc.AlreadySpilledSafeForTest()) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/chunk/errorDuringSortRowContainer", "return(true)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/chunk/errorDuringSortRowContainer")) + }() + err = rc.Add(chk) + require.NoError(t, err) + rc.actionSpill.WaitForTest() + require.True(t, rc.AlreadySpilledSafeForTest()) + + _, err = rc.GetRow(RowPtr{}) + require.EqualError(t, err, "sort meet error") +} + func BenchmarkRowContainerReaderInDiskWithRowSize512(b *testing.B) { benchmarkRowContainerReaderInDiskWithRowLength(b, 512) }