Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/rawdb: fix datarace in freezer #22728

Merged
merged 5 commits into from
Apr 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 58 additions & 34 deletions core/rawdb/freezer_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,35 +465,59 @@ func (t *freezerTable) releaseFilesAfter(num uint32, remove bool) {
// Note, this method will *not* flush any data to disk so be sure to explicitly
// fsync before irreversibly deleting data from the database.
func (t *freezerTable) Append(item uint64, blob []byte) error {
	// Encode the blob before the lock portion, so the compression work is
	// not done while the table lock is held.
	if !t.noCompression {
		blob = snappy.Encode(nil, blob)
	}
	// No lock is taken here: append acquires and releases the lock it needs
	// itself. Holding a read lock across these calls would never be released
	// and would block the write-locked retry below.
	retry, err := t.append(item, blob, false)
	if err != nil {
		return err
	}
	if retry {
		// Read lock was insufficient, retry with a writelock
		_, err = t.append(item, blob, true)
	}
	return err
}

// append injects a binary blob at the end of the freezer table.
// Normally, inserts do not require holding the write-lock, so it should be invoked with 'wlock' set to
// false.
// However, if the data will grow the current file out of bounds, then this
// method will return 'true, nil', indicating that the caller should retry, this time
// with 'wlock' set to true.
func (t *freezerTable) append(item uint64, encodedBlob []byte, wlock bool) (bool, error) {
if wlock {
t.lock.Lock()
defer t.lock.Unlock()
} else {
t.lock.RLock()
defer t.lock.RUnlock()
}
// Ensure the table is still accessible
if t.index == nil || t.head == nil {
t.lock.RUnlock()
return errClosed
return false, errClosed
}
// Ensure only the next item can be written, nothing else
if atomic.LoadUint64(&t.items) != item {
t.lock.RUnlock()
return fmt.Errorf("appending unexpected item: want %d, have %d", t.items, item)
}
// Encode the blob and write it into the data file
if !t.noCompression {
blob = snappy.Encode(nil, blob)
return false, fmt.Errorf("appending unexpected item: want %d, have %d", t.items, item)
}
bLen := uint32(len(blob))
bLen := uint32(len(encodedBlob))
if t.headBytes+bLen < bLen ||
t.headBytes+bLen > t.maxFileSize {
// we need a new file, writing would overflow
t.lock.RUnlock()
t.lock.Lock()
// Writing would overflow, so we need to open a new data file.
// If we don't already hold the writelock, abort and let the caller
// invoke this method a second time.
if !wlock {
return true, nil
}
nextID := atomic.LoadUint32(&t.headId) + 1
// We open the next file in truncated mode -- if this file already
// exists, we need to start over from scratch on it
newHead, err := t.openFile(nextID, openFreezerFileTruncated)
if err != nil {
t.lock.Unlock()
return err
return false, err
}
// Close old file, and reopen in RDONLY mode
t.releaseFile(t.headId)
Expand All @@ -503,13 +527,9 @@ func (t *freezerTable) Append(item uint64, blob []byte) error {
t.head = newHead
atomic.StoreUint32(&t.headBytes, 0)
atomic.StoreUint32(&t.headId, nextID)
t.lock.Unlock()
t.lock.RLock()
}

defer t.lock.RUnlock()
if _, err := t.head.Write(blob); err != nil {
return err
if _, err := t.head.Write(encodedBlob); err != nil {
return false, err
}
newOffset := atomic.AddUint32(&t.headBytes, bLen)
idx := indexEntry{
Expand All @@ -523,7 +543,7 @@ func (t *freezerTable) Append(item uint64, blob []byte) error {
t.sizeGauge.Inc(int64(bLen + indexEntrySize))

atomic.AddUint64(&t.items, 1)
return nil
return false, nil
}

// getBounds returns the indexes for the item
Expand Down Expand Up @@ -562,44 +582,48 @@ func (t *freezerTable) getBounds(item uint64) (uint32, uint32, uint32, error) {
// Retrieve fetches the stored blob for the item with the given number. The raw
// bytes are obtained through retrieve; unless the table has compression
// disabled, they are snappy-decoded before being handed back.
func (t *freezerTable) Retrieve(item uint64) ([]byte, error) {
	raw, err := t.retrieve(item)
	switch {
	case err != nil:
		return nil, err
	case t.noCompression:
		// Data was stored verbatim, nothing to decode.
		return raw, nil
	default:
		return snappy.Decode(nil, raw)
	}
}

// retrieve looks up the data offset of an item with the given number and returns
// the raw binary blob from the data file. OBS! This method does not decode
// compressed data; the bytes are returned exactly as read from the file.
//
// It returns errClosed if the table has no index or head file, errOutOfBounds
// if the item is beyond the current item count or below the table's item
// offset, and otherwise any error from the bounds lookup or the file read.
func (t *freezerTable) retrieve(item uint64) ([]byte, error) {
	// The deferred call is the single release point for the read lock; no
	// return path below may unlock explicitly, or the lock would be released
	// twice.
	t.lock.RLock()
	defer t.lock.RUnlock()

	// Ensure the table and the item is accessible
	if t.index == nil || t.head == nil {
		return nil, errClosed
	}
	if atomic.LoadUint64(&t.items) <= item {
		return nil, errOutOfBounds
	}
	// Ensure the item was not deleted from the tail either
	if uint64(t.itemOffset) > item {
		return nil, errOutOfBounds
	}
	startOffset, endOffset, filenum, err := t.getBounds(item - uint64(t.itemOffset))
	if err != nil {
		return nil, err
	}
	dataFile, exist := t.files[filenum]
	if !exist {
		return nil, fmt.Errorf("missing data file %d", filenum)
	}
	// Read the stored bytes for the item; decoding is left to the caller
	blob := make([]byte, endOffset-startOffset)
	if _, err := dataFile.ReadAt(blob, int64(startOffset)); err != nil {
		return nil, err
	}
	t.readMeter.Mark(int64(len(blob) + 2*indexEntrySize))

	return blob, nil
}

// has returns an indicator whether the specified number data
Expand Down
58 changes: 55 additions & 3 deletions core/rawdb/freezer_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ package rawdb

import (
"bytes"
"encoding/binary"
"fmt"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -637,6 +640,55 @@ func TestOffset(t *testing.T) {
// 1. have data files d0, d1, d2, d3
// 2. remove d2,d3
//
// However, all 'normal' failure modes arising due to failing to sync() or save a file should be
// handled already, and the case described above can only (?) happen if an external process/user
// deletes files from the filesystem.
// However, all 'normal' failure modes arising due to failing to sync() or save a file
// should be handled already, and the case described above can only (?) happen if an
// external process/user deletes files from the filesystem.

// TestAppendTruncateParallel runs Append and truncate concurrently to check
// whether the two operations are racy.
//
// It is kept here instead of among the regular fuzzers in tests/fuzzers because
// the outcome hinges on timing rather than on 'clever' input, so there is no
// determinism to build a fuzzer on.
func TestAppendTruncateParallel(t *testing.T) {
	dir, err := ioutil.TempDir("", "freezer")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(dir)

	table, err := newCustomTable(dir, "tmp", metrics.NilMeter{}, metrics.NilMeter{}, metrics.NilGauge{}, 8, true)
	if err != nil {
		t.Fatal(err)
	}

	// encode renders mark as an 8-byte little-endian payload.
	encode := func(mark uint64) []byte {
		buf := make([]byte, 8)
		binary.LittleEndian.PutUint64(buf, mark)
		return buf
	}

	for round := 0; round < 5000; round++ {
		// Every round starts with truncate(0) followed by an append of item 0.
		table.truncate(0)
		first := encode(0)
		table.Append(0, first)
		second := encode(1)

		// Run a truncate and an append of item 1 side by side.
		var pending sync.WaitGroup
		pending.Add(2)
		go func() {
			defer pending.Done()
			table.truncate(0)
		}()
		go func() {
			defer pending.Done()
			table.Append(1, second)
		}()
		pending.Wait()

		// A failed lookup is tolerated; but if item 0 can be retrieved, it
		// must match what was appended.
		got, err := table.Retrieve(0)
		if err != nil {
			continue
		}
		if !bytes.Equal(got, first) {
			t.Fatalf("have %x want %x", got, first)
		}
	}
}