Skip to content

Commit

Permalink
util.chunk: Add Mutex for flushing to avoid race condition (#14970)
Browse files Browse the repository at this point in the history
  • Loading branch information
ichn-hu authored Mar 3, 2020
1 parent efe3b8f commit e1597a6
Showing 1 changed file with 16 additions and 3 deletions.
19 changes: 16 additions & 3 deletions util/chunk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type ListInDisk struct {

disk *os.File
bufWriter *bufio.Writer
bufFlushMutex sync.RWMutex
diskTracker *disk.Tracker // track disk usage.
numRowsInDisk int
}
Expand All @@ -96,6 +97,7 @@ func (l *ListInDisk) initDiskFile() (err error) {
}
l.bufWriter = bufWriterPool.Get().(*bufio.Writer)
l.bufWriter.Reset(l.disk)
l.bufFlushMutex = sync.RWMutex{}
return
}

Expand All @@ -110,15 +112,26 @@ func (l *ListInDisk) GetDiskTracker() *disk.Tracker {
}

// flush empties the write buffer, please call flush before read!
func (l *ListInDisk) flush() error {
func (l *ListInDisk) flush() (err error) {
// buffered is not zero only after Add and before GetRow, after the first flush, buffered will always be zero,
// hence we use a RWLock to allow quicker quit.
l.bufFlushMutex.RLock()
buffered := l.bufWriter.Buffered()
l.bufFlushMutex.RUnlock()
if buffered == 0 {
return nil
}
l.bufFlushMutex.Lock()
if l.bufWriter.Buffered() != 0 {
return l.bufWriter.Flush()
err = l.bufWriter.Flush()
}
return nil
l.bufFlushMutex.Unlock()
return err
}

// Add adds a chunk to the ListInDisk. Caller must make sure the input chk
// is not empty and not used any more and has the same field types.
// Warning: do not mix Add and GetRow (always use GetRow after you have added all the chunks), and do not use Add concurrently.
func (l *ListInDisk) Add(chk *Chunk) (err error) {
if chk.NumRows() == 0 {
return errors.New("chunk appended to List should have at least 1 row")
Expand Down

0 comments on commit e1597a6

Please sign in to comment.