diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index b614c10d3726..d7bfe18e020f 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -465,35 +465,59 @@ func (t *freezerTable) releaseFilesAfter(num uint32, remove bool) { // Note, this method will *not* flush any data to disk so be sure to explicitly // fsync before irreversibly deleting data from the database. func (t *freezerTable) Append(item uint64, blob []byte) error { + // Encode the blob before the lock portion + if !t.noCompression { + blob = snappy.Encode(nil, blob) + } // Read lock prevents competition with truncate - t.lock.RLock() + retry, err := t.append(item, blob, false) + if err != nil { + return err + } + if retry { + // Read lock was insufficient, retry with a writelock + _, err = t.append(item, blob, true) + } + return err +} + +// append injects a binary blob at the end of the freezer table. +// Normally, inserts do not require holding the write-lock, so it should be invoked with 'wlock' set to +// false. +// However, if the data will grown the current file out of bounds, then this +// method will return 'true, nil', indicating that the caller should retry, this time +// with 'wlock' set to true. +func (t *freezerTable) append(item uint64, encodedBlob []byte, wlock bool) (bool, error) { + if wlock { + t.lock.Lock() + defer t.lock.Unlock() + } else { + t.lock.RLock() + defer t.lock.RUnlock() + } // Ensure the table is still accessible if t.index == nil || t.head == nil { - t.lock.RUnlock() - return errClosed + return false, errClosed } // Ensure only the next item can be written, nothing else if atomic.LoadUint64(&t.items) != item { - t.lock.RUnlock() - return fmt.Errorf("appending unexpected item: want %d, have %d", t.items, item) - } - // Encode the blob and write it into the data file - if !t.noCompression { - blob = snappy.Encode(nil, blob) + return false, fmt.Errorf("appending unexpected item: want %d, have %d", t.items, item) } - bLen := uint32(len(blob)) + bLen := uint32(len(encodedBlob)) if t.headBytes+bLen < bLen || t.headBytes+bLen > t.maxFileSize { - // we need a new file, writing would overflow - t.lock.RUnlock() - t.lock.Lock() + // Writing would overflow, so we need to open a new data file. + // If we don't already hold the writelock, abort and let the caller + // invoke this method a second time. + if !wlock { + return true, nil + } nextID := atomic.LoadUint32(&t.headId) + 1 // We open the next file in truncated mode -- if this file already // exists, we need to start over from scratch on it newHead, err := t.openFile(nextID, openFreezerFileTruncated) if err != nil { - t.lock.Unlock() - return err + return false, err } // Close old file, and reopen in RDONLY mode t.releaseFile(t.headId) @@ -503,13 +527,9 @@ func (t *freezerTable) Append(item uint64, blob []byte) error { t.head = newHead atomic.StoreUint32(&t.headBytes, 0) atomic.StoreUint32(&t.headId, nextID) - t.lock.Unlock() - t.lock.RLock() } - - defer t.lock.RUnlock() - if _, err := t.head.Write(blob); err != nil { - return err + if _, err := t.head.Write(encodedBlob); err != nil { + return false, err } newOffset := atomic.AddUint32(&t.headBytes, bLen) idx := indexEntry{ @@ -523,7 +543,7 @@ func (t *freezerTable) Append(item uint64, blob []byte) error { t.sizeGauge.Inc(int64(bLen + indexEntrySize)) atomic.AddUint64(&t.items, 1) - return nil + return false, nil } // getBounds returns the indexes for the item @@ -562,44 +582,48 @@ func (t *freezerTable) getBounds(item uint64) (uint32, uint32, uint32, error) { // Retrieve looks up the data offset of an item with the given number and retrieves // the raw binary blob from the data file. func (t *freezerTable) Retrieve(item uint64) ([]byte, error) { + blob, err := t.retrieve(item) + if err != nil { + return nil, err + } + if t.noCompression { + return blob, nil + } + return snappy.Decode(nil, blob) +} + +// retrieve looks up the data offset of an item with the given number and retrieves +// the raw binary blob from the data file. OBS! This method does not decode +// compressed data. +func (t *freezerTable) retrieve(item uint64) ([]byte, error) { t.lock.RLock() + defer t.lock.RUnlock() // Ensure the table and the item is accessible if t.index == nil || t.head == nil { - t.lock.RUnlock() return nil, errClosed } if atomic.LoadUint64(&t.items) <= item { - t.lock.RUnlock() return nil, errOutOfBounds } // Ensure the item was not deleted from the tail either if uint64(t.itemOffset) > item { - t.lock.RUnlock() return nil, errOutOfBounds } startOffset, endOffset, filenum, err := t.getBounds(item - uint64(t.itemOffset)) if err != nil { - t.lock.RUnlock() return nil, err } dataFile, exist := t.files[filenum] if !exist { - t.lock.RUnlock() return nil, fmt.Errorf("missing data file %d", filenum) } // Retrieve the data itself, decompress and return blob := make([]byte, endOffset-startOffset) if _, err := dataFile.ReadAt(blob, int64(startOffset)); err != nil { - t.lock.RUnlock() return nil, err } - t.lock.RUnlock() t.readMeter.Mark(int64(len(blob) + 2*indexEntrySize)) - - if t.noCompression { - return blob, nil - } - return snappy.Decode(nil, blob) + return blob, nil } // has returns an indicator whether the specified number data diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go index b8d3170c6235..0df28f236d2d 100644 --- a/core/rawdb/freezer_table_test.go +++ b/core/rawdb/freezer_table_test.go @@ -18,10 +18,13 @@ package rawdb import ( "bytes" + "encoding/binary" "fmt" + "io/ioutil" "math/rand" "os" "path/filepath" + "sync" "testing" "time" @@ -637,6 +640,55 @@ func TestOffset(t *testing.T) { // 1. have data files d0, d1, d2, d3 // 2. remove d2,d3 // -// However, all 'normal' failure modes arising due to failing to sync() or save a file should be -// handled already, and the case described above can only (?) happen if an external process/user -// deletes files from the filesystem. +// However, all 'normal' failure modes arising due to failing to sync() or save a file +// should be handled already, and the case described above can only (?) happen if an +// external process/user deletes files from the filesystem. + +// TestAppendTruncateParallel is a test to check if the Append/truncate operations are +// racy. +// +// The reason why it's not a regular fuzzer, within tests/fuzzers, is that it is dependent +// on timing rather than 'clever' input -- there's no determinism. +func TestAppendTruncateParallel(t *testing.T) { + dir, err := ioutil.TempDir("", "freezer") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + + f, err := newCustomTable(dir, "tmp", metrics.NilMeter{}, metrics.NilMeter{}, metrics.NilGauge{}, 8, true) + if err != nil { + t.Fatal(err) + } + + fill := func(mark uint64) []byte { + data := make([]byte, 8) + binary.LittleEndian.PutUint64(data, mark) + return data + } + + for i := 0; i < 5000; i++ { + f.truncate(0) + data0 := fill(0) + f.Append(0, data0) + data1 := fill(1) + + var wg sync.WaitGroup + wg.Add(2) + go func() { + f.truncate(0) + wg.Done() + }() + go func() { + f.Append(1, data1) + wg.Done() + }() + wg.Wait() + + if have, err := f.Retrieve(0); err == nil { + if !bytes.Equal(have, data0) { + t.Fatalf("have %x want %x", have, data0) + } + } + } +}