diff --git a/batch.go b/batch.go index 600d138e..f8486ba0 100644 --- a/batch.go +++ b/batch.go @@ -3,10 +3,11 @@ package rosedb import ( "bytes" "fmt" - "github.com/rosedblabs/rosedb/v2/utils" "sync" "time" + "github.com/rosedblabs/rosedb/v2/utils" + "github.com/bwmarrin/snowflake" "github.com/valyala/bytebufferpool" ) @@ -23,7 +24,7 @@ import ( // // You must call Commit method to commit the batch, otherwise the DB will be locked. type Batch struct { - db *DB + db *rose pendingWrites []*LogRecord // save the data to be written pendingWritesMap map[uint64][]int // map record hash key to index, fast lookup to pendingWrites options BatchOptions @@ -35,7 +36,7 @@ type Batch struct { } // NewBatch creates a new Batch instance. -func (db *DB) NewBatch(options BatchOptions) *Batch { +func (db *rose) NewBatch(options BatchOptions) *Batch { batch := &Batch{ db: db, options: options, @@ -68,7 +69,7 @@ func newRecord() interface{} { return &LogRecord{} } -func (b *Batch) init(rdonly, sync bool, db *DB) *Batch { +func (b *Batch) init(rdonly, sync bool, db *rose) *Batch { b.options.ReadOnly = rdonly b.options.Sync = sync b.db = db diff --git a/batch_test.go b/batch_test.go index 482f926a..7db406d8 100644 --- a/batch_test.go +++ b/batch_test.go @@ -1,17 +1,16 @@ package rosedb import ( - "os" "testing" "github.com/rosedblabs/rosedb/v2/utils" "github.com/stretchr/testify/assert" ) -func destroyDB(db *DB) { +func destroyDB(db DB) { _ = db.Close() - _ = os.RemoveAll(db.options.DirPath) - _ = os.RemoveAll(mergeDirPath(db.options.DirPath)) + _ = db.Fs().RemoveAll(db.(*rose).options.DirPath) + _ = db.Fs().RemoveAll(mergeDirPath(db.(*rose).options.DirPath)) } func TestBatch_Put_Normal(t *testing.T) { @@ -140,7 +139,7 @@ func TestBatch_Exist_Normal(t *testing.T) { assertKeyExistOrNot(t, db2, utils.GetTestKey(99), true) } -func generateData(t *testing.T, db *DB, start, end int, valueLen int) { +func generateData(t *testing.T, db DB, start, end int, valueLen int) { for ; start < end; start++ { err := db.Put(utils.GetTestKey(start), utils.RandomValue(valueLen)) assert.Nil(t, err) @@ -183,7 +182,7 @@ func batchPutAndIterate(t *testing.T, segmentSize int64, size int, valueLen int) } } -func assertKeyExistOrNot(t *testing.T, db *DB, key []byte, exist bool) { +func assertKeyExistOrNot(t *testing.T, db DB, key []byte, exist bool) { val, err := db.Get(key) if exist { assert.Nil(t, err) diff --git a/benchmark/bench_test.go b/benchmark/bench_test.go index 1653f076..3e426013 100644 --- a/benchmark/bench_test.go +++ b/benchmark/bench_test.go @@ -3,7 +3,6 @@ package benchmark import ( "errors" "math/rand" - "os" "testing" "github.com/rosedblabs/rosedb/v2" @@ -11,7 +10,7 @@ import ( "github.com/stretchr/testify/assert" ) -var db *rosedb.DB +var db rosedb.DB func openDB() func() { options := rosedb.DefaultOptions @@ -25,7 +24,7 @@ func openDB() func() { return func() { _ = db.Close() - _ = os.RemoveAll(options.DirPath) + _ = options.Fs.RemoveAll(options.DirPath) } } diff --git a/db.go b/db.go index 9db7c767..f43efbe9 100644 --- a/db.go +++ b/db.go @@ -6,17 +6,16 @@ import ( "fmt" "io" "os" - "path/filepath" "regexp" "sync" "time" "github.com/bwmarrin/snowflake" - "github.com/gofrs/flock" "github.com/robfig/cron/v3" "github.com/rosedblabs/rosedb/v2/index" "github.com/rosedblabs/rosedb/v2/utils" "github.com/rosedblabs/wal" + "github.com/spf13/afero" ) const ( @@ -26,7 +25,7 @@ const ( mergeFinNameSuffix = ".MERGEFIN" ) -// DB represents a ROSEDB database instance. +// rose represents a ROSEDB database instance. // It is built on the bitcask model, which is a log-structured storage. // It uses WAL to write data, and uses an in-memory index to store the key // and the position of the data in the WAL, @@ -39,12 +38,13 @@ const ( // our total data size is limited by the memory size. // // So if your memory can almost hold all the keys, ROSEDB is the perfect storage engine for you. -type DB struct { +type rose struct { + fs afero.Fs dataFiles *wal.WAL // data files are a sets of segment files in WAL. hintFile *wal.WAL // hint file is used to store the key and the position for fast startup. index index.Indexer options Options - fileLock *flock.Flock + fileLock Lock mu sync.RWMutex closed bool mergeRunning uint32 // indicate if the database is merging @@ -73,36 +73,53 @@ type Stat struct { // // It will open the wal files in the database directory and load the index from them. // Return the DB instance, or an error if any. -func Open(options Options) (*DB, error) { + +func OpenReadOnly(options Options) (DBReadOnly, error) { + return open(options, false) +} +func Open(options Options) (DB, error) { + return open(options, true) +} +func open(options Options, writeEnable bool) (*rose, error) { // check options if err := checkOptions(options); err != nil { return nil, err } + fs := options.Fs + // create data directory if not exist - if _, err := os.Stat(options.DirPath); err != nil { - if err := os.MkdirAll(options.DirPath, os.ModePerm); err != nil { + if _, err := fs.Stat(options.DirPath); err != nil { + if err := fs.MkdirAll(options.DirPath, os.ModePerm); err != nil { return nil, err } } // create file lock, prevent multiple processes from using the same database directory - fileLock := flock.New(filepath.Join(options.DirPath, fileLockName)) - hold, err := fileLock.TryLock() - if err != nil { - return nil, err - } - if !hold { - return nil, ErrDatabaseIsUsing + // fileLock := flock.New(filepath.Join(options.DirPath, fileLockName)) + + var err error + var fileLock Lock = nil + + if writeEnable { + fileLock = options.Lock + hold, err := fileLock.TryLock() + if err != nil { + return nil, err + } + if !hold { + return nil, ErrDatabaseIsUsing + } } // load merge files if exists - if err = loadMergeFiles(options.DirPath); err != nil { + if err = loadMergeFiles(options.DirPath, fs); err != nil { return nil, err } // init DB instance - db := &DB{ + db := &rose{ + fs: fs, index: index.NewIndexer(), options: options, fileLock: fileLock, @@ -151,9 +168,13 @@ func Open(options Options) (*DB, error) { return db, nil } -func (db *DB) openWalFiles() (*wal.WAL, error) { +func (db *rose) Fs() afero.Fs { + return db.fs +} +func (db *rose) openWalFiles() (*wal.WAL, error) { // open data files from WAL walFiles, err := wal.Open(wal.Options{ + Fs: db.options.Fs, DirPath: db.options.DirPath, SegmentSize: db.options.SegmentSize, SegmentFileExt: dataFileNameSuffix, @@ -167,7 +188,7 @@ func (db *DB) openWalFiles() (*wal.WAL, error) { return walFiles, nil } -func (db *DB) loadIndex() error { +func (db *rose) loadIndex() error { // load index frm hint file if err := db.loadIndexFromHintFile(); err != nil { return err @@ -182,7 +203,7 @@ func (db *DB) loadIndex() error { // Close the database, close all data files and release file lock. // Set the closed flag to true. // The DB instance cannot be used after closing. -func (db *DB) Close() error { +func (db *rose) Close() error { db.mu.Lock() defer db.mu.Unlock() @@ -191,8 +212,10 @@ func (db *DB) Close() error { } // release file lock - if err := db.fileLock.Unlock(); err != nil { - return err + if db.fileLock != nil { + if err := db.fileLock.Unlock(); err != nil { + return err + } } // close watch channel @@ -210,7 +233,7 @@ func (db *DB) Close() error { } // closeFiles close all data files and hint file -func (db *DB) closeFiles() error { +func (db *rose) closeFiles() error { // close wal if err := db.dataFiles.Close(); err != nil { return err @@ -225,7 +248,7 @@ func (db *DB) closeFiles() error { } // Sync all data files to the underlying storage. -func (db *DB) Sync() error { +func (db *rose) Sync() error { db.mu.Lock() defer db.mu.Unlock() @@ -233,7 +256,7 @@ func (db *DB) Sync() error { } // Stat returns the statistics of the database. -func (db *DB) Stat() *Stat { +func (db *rose) Stat() *Stat { db.mu.Lock() defer db.mu.Unlock() @@ -251,7 +274,7 @@ func (db *DB) Stat() *Stat { // Put a key-value pair into the database. // Actually, it will open a new batch and commit it. // You can think the batch has only one Put operation. -func (db *DB) Put(key []byte, value []byte) error { +func (db *rose) Put(key []byte, value []byte) error { batch := db.batchPool.Get().(*Batch) defer func() { batch.reset() @@ -271,7 +294,7 @@ func (db *DB) Put(key []byte, value []byte) error { // PutWithTTL a key-value pair into the database, with a ttl. // Actually, it will open a new batch and commit it. // You can think the batch has only one PutWithTTL operation. -func (db *DB) PutWithTTL(key []byte, value []byte, ttl time.Duration) error { +func (db *rose) PutWithTTL(key []byte, value []byte, ttl time.Duration) error { batch := db.batchPool.Get().(*Batch) defer func() { batch.reset() @@ -291,7 +314,7 @@ func (db *DB) PutWithTTL(key []byte, value []byte, ttl time.Duration) error { // Get the value of the specified key from the database. // Actually, it will open a new batch and commit it. // You can think the batch has only one Get operation. -func (db *DB) Get(key []byte) ([]byte, error) { +func (db *rose) Get(key []byte) ([]byte, error) { batch := db.batchPool.Get().(*Batch) batch.init(true, false, db) defer func() { @@ -305,7 +328,7 @@ func (db *DB) Get(key []byte) ([]byte, error) { // Delete the specified key from the database. // Actually, it will open a new batch and commit it. // You can think the batch has only one Delete operation. -func (db *DB) Delete(key []byte) error { +func (db *rose) Delete(key []byte) error { batch := db.batchPool.Get().(*Batch) defer func() { batch.reset() @@ -325,7 +348,7 @@ func (db *DB) Delete(key []byte) error { // Exist checks if the specified key exists in the database. // Actually, it will open a new batch and commit it. // You can think the batch has only one Exist operation. -func (db *DB) Exist(key []byte) (bool, error) { +func (db *rose) Exist(key []byte) (bool, error) { batch := db.batchPool.Get().(*Batch) batch.init(true, false, db) defer func() { @@ -337,7 +360,7 @@ func (db *DB) Exist(key []byte) (bool, error) { } // Expire sets the ttl of the key. -func (db *DB) Expire(key []byte, ttl time.Duration) error { +func (db *rose) Expire(key []byte, ttl time.Duration) error { batch := db.batchPool.Get().(*Batch) defer func() { batch.reset() @@ -355,7 +378,7 @@ func (db *DB) Expire(key []byte, ttl time.Duration) error { } // TTL get the ttl of the key. -func (db *DB) TTL(key []byte) (time.Duration, error) { +func (db *rose) TTL(key []byte) (time.Duration, error) { batch := db.batchPool.Get().(*Batch) batch.init(true, false, db) defer func() { @@ -368,7 +391,7 @@ func (db *DB) TTL(key []byte) (time.Duration, error) { // Persist removes the ttl of the key. // If the key does not exist or expired, it will return ErrKeyNotFound. -func (db *DB) Persist(key []byte) error { +func (db *rose) Persist(key []byte) error { batch := db.batchPool.Get().(*Batch) defer func() { batch.reset() @@ -385,7 +408,7 @@ func (db *DB) Persist(key []byte) error { return batch.Commit() } -func (db *DB) Watch() (<-chan *Event, error) { +func (db *rose) Watch() (<-chan *Event, error) { if db.options.WatchQueueSize <= 0 { return nil, ErrWatchDisabled } @@ -393,7 +416,7 @@ func (db *DB) Watch() (<-chan *Event, error) { } // Ascend calls handleFn for each key/value pair in the db in ascending order. -func (db *DB) Ascend(handleFn func(k []byte, v []byte) (bool, error)) { +func (db *rose) Ascend(handleFn func(k []byte, v []byte) (bool, error)) { db.mu.RLock() defer db.mu.RUnlock() @@ -410,7 +433,7 @@ func (db *DB) Ascend(handleFn func(k []byte, v []byte) (bool, error)) { } // AscendRange calls handleFn for each key/value pair in the db within the range [startKey, endKey] in ascending order. -func (db *DB) AscendRange(startKey, endKey []byte, handleFn func(k []byte, v []byte) (bool, error)) { +func (db *rose) AscendRange(startKey, endKey []byte, handleFn func(k []byte, v []byte) (bool, error)) { db.mu.RLock() defer db.mu.RUnlock() @@ -427,7 +450,7 @@ func (db *DB) AscendRange(startKey, endKey []byte, handleFn func(k []byte, v []b } // AscendGreaterOrEqual calls handleFn for each key/value pair in the db with keys greater than or equal to the given key. -func (db *DB) AscendGreaterOrEqual(key []byte, handleFn func(k []byte, v []byte) (bool, error)) { +func (db *rose) AscendGreaterOrEqual(key []byte, handleFn func(k []byte, v []byte) (bool, error)) { db.mu.RLock() defer db.mu.RUnlock() @@ -447,7 +470,7 @@ func (db *DB) AscendGreaterOrEqual(key []byte, handleFn func(k []byte, v []byte) // Since our expiry time is stored in the value, if you want to filter expired keys, // you need to set parameter filterExpired to true. But the performance will be affected. // Because we need to read the value of each key to determine if it is expired. -func (db *DB) AscendKeys(pattern []byte, filterExpired bool, handleFn func(k []byte) (bool, error)) { +func (db *rose) AscendKeys(pattern []byte, filterExpired bool, handleFn func(k []byte) (bool, error)) { db.mu.RLock() defer db.mu.RUnlock() @@ -478,7 +501,7 @@ func (db *DB) AscendKeys(pattern []byte, filterExpired bool, handleFn func(k []b } // Descend calls handleFn for each key/value pair in the db in descending order. -func (db *DB) Descend(handleFn func(k []byte, v []byte) (bool, error)) { +func (db *rose) Descend(handleFn func(k []byte, v []byte) (bool, error)) { db.mu.RLock() defer db.mu.RUnlock() @@ -495,7 +518,7 @@ func (db *DB) Descend(handleFn func(k []byte, v []byte) (bool, error)) { } // DescendRange calls handleFn for each key/value pair in the db within the range [startKey, endKey] in descending order. -func (db *DB) DescendRange(startKey, endKey []byte, handleFn func(k []byte, v []byte) (bool, error)) { +func (db *rose) DescendRange(startKey, endKey []byte, handleFn func(k []byte, v []byte) (bool, error)) { db.mu.RLock() defer db.mu.RUnlock() @@ -512,7 +535,7 @@ func (db *DB) DescendRange(startKey, endKey []byte, handleFn func(k []byte, v [] } // DescendLessOrEqual calls handleFn for each key/value pair in the db with keys less than or equal to the given key. -func (db *DB) DescendLessOrEqual(key []byte, handleFn func(k []byte, v []byte) (bool, error)) { +func (db *rose) DescendLessOrEqual(key []byte, handleFn func(k []byte, v []byte) (bool, error)) { db.mu.RLock() defer db.mu.RUnlock() @@ -532,7 +555,7 @@ func (db *DB) DescendLessOrEqual(key []byte, handleFn func(k []byte, v []byte) ( // Since our expiry time is stored in the value, if you want to filter expired keys, // you need to set parameter filterExpired to true. But the performance will be affected. // Because we need to read the value of each key to determine if it is expired. -func (db *DB) DescendKeys(pattern []byte, filterExpired bool, handleFn func(k []byte) (bool, error)) { +func (db *rose) DescendKeys(pattern []byte, filterExpired bool, handleFn func(k []byte) (bool, error)) { db.mu.RLock() defer db.mu.RUnlock() @@ -562,7 +585,7 @@ func (db *DB) DescendKeys(pattern []byte, filterExpired bool, handleFn func(k [] }) } -func (db *DB) checkValue(chunk []byte) []byte { +func (db *rose) checkValue(chunk []byte) []byte { record := decodeLogRecord(chunk) now := time.Now().UnixNano() if record.Type != LogRecordDeleted && !record.IsExpired(now) { @@ -592,8 +615,8 @@ func checkOptions(options Options) error { // loadIndexFromWAL loads index from WAL. // It will iterate over all the WAL files and read data // from them to rebuild the index. -func (db *DB) loadIndexFromWAL() error { - mergeFinSegmentId, err := getMergeFinSegmentId(db.options.DirPath) +func (db *rose) loadIndexFromWAL() error { + mergeFinSegmentId, err := getMergeFinSegmentId(db.options.DirPath, db.fs) if err != nil { return err } @@ -663,7 +686,7 @@ func (db *DB) loadIndexFromWAL() error { // DeleteExpiredKeys scan the entire index in ascending order to delete expired keys. // It is a time-consuming operation, so we need to specify a timeout // to prevent the DB from being unavailable for a long time. -func (db *DB) DeleteExpiredKeys(timeout time.Duration) error { +func (db *rose) DeleteExpiredKeys(timeout time.Duration) error { // set timeout ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() diff --git a/db_test.go b/db_test.go index ac017352..38e0a6bd 100644 --- a/db_test.go +++ b/db_test.go @@ -108,7 +108,7 @@ func TestDB_Concurrent_Put(t *testing.T) { count++ return true }) - assert.Equal(t, count, db.index.Size()) + assert.Equal(t, count, db.(*rose).index.Size()) } func TestDB_Ascend(t *testing.T) { @@ -770,7 +770,7 @@ func TestDB_Auto_Merge(t *testing.T) { } { - reader := db.dataFiles.NewReader() + reader := db.(*rose).dataFiles.NewReader() var keyCnt int for { if _, _, err := reader.Next(); errors.Is(err, io.EOF) { @@ -784,7 +784,7 @@ func TestDB_Auto_Merge(t *testing.T) { } mergeDirPath := mergeDirPath(options.DirPath) - if _, err := os.Stat(mergeDirPath); err != nil { + if _, err := db.Fs().Stat(mergeDirPath); err != nil { assert.True(t, os.IsNotExist(err)) } assert.NoError(t, db.Close()) @@ -795,7 +795,7 @@ func TestDB_Auto_Merge(t *testing.T) { assert.Nil(t, err) { <-time.After(time.Second * 2) - reader := db.dataFiles.NewReader() + reader := db.(*rose).dataFiles.NewReader() var keyCnt int for { if _, _, err := reader.Next(); errors.Is(err, io.EOF) { diff --git a/examples/basic/main.go b/examples/basic/main.go index a054621d..de5c6493 100644 --- a/examples/basic/main.go +++ b/examples/basic/main.go @@ -1,7 +1,6 @@ package main import ( - "os" "runtime" "github.com/rosedblabs/rosedb/v2" @@ -22,7 +21,7 @@ func main() { //remove data dir, for test, there's no need to keep any file or directory on disk defer func() { - _ = os.RemoveAll(options.DirPath) + _ = options.Fs.RemoveAll(options.DirPath) }() // open a database diff --git a/go.mod b/go.mod index c4b03360..a79b6140 100644 --- a/go.mod +++ b/go.mod @@ -2,18 +2,23 @@ module github.com/rosedblabs/rosedb/v2 go 1.19 +replace github.com/rosedblabs/wal => github.com/izouxv/wal v0.0.9 + require ( github.com/google/btree v1.1.2 + github.com/robfig/cron/v3 v3.0.0 github.com/rosedblabs/wal v1.3.6-0.20230924022528-3202245af020 + github.com/spf13/afero v1.11.0 github.com/valyala/bytebufferpool v1.0.0 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/hashicorp/golang-lru/v2 v2.0.2 // indirect github.com/kr/text v0.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/robfig/cron/v3 v3.0.0 // indirect - golang.org/x/sys v0.11.0 // indirect + golang.org/x/sys v0.15.0 // indirect + golang.org/x/text v0.14.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) @@ -21,6 +26,5 @@ require ( require ( github.com/bwmarrin/snowflake v0.3.0 github.com/gofrs/flock v0.8.1 - github.com/hashicorp/golang-lru/v2 v2.0.4 // indirect - github.com/stretchr/testify v1.8.4 + github.com/stretchr/testify v1.9.0 ) diff --git a/go.sum b/go.sum index 83493971..797bfa8e 100644 --- a/go.sum +++ b/go.sum @@ -7,8 +7,10 @@ github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU= github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= -github.com/hashicorp/golang-lru/v2 v2.0.4 h1:7GHuZcgid37q8o5i3QI9KMT4nCWQQ3Kx3Ov6bb9MfK0= -github.com/hashicorp/golang-lru/v2 v2.0.4/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/hashicorp/golang-lru/v2 v2.0.2 h1:Dwmkdr5Nc/oBiXgJS3CDHNhJtIHkuZ3DZF5twqnfBdU= +github.com/hashicorp/golang-lru/v2 v2.0.2/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/izouxv/wal v0.0.9 h1:bY8PWUHnbOHTGelNFRyCIDKBusxDlVrJvNInt98CPrg= +github.com/izouxv/wal v0.0.9/go.mod h1:CjUS414BL0AJwQsnTkvU5QrlHf1OkzLC97Q2QCwNRqg= github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -19,14 +21,16 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E= github.com/robfig/cron/v3 v3.0.0/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= -github.com/rosedblabs/wal v1.3.6-0.20230924022528-3202245af020 h1:EA8XGCVg1FDM6Dh4MP4sTsmH3gvjhRtp/N+lbnBwtJE= -github.com/rosedblabs/wal v1.3.6-0.20230924022528-3202245af020/go.mod h1:wdq54KJUyVTOv1uddMc6Cdh2d/YCIo8yjcwJAb1RCEM= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8= +github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNoBjkY= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= -golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/merge.go b/merge.go index 85c72712..38d251e2 100644 --- a/merge.go +++ b/merge.go @@ -12,6 +12,7 @@ import ( "github.com/rosedblabs/rosedb/v2/index" "github.com/rosedblabs/wal" + "github.com/spf13/afero" "github.com/valyala/bytebufferpool" ) @@ -29,7 +30,7 @@ const ( // // If reopenAfterDone is true, the original file will be replaced by the merge file, // and db's index will be rebuilt after the merge completes. -func (db *DB) Merge(reopenAfterDone bool) error { +func (db *rose) Merge(reopenAfterDone bool) error { if err := db.doMerge(); err != nil { return err } @@ -44,7 +45,7 @@ func (db *DB) Merge(reopenAfterDone bool) error { _ = db.closeFiles() // replace original file - err := loadMergeFiles(db.options.DirPath) + err := loadMergeFiles(db.options.DirPath, db.fs) if err != nil { return err } @@ -64,7 +65,7 @@ func (db *DB) Merge(reopenAfterDone bool) error { return nil } -func (db *DB) doMerge() error { +func (db *rose) doMerge() error { db.mu.Lock() // check if the database is closed if db.closed { @@ -172,10 +173,10 @@ func (db *DB) doMerge() error { return nil } -func (db *DB) openMergeDB() (*DB, error) { +func (db *rose) openMergeDB() (*rose, error) { mergePath := mergeDirPath(db.options.DirPath) // delete the merge directory if it exists - if err := os.RemoveAll(mergePath); err != nil { + if err := db.fs.RemoveAll(mergePath); err != nil { return nil, err } options := db.options @@ -183,13 +184,15 @@ func (db *DB) openMergeDB() (*DB, error) { // because we can sync the data file manually after the merge operation is completed. options.Sync, options.BytesPerSync = false, 0 options.DirPath = mergePath - mergeDB, err := Open(options) + mergeDBI, err := Open(options) if err != nil { return nil, err } + mergeDB := mergeDBI.(*rose) // open the hint files to write the new position of the data. hintFile, err := wal.Open(wal.Options{ + Fs: db.options.Fs, DirPath: options.DirPath, // we don't need to rotate the hint file, just write all data to a single file. SegmentSize: math.MaxInt64, @@ -211,8 +214,9 @@ func mergeDirPath(dirPath string) string { return filepath.Join(dir, base+mergeDirSuffixName) } -func (db *DB) openMergeFinishedFile() (*wal.WAL, error) { +func (db *rose) openMergeFinishedFile() (*wal.WAL, error) { return wal.Open(wal.Options{ + Fs: db.options.Fs, DirPath: db.options.DirPath, SegmentSize: GB, SegmentFileExt: mergeFinNameSuffix, @@ -230,10 +234,10 @@ func positionEquals(a, b *wal.ChunkPosition) bool { // loadMergeFiles loads all the merge files, and copy the data to the original data directory. // If there is no merge files, or the merge operation is not completed, it will return nil. -func loadMergeFiles(dirPath string) error { +func loadMergeFiles(dirPath string, fs afero.Fs) error { // check if there is a merge directory mergeDirPath := mergeDirPath(dirPath) - if _, err := os.Stat(mergeDirPath); err != nil { + if _, err := fs.Stat(mergeDirPath); err != nil { // does not exist, just return. if os.IsNotExist(err) { return nil @@ -243,12 +247,12 @@ func loadMergeFiles(dirPath string) error { // remove the merge directory at last defer func() { - _ = os.RemoveAll(mergeDirPath) + _ = fs.RemoveAll(mergeDirPath) }() copyFile := func(suffix string, fileId uint32, force bool) { srcFile := wal.SegmentFileName(mergeDirPath, suffix, fileId) - stat, err := os.Stat(srcFile) + stat, err := fs.Stat(srcFile) if os.IsNotExist(err) { return } @@ -259,11 +263,11 @@ func loadMergeFiles(dirPath string) error { return } destFile := wal.SegmentFileName(dirPath, suffix, fileId) - _ = os.Rename(srcFile, destFile) + _ = fs.Rename(srcFile, destFile) } // get the merge finished segment id - mergeFinSegmentId, err := getMergeFinSegmentId(mergeDirPath) + mergeFinSegmentId, err := getMergeFinSegmentId(mergeDirPath, fs) if err != nil { return err } @@ -281,8 +285,8 @@ func loadMergeFiles(dirPath string) error { // } // remove the original data file - if _, err = os.Stat(destFile); err == nil { - if err = os.Remove(destFile); err != nil { + if _, err = fs.Stat(destFile); err == nil { + if err = fs.Remove(destFile); err != nil { return err } } @@ -299,9 +303,9 @@ func loadMergeFiles(dirPath string) error { return nil } -func getMergeFinSegmentId(mergePath string) (wal.SegmentID, error) { +func getMergeFinSegmentId(mergePath string, fs afero.Fs) (wal.SegmentID, error) { // check if the merge operation is completed - mergeFinFile, err := os.Open(wal.SegmentFileName(mergePath, mergeFinNameSuffix, 1)) + mergeFinFile, err := fs.Open(wal.SegmentFileName(mergePath, mergeFinNameSuffix, 1)) if err != nil { // if the merge finished file does not exist, it means that the merge operation is not completed. // so we should remove the merge directory and return nil. @@ -321,8 +325,9 @@ func getMergeFinSegmentId(mergePath string) (wal.SegmentID, error) { return mergeFinSegmentId, nil } -func (db *DB) loadIndexFromHintFile() error { +func (db *rose) loadIndexFromHintFile() error { hintFile, err := wal.Open(wal.Options{ + Fs: db.options.Fs, DirPath: db.options.DirPath, // we don't need to rotate the hint file, just write all data to the same file. SegmentSize: math.MaxInt64, diff --git a/merge_test.go b/merge_test.go index 77714ef1..e51d8d1b 100644 --- a/merge_test.go +++ b/merge_test.go @@ -230,7 +230,7 @@ func TestDB_Multi_Open_Merge(t *testing.T) { assert.Nil(t, err) assert.Equal(t, value, v) } - assert.Equal(t, len(kvs), db.index.Size()) + assert.Equal(t, len(kvs), db.(*rose).index.Size()) } func TestDB_Merge_ReopenAfterDone(t *testing.T) { @@ -250,7 +250,7 @@ func TestDB_Merge_ReopenAfterDone(t *testing.T) { err = db.Merge(true) assert.Nil(t, err) - _, err = os.Stat(mergeDirPath(options.DirPath)) + _, err = db.Fs().Stat(mergeDirPath(options.DirPath)) assert.Equal(t, true, os.IsNotExist(err)) for key, value := range kvs { @@ -258,7 +258,7 @@ func TestDB_Merge_ReopenAfterDone(t *testing.T) { assert.Nil(t, err) assert.Equal(t, value, v) } - assert.Equal(t, len(kvs), db.index.Size()) + assert.Equal(t, len(kvs), db.(*rose).index.Size()) } func TestDB_Merge_Concurrent_Put(t *testing.T) { @@ -289,7 +289,7 @@ func TestDB_Merge_Concurrent_Put(t *testing.T) { }() wg.Wait() - _, err = os.Stat(mergeDirPath(options.DirPath)) + _, err = db.Fs().Stat(mergeDirPath(options.DirPath)) assert.Equal(t, true, os.IsNotExist(err)) var count int @@ -300,6 +300,6 @@ func TestDB_Merge_Concurrent_Put(t *testing.T) { count++ return true }) - assert.Equal(t, count, db.index.Size()) + assert.Equal(t, count, db.(*rose).index.Size()) } diff --git a/options.go b/options.go index e8be46e3..8fa75de7 100644 --- a/options.go +++ b/options.go @@ -1,6 +1,12 @@ package rosedb -import "os" +import ( + "os" + "path/filepath" + + "github.com/gofrs/flock" + "github.com/spf13/afero" +) // Options specifies the options for opening a database. type Options struct { @@ -42,6 +48,10 @@ type Options struct { // do not set this shecule too frequently, it will affect the performance. // refer to https://en.wikipedia.org/wiki/Cron AutoMergeCronExpr string + + Fs afero.Fs + + Lock Lock } // BatchOptions specifies the options for creating a batch. @@ -67,6 +77,8 @@ var DefaultOptions = Options{ BytesPerSync: 0, WatchQueueSize: 0, AutoMergeCronExpr: "", + Fs: afero.NewOsFs(), + Lock: flock.New(filepath.Join(tempDBDir(), fileLockName)), } var DefaultBatchOptions = BatchOptions{ diff --git a/rosedb.go b/rosedb.go new file mode 100644 index 00000000..353deb9e --- /dev/null +++ b/rosedb.go @@ -0,0 +1,44 @@ +package rosedb + +import ( + "time" + + "github.com/spf13/afero" +) + +type DBReadOnly interface { + Close() error + Fs() afero.Fs + Stat() *Stat + Get(key []byte) ([]byte, error) + Exist(key []byte) (bool, error) + Ascend(handleFn func(k []byte, v []byte) (bool, error)) + AscendRange(startKey, endKey []byte, handleFn func(k []byte, v []byte) (bool, error)) + AscendGreaterOrEqual(key []byte, handleFn func(k []byte, v []byte) (bool, error)) + AscendKeys(pattern []byte, filterExpired bool, handleFn func(k []byte) (bool, error)) + Descend(handleFn func(k []byte, v []byte) (bool, error)) + DescendRange(startKey, endKey []byte, handleFn func(k []byte, v []byte) (bool, error)) + DescendLessOrEqual(key []byte, handleFn func(k []byte, v []byte) (bool, error)) + DescendKeys(pattern []byte, filterExpired bool, handleFn func(k []byte) (bool, error)) +} +type DB interface { + DBReadOnly + Sync() error + Watch() (<-chan *Event, error) + Put(key []byte, value []byte) error + PutWithTTL(key []byte, value []byte, ttl time.Duration) error + Delete(key []byte) error + Expire(key []byte, ttl time.Duration) error + TTL(key []byte) (time.Duration, error) + Persist(key []byte) error + DeleteExpiredKeys(timeout time.Duration) error + + NewBatch(options BatchOptions) *Batch + + Merge(reopenAfterDone bool) error +} + +type Lock interface { + TryLock() (bool, error) + Unlock() error +}