From c1867cbfeef5add147400b50a7ba1a08492cd2e7 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Tue, 25 Jul 2023 14:38:55 +0800 Subject: [PATCH 1/3] try --- executor/executor.go | 4 ++ util/chunk/row_container.go | 13 +++++++ util/chunk/row_container_test.go | 67 ++++++++++++++++++++++++++++++++ 3 files changed, 84 insertions(+) diff --git a/executor/executor.go b/executor/executor.go index 11ab5431b96cc..ae9063b1f5492 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -2076,6 +2076,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { vars.MemTracker.UnbindActions() vars.MemTracker.SetBytesLimit(vars.MemQuotaQuery) vars.MemTracker.ResetMaxConsumed() + vars.DiskTracker.Detach() vars.DiskTracker.ResetMaxConsumed() vars.MemTracker.SessionID.Store(vars.ConnectionID) vars.StmtCtx.TableStats = make(map[int64]interface{}) @@ -2116,6 +2117,9 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { globalConfig := config.GetGlobalConfig() if variable.EnableTmpStorageOnOOM.Load() && sc.DiskTracker != nil { sc.DiskTracker.AttachTo(vars.DiskTracker) + if GlobalDiskUsageTracker != nil { + vars.DiskTracker.AttachTo(GlobalDiskUsageTracker) + } } if execStmt, ok := s.(*ast.ExecuteStmt); ok { prepareStmt, err := plannercore.GetPreparedStmt(execStmt, vars) diff --git a/util/chunk/row_container.go b/util/chunk/row_container.go index 7ae1a67879b03..b0518638d1f19 100644 --- a/util/chunk/row_container.go +++ b/util/chunk/row_container.go @@ -16,6 +16,7 @@ package chunk import ( "errors" + "fmt" "sort" "sync" "time" @@ -140,6 +141,18 @@ func (c *RowContainer) SpillToDisk() { n := c.m.records.inMemory.NumChunks() c.m.records.inDisk = NewListInDisk(c.m.records.inMemory.FieldTypes()) c.m.records.inDisk.diskTracker.AttachTo(c.diskTracker) + defer func() { + if r := recover(); r != nil { + err := fmt.Errorf("%v", r) + c.m.records.spillError = err + logutil.BgLogger().Error("SpillToDisk panicked", zap.Stack("stack"), zap.Error(err)) + } + }() + failpoint.Inject("spillToDiskOutOfDiskQuota", func(val failpoint.Value) { + if val.(bool) { + panic("out of disk quota when spilling") + } + }) for i := 0; i < n; i++ { chk := c.m.records.inMemory.GetChunk(i) err = c.m.records.inDisk.Add(chk) diff --git a/util/chunk/row_container_test.go b/util/chunk/row_container_test.go index 2160496813cfd..825398b656e53 100644 --- a/util/chunk/row_container_test.go +++ b/util/chunk/row_container_test.go @@ -491,3 +491,70 @@ func benchmarkRowContainerReaderInDiskWithRowLength(b *testing.B, rowLength int) } require.NoError(b, reader.Error()) } + +func TestPanicWhenSort(t *testing.T) { + fields := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)} + sz := 20 + chk := NewChunkWithCapacity(fields, sz) + for i := 0; i < sz; i++ { + chk.AppendInt64(0, int64(i)) + } + + byItemsDesc := []bool{false} + keyColumns := []int{0} + keyCmpFuncs := []CompareFunc{cmpInt64} + rc := NewSortedRowContainer(fields, sz, byItemsDesc, keyColumns, keyCmpFuncs) + + tracker := rc.GetMemTracker() + tracker.SetBytesLimit(chk.MemoryUsage() + 1) + tracker.FallbackOldAndSetNewAction(rc.ActionSpillForTest()) + require.False(t, rc.AlreadySpilledSafeForTest()) + + require.NoError(t, rc.Add(chk)) + rc.actionSpill.WaitForTest() + require.False(t, rc.AlreadySpilledSafeForTest()) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/chunk/sortOOM", "return(true)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/chunk/sortOOM")) + }() + // sortAndSpillToDisk should be called + require.NoError(t, rc.Add(chk)) + rc.actionSpill.WaitForTest() + _, err := rc.GetSortedRow(0) + require.EqualError(t, err, "out of memory quota when sorting") + require.False(t, rc.AlreadySpilledSafeForTest()) + + require.EqualError(t, rc.Add(chk), "out of memory quota when sorting") +} + +func TestPanicWhenSpillToDisk(t *testing.T) { + fields := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)} + sz := 20 + chk := NewChunkWithCapacity(fields, sz) + for i := 0; i < sz; i++ { + chk.AppendInt64(0, int64(i)) + } + + rc := NewRowContainer(fields, sz) + tracker := rc.GetMemTracker() + tracker.SetBytesLimit(chk.MemoryUsage() + 1) + tracker.FallbackOldAndSetNewAction(rc.ActionSpillForTest()) + require.False(t, rc.AlreadySpilledSafeForTest()) + + require.NoError(t, rc.Add(chk)) + rc.actionSpill.WaitForTest() + require.False(t, rc.AlreadySpilledSafeForTest()) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/chunk/spillToDiskOutOfDiskQuota", "return(true)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/chunk/spillToDiskOutOfDiskQuota")) + }() + require.NoError(t, rc.Add(chk)) + rc.actionSpill.WaitForTest() + require.True(t, rc.AlreadySpilledSafeForTest()) + + _, err := rc.GetRow(RowPtr{}) + require.EqualError(t, err, "out of disk quota when spilling") + require.EqualError(t, rc.Add(chk), "out of disk quota when spilling") +} From e9714ca85af71898267b8651c954dc84dea4ee6a Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Tue, 25 Jul 2023 14:44:56 +0800 Subject: [PATCH 2/3] add ut --- executor/executor.go | 2 +- util/chunk/row_container_test.go | 98 ++++++++++---------------------- 2 files changed, 32 insertions(+), 68 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index ae9063b1f5492..b91e3ba16a478 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -139,7 +139,7 @@ type baseExecutor struct { const ( // globalPanicStorageExceed represents the panic message when out of storage quota. - globalPanicStorageExceed string = "Out Of Global Storage Quota!" + globalPanicStorageExceed string = "Out Of Quota For Local Temporary Space!" // globalPanicMemoryExceed represents the panic message when out of memory limit. globalPanicMemoryExceed string = "Out Of Global Memory Limit!" // globalPanicAnalyzeMemoryExceed represents the panic message when out of analyze memory limit. diff --git a/util/chunk/row_container_test.go b/util/chunk/row_container_test.go index 825398b656e53..638a42090bace 100644 --- a/util/chunk/row_container_test.go +++ b/util/chunk/row_container_test.go @@ -439,6 +439,37 @@ func TestReadAfterSpillWithRowContainerReader(t *testing.T) { } } +func TestPanicWhenSpillToDisk(t *testing.T) { + fields := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)} + sz := 20 + chk := NewChunkWithCapacity(fields, sz) + for i := 0; i < sz; i++ { + chk.AppendInt64(0, int64(i)) + } + + rc := NewRowContainer(fields, sz) + tracker := rc.GetMemTracker() + tracker.SetBytesLimit(chk.MemoryUsage() + 1) + tracker.FallbackOldAndSetNewAction(rc.ActionSpillForTest()) + require.False(t, rc.AlreadySpilledSafeForTest()) + + require.NoError(t, rc.Add(chk)) + rc.actionSpill.WaitForTest() + require.False(t, rc.AlreadySpilledSafeForTest()) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/chunk/spillToDiskOutOfDiskQuota", "return(true)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/chunk/spillToDiskOutOfDiskQuota")) + }() + require.NoError(t, rc.Add(chk)) + rc.actionSpill.WaitForTest() + require.True(t, rc.AlreadySpilledSafeForTest()) + + _, err := rc.GetRow(RowPtr{}) + require.EqualError(t, err, "out of disk quota when spilling") + require.EqualError(t, rc.Add(chk), "out of disk quota when spilling") +} + func BenchmarkRowContainerReaderInDiskWithRowSize512(b *testing.B) { benchmarkRowContainerReaderInDiskWithRowLength(b, 512) } @@ -491,70 +522,3 @@ func benchmarkRowContainerReaderInDiskWithRowLength(b *testing.B, rowLength int) } require.NoError(b, reader.Error()) } - -func TestPanicWhenSort(t *testing.T) { - fields := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)} - sz := 20 - chk := NewChunkWithCapacity(fields, sz) - for i := 0; i < sz; i++ { - chk.AppendInt64(0, int64(i)) - } - - byItemsDesc := []bool{false} - keyColumns := []int{0} - keyCmpFuncs := []CompareFunc{cmpInt64} - rc := NewSortedRowContainer(fields, sz, byItemsDesc, keyColumns, keyCmpFuncs) - - tracker := rc.GetMemTracker() - tracker.SetBytesLimit(chk.MemoryUsage() + 1) - tracker.FallbackOldAndSetNewAction(rc.ActionSpillForTest()) - require.False(t, rc.AlreadySpilledSafeForTest()) - - require.NoError(t, rc.Add(chk)) - rc.actionSpill.WaitForTest() - require.False(t, rc.AlreadySpilledSafeForTest()) - - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/chunk/sortOOM", "return(true)")) - defer func() { - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/chunk/sortOOM")) - }() - // sortAndSpillToDisk should be called - require.NoError(t, rc.Add(chk)) - rc.actionSpill.WaitForTest() - _, err := rc.GetSortedRow(0) - require.EqualError(t, err, "out of memory quota when sorting") - require.False(t, rc.AlreadySpilledSafeForTest()) - - require.EqualError(t, rc.Add(chk), "out of memory quota when sorting") -} - -func TestPanicWhenSpillToDisk(t *testing.T) { - fields := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)} - sz := 20 - chk := NewChunkWithCapacity(fields, sz) - for i := 0; i < sz; i++ { - chk.AppendInt64(0, int64(i)) - } - - rc := NewRowContainer(fields, sz) - tracker := rc.GetMemTracker() - tracker.SetBytesLimit(chk.MemoryUsage() + 1) - tracker.FallbackOldAndSetNewAction(rc.ActionSpillForTest()) - require.False(t, rc.AlreadySpilledSafeForTest()) - - require.NoError(t, rc.Add(chk)) - rc.actionSpill.WaitForTest() - require.False(t, rc.AlreadySpilledSafeForTest()) - - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/chunk/spillToDiskOutOfDiskQuota", "return(true)")) - defer func() { - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/chunk/spillToDiskOutOfDiskQuota")) - }() - require.NoError(t, rc.Add(chk)) - rc.actionSpill.WaitForTest() - require.True(t, rc.AlreadySpilledSafeForTest()) - - _, err := rc.GetRow(RowPtr{}) - require.EqualError(t, err, "out of disk quota when spilling") - require.EqualError(t, rc.Add(chk), "out of disk quota when spilling") -} From be0324b23dfbb9b965bf2f83c99c808cf11e8d18 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Tue, 1 Aug 2023 16:54:13 +0800 Subject: [PATCH 3/3] s/panic/failed --- util/chunk/row_container.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/util/chunk/row_container.go b/util/chunk/row_container.go index b0518638d1f19..bf095dbed18db 100644 --- a/util/chunk/row_container.go +++ b/util/chunk/row_container.go @@ -145,7 +145,7 @@ func (c *RowContainer) SpillToDisk() { if r := recover(); r != nil { err := fmt.Errorf("%v", r) c.m.records.spillError = err - logutil.BgLogger().Error("SpillToDisk panicked", zap.Stack("stack"), zap.Error(err)) + logutil.BgLogger().Error("spill to disk failed", zap.Stack("stack"), zap.Error(err)) } }() failpoint.Inject("spillToDiskOutOfDiskQuota", func(val failpoint.Value) {