diff --git a/db.go b/db.go index c3550100..f69d99f0 100644 --- a/db.go +++ b/db.go @@ -600,6 +600,7 @@ func (db *DB) loadIndexFromWAL() error { now := time.Now().UnixNano() // get a reader for WAL reader := db.dataFiles.NewReader() + db.dataFiles.SetIsStartupTraversal(true) for { // if the current segment id is less than the mergeFinSegmentId, // we can skip this segment because it has been merged, @@ -656,6 +657,7 @@ func (db *DB) loadIndexFromWAL() error { }) } } + db.dataFiles.SetIsStartupTraversal(false) return nil } diff --git a/db_test.go b/db_test.go index ac017352..841de605 100644 --- a/db_test.go +++ b/db_test.go @@ -111,6 +111,36 @@ func TestDB_Concurrent_Put(t *testing.T) { assert.Equal(t, count, db.index.Size()) } +func TestDB_Concurrent_Get(t *testing.T) { + options := DefaultOptions + db, err := Open(options) + assert.Nil(t, err) + defer destroyDB(db) + + for i := 0; i < 10000; i++ { + err = db.Put(utils.GetTestKey(i), utils.RandomValue(128)) + assert.Nil(t, err) + } + for i := 10000; i < 20000; i++ { + err = db.Put(utils.GetTestKey(i), utils.RandomValue(4096)) + assert.Nil(t, err) + } + + var wg sync.WaitGroup + wg.Add(50) + for i := 0; i < 50; i++ { + go func() { + defer wg.Done() + db.Ascend(func(key []byte, value []byte) (bool, error) { + assert.NotNil(t, key) + assert.NotNil(t, value) + return true, nil + }) + }() + } + wg.Wait() +} + func TestDB_Ascend(t *testing.T) { // Create a test database instance options := DefaultOptions @@ -791,11 +821,11 @@ func TestDB_Auto_Merge(t *testing.T) { { options.AutoMergeCronExpr = "* * * * * *" // every second - db, err := Open(options) + db2, err := Open(options) assert.Nil(t, err) { <-time.After(time.Second * 2) - reader := db.dataFiles.NewReader() + reader := db2.dataFiles.NewReader() var keyCnt int for { if _, _, err := reader.Next(); errors.Is(err, io.EOF) { @@ -806,6 +836,6 @@ func TestDB_Auto_Merge(t *testing.T) { // after merge records are only valid data, so totally is 2000 assert.Equal(t, 2000, keyCnt) } - destroyDB(db) + _ = db2.Close() } } diff --git a/go.mod b/go.mod index 2d2c8674..30d3d3dd 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.21 require ( github.com/google/btree v1.1.2 - github.com/rosedblabs/wal v1.3.7 + github.com/rosedblabs/wal v1.3.8 github.com/valyala/bytebufferpool v1.0.0 ) diff --git a/go.sum b/go.sum index 481a047d..1a853933 100644 --- a/go.sum +++ b/go.sum @@ -17,8 +17,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E= github.com/robfig/cron/v3 v3.0.0/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= -github.com/rosedblabs/wal v1.3.7 h1:ZB/xczf+/fEwbjbPnC/A6DLZRx0rxKgtQsWw2+SxKDg= -github.com/rosedblabs/wal v1.3.7/go.mod h1:DFvhrmTTeiXvn2btXXT2MW9Nvu99PU0g/pKGgh0+T+o= +github.com/rosedblabs/wal v1.3.8 h1:tErpD9JT/ICiyV3mv5l7qUH6lybn5XF1TbI0e8kvH8M= +github.com/rosedblabs/wal v1.3.8/go.mod h1:DFvhrmTTeiXvn2btXXT2MW9Nvu99PU0g/pKGgh0+T+o= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= diff --git a/merge.go b/merge.go index 90031032..7fb0dace 100644 --- a/merge.go +++ b/merge.go @@ -335,6 +335,7 @@ func (db *DB) loadIndexFromHintFile() error { // read all the hint records from the hint file reader := hintFile.NewReader() + hintFile.SetIsStartupTraversal(true) for { chunk, _, err := reader.Next() if err != nil { @@ -349,5 +350,6 @@ func (db *DB) loadIndexFromHintFile() error { // So just put them into the index without checking. db.index.Put(key, position) } + hintFile.SetIsStartupTraversal(false) return nil }