From e1597a684cf7ed9a4a6976aff0e9f19746ed1c42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E?= Date: Tue, 3 Mar 2020 14:56:39 +0800 Subject: [PATCH] util.chunk: Add Mutex for flushing to avoid race condition (#14970) --- util/chunk/disk.go | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/util/chunk/disk.go b/util/chunk/disk.go index 96dafed60848e..d0874d7774128 100644 --- a/util/chunk/disk.go +++ b/util/chunk/disk.go @@ -72,6 +72,7 @@ type ListInDisk struct { disk *os.File bufWriter *bufio.Writer + bufFlushMutex sync.RWMutex diskTracker *disk.Tracker // track disk usage. numRowsInDisk int } @@ -96,6 +97,7 @@ func (l *ListInDisk) initDiskFile() (err error) { } l.bufWriter = bufWriterPool.Get().(*bufio.Writer) l.bufWriter.Reset(l.disk) + l.bufFlushMutex = sync.RWMutex{} return } @@ -110,15 +112,26 @@ func (l *ListInDisk) GetDiskTracker() *disk.Tracker { } // flush empties the write buffer, please call flush before read! -func (l *ListInDisk) flush() error { +func (l *ListInDisk) flush() (err error) { + // buffered is not zero only after Add and before GetRow, after the first flush, buffered will always be zero, + // hence we use a RWLock to allow quicker quit. + l.bufFlushMutex.RLock() + buffered := l.bufWriter.Buffered() + l.bufFlushMutex.RUnlock() + if buffered == 0 { + return nil + } + l.bufFlushMutex.Lock() if l.bufWriter.Buffered() != 0 { - return l.bufWriter.Flush() + err = l.bufWriter.Flush() } - return nil + l.bufFlushMutex.Unlock() + return err } // Add adds a chunk to the ListInDisk. Caller must make sure the input chk // is not empty and not used any more and has the same field types. +// Warning: do not mix Add and GetRow (always use GetRow after you have added all the chunks), and do not use Add concurrently. func (l *ListInDisk) Add(chk *Chunk) (err error) { if chk.NumRows() == 0 { return errors.New("chunk appended to List should have at least 1 row")