diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 38c4ec2..fbf5769 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -24,7 +24,7 @@ jobs: - uses: actions/checkout@v2 - name: test & coverage report creation run: | - go test ./... -mod=readonly -timeout 8m -race -coverprofile=coverage.txt -covermode=atomic -tags=memdb,goleveldb,cleveldb,boltdb,rocksdb,badgerdb -v + CGO_LDFLAGS=-lrocksdb go test ./... -mod=readonly -timeout 8m -race -coverprofile=coverage.txt -covermode=atomic -tags=memdb,goleveldb,cleveldb,boltdb,rocksdb,badgerdb -v - uses: codecov/codecov-action@v1 with: file: ./coverage.txt diff --git a/CHANGELOG.md b/CHANGELOG.md index 5c56ab5..45b3cae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * (cleveldb/rocksdb) [\#3](https://github.com/line/tm-db/pull/3) Make path for cleveldb, rocksdb * (prefix) [\#10](https://github.com/line/tm-db/pull/10) Prefix iterator (#10) * (api) [\#15](https://github.com/line/tm-db/pull/15) Add AvailableDBBackends function (#15) +* (rdb) [\#34](https://github.com/line/tm-db/pull/34) Name & WriteLowPri methods (#34) ### Improvements * (global) [\#1](https://github.com/line/tm-db/pull/1) Revise module path diff --git a/badgerdb/batch.go b/badgerdb/batch.go index 1fd186f..dd8bc13 100644 --- a/badgerdb/batch.go +++ b/badgerdb/batch.go @@ -62,6 +62,10 @@ func (b *badgerDBBatch) WriteSync() error { return withSync(b.db, b.Write()) } +func (b *badgerDBBatch) WriteLowPri() error { + return b.Write() +} + func (b *badgerDBBatch) Close() error { select { case <-b.firstFlush: // a Flush after Cancel panics too diff --git a/badgerdb/db.go b/badgerdb/db.go index 092fcac..d0f6520 100644 --- a/badgerdb/db.go +++ b/badgerdb/db.go @@ -1,6 +1,7 @@ package badgerdb import ( + "path" "path/filepath" "github.com/dgraph-io/badger/v2" @@ -9,7 +10,8 @@ import ( ) type BadgerDB struct { - db *badger.DB + name string + db *badger.DB } var _ tmdb.DB = (*BadgerDB)(nil) @@ -38,7 +40,11 @@ func NewDBWithOptions(opts badger.Options) (*BadgerDB, error) { if err != nil { return nil, err } - return &BadgerDB{db: db}, nil + return &BadgerDB{name: path.Base(opts.Dir), db: db}, nil +} + +func (b *BadgerDB) Name() string { + return b.name } func (b *BadgerDB) Get(key []byte) ([]byte, error) { diff --git a/boltdb/batch.go b/boltdb/batch.go index 6ae0ce8..d10cb22 100644 --- a/boltdb/batch.go +++ b/boltdb/batch.go @@ -93,6 +93,11 @@ func (b *boltDBBatch) WriteSync() error { return b.Write() } +// WriteLowPri implements Batch. +func (b *boltDBBatch) WriteLowPri() error { + return b.Write() +} + // Close implements Batch. func (b *boltDBBatch) Close() error { b.ops = nil diff --git a/boltdb/db.go b/boltdb/db.go index 4324e03..06e1ae6 100644 --- a/boltdb/db.go +++ b/boltdb/db.go @@ -21,7 +21,8 @@ var bucket = []byte("tm") // A single bucket ([]byte("tm")) is used per a database instance. This could // lead to performance issues when/if there will be lots of keys. type BoltDB struct { - db *bbolt.DB + name string + db *bbolt.DB } var _ tmdb.DB = (*BoltDB)(nil) @@ -53,7 +54,11 @@ func NewDBWithOpts(name string, dir string, opts *bbolt.Options) (tmdb.DB, error return nil, err } - return &BoltDB{db: db}, nil + return &BoltDB{name: name, db: db}, nil +} + +func (bdb *BoltDB) Name() string { + return bdb.name } // Get implements DB. diff --git a/cleveldb/batch.go b/cleveldb/batch.go index b4d8381..b7b4f00 100644 --- a/cleveldb/batch.go +++ b/cleveldb/batch.go @@ -74,6 +74,11 @@ func (b *cLevelDBBatch) WriteSync() error { return nil } +// WriteLowPri imelements Batch. +func (b *cLevelDBBatch) WriteLowPri() error { + return b.Write() +} + // Close implements Batch. func (b *cLevelDBBatch) Close() error { if b.batch != nil { diff --git a/cleveldb/db.go b/cleveldb/db.go index 246e094..19c2f46 100644 --- a/cleveldb/db.go +++ b/cleveldb/db.go @@ -11,6 +11,7 @@ import ( // CLevelDB uses the C LevelDB database via a Go wrapper. type CLevelDB struct { + name string db *levigo.DB ro *levigo.ReadOptions wo *levigo.WriteOptions @@ -42,6 +43,7 @@ func NewDB(name string, dir string) (*CLevelDB, error) { woSync := levigo.NewWriteOptions() woSync.SetSync(true) database := &CLevelDB{ + name: dbPath, db: db, ro: ro, wo: wo, @@ -50,6 +52,10 @@ func NewDB(name string, dir string) (*CLevelDB, error) { return database, nil } +func (db *CLevelDB) Name() string { + return db.name +} + // Get implements DB. func (db *CLevelDB) Get(key []byte) ([]byte, error) { if len(key) == 0 { diff --git a/contrib/get_cleveldb.sh b/contrib/get_cleveldb.sh new file mode 100755 index 0000000..5e78911 --- /dev/null +++ b/contrib/get_cleveldb.sh @@ -0,0 +1,10 @@ +set -e + +version="1.23" +leveldb="leveldb" +archive="${version}.tar.gz" + +rm -rf ${leveldb} ${leveldb}-${archive} +wget -O ${leveldb}-${archive} https://github.com/google/leveldb/archive/${archive} +tar -zxvf ${leveldb}-${archive} +mv ${leveldb}-${version} ${leveldb} diff --git a/contrib/get_rocksdb.sh b/contrib/get_rocksdb.sh new file mode 100755 index 0000000..26c58bf --- /dev/null +++ b/contrib/get_rocksdb.sh @@ -0,0 +1,11 @@ +set -e + +version="6.20.3" +rocksdb="rocksdb" +rocksdb_dir="rocksdb.build" +archive="v${version}.tar.gz" + +rm -rf ${rocksdb_dir} ${rocksdb}-${archive} +wget -O ${rocksdb}-${archive} https://github.com/facebook/rocksdb/archive/${archive} +tar -zxvf ${rocksdb}-${archive} +mv ${rocksdb}-${version} ${rocksdb_dir} diff --git a/goleveldb/batch.go b/goleveldb/batch.go index 2910911..70de247 100644 --- a/goleveldb/batch.go +++ b/goleveldb/batch.go @@ -57,6 +57,11 @@ func (b *goLevelDBBatch) WriteSync() error { return b.write(true) } +// WriteLowPri implements Batch. +func (b *goLevelDBBatch) WriteLowPri() error { + return b.Write() +} + func (b *goLevelDBBatch) write(sync bool) error { if b.batch == nil { return tmdb.ErrBatchClosed diff --git a/goleveldb/db.go b/goleveldb/db.go index 9c4171d..9778a38 100644 --- a/goleveldb/db.go +++ b/goleveldb/db.go @@ -12,7 +12,8 @@ import ( ) type GoLevelDB struct { - db *leveldb.DB + name string + db *leveldb.DB } var _ tmdb.DB = (*GoLevelDB)(nil) @@ -28,11 +29,16 @@ func NewDBWithOpts(name string, dir string, o *opt.Options) (*GoLevelDB, error) return nil, err } database := &GoLevelDB{ - db: db, + name: name, + db: db, } return database, nil } +func (db *GoLevelDB) Name() string { + return db.name +} + // Get implements DB. func (db *GoLevelDB) Get(key []byte) ([]byte, error) { if len(key) == 0 { diff --git a/makefile b/makefile index 8aa0ddc..3288c98 100644 --- a/makefile +++ b/makefile @@ -2,15 +2,21 @@ GOTOOLS = github.com/golangci/golangci-lint/cmd/golangci-lint PACKAGES=$(shell go list ./...) INCLUDE = -I=. -I=${GOPATH}/src -I=${GOPATH}/src/github.com/gogo/protobuf/protobuf +CLEVELDB_DIR=$(shell pwd)/leveldb +ROCKSDB_DIR=$(shell pwd)/rocksdb.build +CGO_CFLAGS=-I$(CLEVELDB_DIR)/include -I$(ROCKSDB_DIR)/include +CGO_LDFLAGS=-L$(CLEVELDB_DIR)/build -L$(ROCKSDB_DIR) -lleveldb -lrocksdb -lm -lstdc++ $(shell awk '/PLATFORM_LDFLAGS/ {sub("PLATFORM_LDFLAGS=", ""); print}' < $(ROCKSDB_DIR)/make_config.mk) + export GO111MODULE = on all: lint test ### go tests ## By default this will only test memdb & goleveldb -test: +test: cleveldb rocksdb.build @echo "--> Running go test" - @go test $(PACKAGES) -tags memdb,goleveldb -v + @CGO_CFLAGS="$(CGO_CFLAGS)" CGO_LDFLAGS="$(CGO_LDFLAGS)" \ + go test $(PACKAGES) -tags memdb,goleveldb -v test-memdb: @echo "--> Running go test" @@ -28,6 +34,10 @@ test-rocksdb: @echo "--> Running go test" @go test ./rocksdb/... -tags rocksdb -v +test-rdb: + @echo "--> Running go test" + @go test ./rdb/... -tags rocksdb -v + test-boltdb: @echo "--> Running go test" @go test ./boltdb/... -tags boltdb -v @@ -101,6 +111,25 @@ format: tools: go get -v $(GOTOOLS) +.PHONY: cleveldb rocksdb +cleveldb: + @if [ ! -e $(CLEVELDB_DIR) ]; then \ + sh contrib/get_cleveldb.sh; \ + fi + @if [ ! -e $(CLEVELDB_DIR)/libcleveldb.a ]; then \ + cd $(CLEVELDB_DIR); \ + cmake -S . -B build -DCMAKE_BUILD_TYPE=Release -DBUILD_SHARED_LIBS=OFF -DLEVELDB_BUILD_TESTS=OFF -DLEVELDB_BUILD_BENCHMARKS=OFF; \ + cmake --build build; \ + fi + +rocksdb.build: + @if [ ! -e $(ROCKSDB_DIR) ]; then \ + sh ./contrib/get_rocksdb.sh; \ + fi + @if [ ! -e $(ROCKSDB_DIR)/librocksdb.a ]; then \ + cd $(ROCKSDB_DIR) && make -j4 static_lib; \ + fi + # generates certificates for TLS testing in remotedb gen_certs: clean_certs certstrap init --common-name "tendermint.com" --passphrase "" diff --git a/memdb/batch.go b/memdb/batch.go index 865ab06..5217efc 100644 --- a/memdb/batch.go +++ b/memdb/batch.go @@ -91,6 +91,11 @@ func (b *memDBBatch) WriteSync() error { return b.Write() } +// WriteLowPri implements Batch. +func (b *memDBBatch) WriteLowPri() error { + return b.Write() +} + // Close implements Batch. func (b *memDBBatch) Close() error { b.ops = nil diff --git a/memdb/db.go b/memdb/db.go index 1dc6a83..3db31a8 100644 --- a/memdb/db.go +++ b/memdb/db.go @@ -59,6 +59,10 @@ func NewDB() *MemDB { return database } +func (db *MemDB) Name() string { + return "nameless-memdb" +} + // Get implements DB. func (db *MemDB) Get(key []byte) ([]byte, error) { if len(key) == 0 { diff --git a/metadb/db.go b/metadb/db.go index a413251..382f172 100644 --- a/metadb/db.go +++ b/metadb/db.go @@ -41,6 +41,8 @@ const ( // - EXPERIMENTAL // - use badgerdb build tag (go build -tags badgerdb) BadgerDBBackend BackendType = "badgerdb" + // Alternative rocksdb + RDBBackend BackendType = "stonesdb" ) type dbCreator func(name string, dir string) (tmdb.DB, error) diff --git a/metadb/db_rdb.go b/metadb/db_rdb.go new file mode 100644 index 0000000..6ad8474 --- /dev/null +++ b/metadb/db_rdb.go @@ -0,0 +1,14 @@ +// +build rocksdb + +package metadb + +import ( + tmdb "github.com/line/tm-db/v2" + "github.com/line/tm-db/v2/rdb" +) + +func rdbCreator(name, dir string) (tmdb.DB, error) { + return rdb.NewDB(name, dir) +} + +func init() { registerDBCreator(RDBBackend, rdbCreator, true) } diff --git a/metadb/db_rocksdb.go b/metadb/db_rocksdb.go index 7f1dc1f..7612327 100644 --- a/metadb/db_rocksdb.go +++ b/metadb/db_rocksdb.go @@ -1,14 +1,17 @@ +//go:build rocksdb // +build rocksdb package metadb import ( tmdb "github.com/line/tm-db/v2" - "github.com/line/tm-db/v2/rocksdb" + "github.com/line/tm-db/v2/rdb" + _ "github.com/line/tm-db/v2/rocksdb" ) func rocksDBCreator(name, dir string) (tmdb.DB, error) { - return rocksdb.NewDB(name, dir) + // TODO: use rdb instead of rocksdb for now as gorocksdb doesn't have low priority write option + return rdb.NewDB(name, dir) } func init() { registerDBCreator(RocksDBBackend, rocksDBCreator, true) } diff --git a/prefixdb/batch.go b/prefixdb/batch.go index fb701ae..faae113 100644 --- a/prefixdb/batch.go +++ b/prefixdb/batch.go @@ -50,6 +50,11 @@ func (pb prefixDBBatch) WriteSync() error { return pb.source.WriteSync() } +// WriteLowPri implements Batch. +func (pb prefixDBBatch) WriteLowPri() error { + return pb.source.WriteLowPri() +} + // Close implements Batch. func (pb prefixDBBatch) Close() error { return pb.source.Close() diff --git a/prefixdb/db.go b/prefixdb/db.go index 73f314b..a990cbf 100644 --- a/prefixdb/db.go +++ b/prefixdb/db.go @@ -27,6 +27,10 @@ func NewDB(db tmdb.DB, prefix []byte) *PrefixDB { } } +func (pdb *PrefixDB) Name() string { + return fmt.Sprintf("%s->%s", pdb.db.Name(), pdb.prefix) +} + // Get implements DB. func (pdb *PrefixDB) Get(key []byte) ([]byte, error) { if len(key) == 0 { diff --git a/rdb/batch.go b/rdb/batch.go new file mode 100644 index 0000000..a9640d4 --- /dev/null +++ b/rdb/batch.go @@ -0,0 +1,97 @@ +//go:build rocksdb +// +build rocksdb + +package rdb + +// #include +// #include "rocksdb/c.h" +import "C" + +import ( + tmdb "github.com/line/tm-db/v2" +) + +type rdbBatch struct { + db *RDB + b *C.rocksdb_writebatch_t +} + +func newRDBBatch(db *RDB) *rdbBatch { + return &rdbBatch{ + db: db, + b: C.rocksdb_writebatch_create(), + } +} + +func (b *rdbBatch) Set(key, value []byte) error { + if len(key) == 0 { + return tmdb.ErrKeyEmpty + } + if value == nil { + return tmdb.ErrValueNil + } + if b.b == nil { + return tmdb.ErrBatchClosed + } + ck, cv := b2c(key), b2c(value) + C.rocksdb_writebatch_put(b.b, ck, C.size_t(len(key)), cv, C.size_t(len(value))) + return nil +} + +func (b *rdbBatch) Delete(key []byte) error { + if len(key) == 0 { + return tmdb.ErrKeyEmpty + } + if b.b == nil { + return tmdb.ErrBatchClosed + } + C.rocksdb_writebatch_delete(b.b, b2c(key), C.size_t(len(key))) + return nil +} + +func (b *rdbBatch) Write() error { + if b.b == nil { + return tmdb.ErrBatchClosed + } + var cerr *C.char + C.rocksdb_write(b.db.db, b.db.wopts, b.b, &cerr) + b.Close() + if cerr != nil { + return cerror(cerr) + } + return nil +} + +func (b *rdbBatch) WriteSync() error { + if b.b == nil { + return tmdb.ErrBatchClosed + } + var cerr *C.char + C.rocksdb_write(b.db.db, b.db.wsopts, b.b, &cerr) + b.Close() + if cerr != nil { + return cerror(cerr) + } + return nil +} + +func (b *rdbBatch) WriteLowPri() error { + if b.b == nil { + return tmdb.ErrBatchClosed + } + var cerr *C.char + C.rocksdb_write(b.db.db, b.db.wlpopts, b.b, &cerr) + b.Close() + if cerr != nil { + return cerror(cerr) + } + return nil +} + +func (b *rdbBatch) Close() error { + if b.b != nil { + C.rocksdb_writebatch_destroy(b.b) + b.b = nil + } + return nil +} diff --git a/rdb/db.go b/rdb/db.go new file mode 100644 index 0000000..68ca4f3 --- /dev/null +++ b/rdb/db.go @@ -0,0 +1,243 @@ +//go:build rocksdb +// +build rocksdb + +package rdb + +// #include +// #include "rocksdb/c.h" +import "C" + +import ( + "errors" + "fmt" + "path/filepath" + "runtime" + "unsafe" + + tmdb "github.com/line/tm-db/v2" + "github.com/line/tm-db/v2/internal/util" +) + +type RDB struct { + name string + fn string + db *C.rocksdb_t + opts *C.rocksdb_options_t + ropts *C.rocksdb_readoptions_t // read options + wopts *C.rocksdb_writeoptions_t // write options + wsopts *C.rocksdb_writeoptions_t // sync write options + wlpopts *C.rocksdb_writeoptions_t // low priority write options +} + +type rdbIterator struct { + db *RDB + it *C.rocksdb_iterator_t + ropts *C.rocksdb_readoptions_t + reverse bool + lowerBound []byte + upperBound []byte +} + +func cerror(cerr *C.char) error { + if cerr == nil { + return nil + } + err := errors.New(C.GoString(cerr)) + C.free(unsafe.Pointer(cerr)) + return err +} + +func b2c(b []byte) *C.char { + if len(b) == 0 { + return nil + } + return (*C.char)(unsafe.Pointer(&b[0])) +} + +func NewDB(name string, dir string) (*RDB, error) { + var cerr *C.char + + fn := filepath.Join(dir, name+".db") + + bbto := C.rocksdb_block_based_options_create() + C.rocksdb_block_based_options_set_block_cache(bbto, C.rocksdb_cache_create_lru(C.size_t(1<<30))) + C.rocksdb_block_based_options_set_filter_policy(bbto, C.rocksdb_filterpolicy_create_bloom(C.int(10))) + defer C.rocksdb_block_based_options_destroy(bbto) + + opts := C.rocksdb_options_create() + C.rocksdb_options_set_block_based_table_factory(opts, bbto) + C.rocksdb_options_set_create_if_missing(opts, 1) + C.rocksdb_options_increase_parallelism(opts, C.int(runtime.NumCPU())) + C.rocksdb_options_optimize_level_style_compaction(opts, 512*1024*1024) + C.rocksdb_options_set_enable_pipelined_write(opts, 1) + + ropts := C.rocksdb_readoptions_create() + wopts := C.rocksdb_writeoptions_create() + wsopts := C.rocksdb_writeoptions_create() + C.rocksdb_writeoptions_set_sync(wsopts, C.uchar(1)) + wlpopts := C.rocksdb_writeoptions_create() + C.rocksdb_writeoptions_set_low_pri(wlpopts, C.uchar(1)) + + db := C.rocksdb_open(opts, b2c([]byte(fn)), &cerr) + if cerr != nil { + C.rocksdb_options_destroy(opts) + C.rocksdb_writeoptions_destroy(wopts) + C.rocksdb_writeoptions_destroy(wsopts) + C.rocksdb_writeoptions_destroy(wlpopts) + return nil, cerror(cerr) + } + return &RDB{ + name: name, + fn: fn, + db: db, + opts: opts, + ropts: ropts, + wopts: wopts, + wsopts: wsopts, + wlpopts: wlpopts, + }, nil +} + +func (db *RDB) Name() string { + return db.name +} + +func (db *RDB) Get(key []byte) ([]byte, error) { + var cerr *C.char + var cvl C.size_t + ck := b2c(key) + cv := C.rocksdb_get(db.db, db.ropts, ck, C.size_t(len(key)), &cvl, &cerr) + if cerr != nil { + return nil, cerror(cerr) + } + if cv == nil { + return nil, nil + } + rv := C.GoBytes(unsafe.Pointer(cv), C.int(cvl)) + C.free(unsafe.Pointer(cv)) + return rv, nil +} + +func (db *RDB) Has(key []byte) (bool, error) { + var cerr *C.char + var cvl C.size_t + ck := b2c(key) + cv := C.rocksdb_get(db.db, db.ropts, ck, C.size_t(len(key)), &cvl, &cerr) + if cerr != nil { + return false, cerror(cerr) + } + if cv == nil { + return false, nil + } + C.free(unsafe.Pointer(cv)) + return true, nil +} + +func (db *RDB) Set(key []byte, value []byte) error { + var cerr *C.char + ck, cv := b2c(key), b2c(value) + C.rocksdb_put(db.db, db.wopts, ck, C.size_t(len(key)), cv, C.size_t(len(value)), + &cerr) + if cerr != nil { + return cerror(cerr) + } + return nil +} + +func (db *RDB) SetSync(key []byte, value []byte) error { + var cerr *C.char + ck, cv := b2c(key), b2c(value) + C.rocksdb_put(db.db, db.wsopts, ck, C.size_t(len(key)), cv, C.size_t(len(value)), + &cerr) + if cerr != nil { + return cerror(cerr) + } + return nil +} + +func (db *RDB) Delete(key []byte) error { + var cerr *C.char + ck := b2c(key) + C.rocksdb_delete(db.db, db.wopts, ck, C.size_t(len(key)), &cerr) + if cerr != nil { + return cerror(cerr) + } + return nil +} + +func (db *RDB) DeleteSync(key []byte) error { + var cerr *C.char + ck := b2c(key) + C.rocksdb_delete(db.db, db.wsopts, ck, C.size_t(len(key)), &cerr) + if cerr != nil { + return cerror(cerr) + } + return nil +} + +func (db *RDB) Close() error { + C.rocksdb_options_destroy(db.opts) + C.rocksdb_readoptions_destroy(db.ropts) + C.rocksdb_writeoptions_destroy(db.wopts) + C.rocksdb_writeoptions_destroy(db.wsopts) + C.rocksdb_writeoptions_destroy(db.wlpopts) + C.rocksdb_close(db.db) + return nil +} + +// TODO: not implemented yet +func (db *RDB) Stats() map[string]string { + m := map[string]string{} + m["dummy"] = "100" + return m +} + +func (db *RDB) NewBatch() tmdb.Batch { + return newRDBBatch(db) +} + +func (db *RDB) Iterator(start, end []byte) (tmdb.Iterator, error) { + if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { + return nil, tmdb.ErrKeyEmpty + } + return newRdbIterator(db, start, end, false), nil +} + +func (db *RDB) PrefixIterator(prefix []byte) (tmdb.Iterator, error) { + start, end, err := util.PrefixToRange(prefix) + if err != nil { + return nil, err + } + return newRdbIterator(db, start, end, false), nil +} + +func (db *RDB) ReverseIterator(start, end []byte) (tmdb.Iterator, error) { + if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { + return nil, tmdb.ErrKeyEmpty + } + return newRdbIterator(db, start, end, true), nil +} + +func (db *RDB) ReversePrefixIterator(prefix []byte) (tmdb.Iterator, error) { + start, end, err := util.PrefixToRange(prefix) + if err != nil { + return nil, err + } + return newRdbIterator(db, start, end, true), nil +} + +func (db *RDB) Print() error { + itr, err := db.Iterator(nil, nil) + if err != nil { + return err + } + defer itr.Close() + for ; itr.Valid(); itr.Next() { + key := itr.Key() + value := itr.Value() + fmt.Printf("[%X]:\t[%X]\n", key, value) + } + return nil +} + +// EOF diff --git a/rdb/db_test.go b/rdb/db_test.go new file mode 100644 index 0000000..a086daa --- /dev/null +++ b/rdb/db_test.go @@ -0,0 +1,145 @@ +//go:build rocksdb +// +build rocksdb + +package rdb + +import ( + "testing" + + "github.com/line/tm-db/v2/internal/dbtest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRDBNewDB(t *testing.T) { + name, dir := dbtest.NewTestName("rdb") + db, err := NewDB(name, dir) + defer dbtest.CleanupDB(db, name, dir) + require.NoError(t, err) +} + +func TestRDBStats(t *testing.T) { + name, dir := dbtest.NewTestName("rdb") + db, err := NewDB(name, dir) + defer dbtest.CleanupDB(db, name, dir) + require.NoError(t, err) + + assert.NotEmpty(t, db.Stats()) +} + +func TestRDBIterator(t *testing.T) { + name, dir := dbtest.NewTestName("rdb") + db, err := NewDB(name, dir) + defer dbtest.CleanupDB(db, name, dir) + require.NoError(t, err) + + dbtest.TestDBIterator(t, db) +} + +func TestRDBIteratorNoWrites(t *testing.T) { + name, dir := dbtest.NewTestName("rdb") + db, err := NewDB(name, dir) + defer dbtest.CleanupDB(db, name, dir) + require.NoError(t, err) + + dbtest.TestDBIteratorNoWrites(t, db) +} + +func TestRDBEmptyIterator(t *testing.T) { + name, dir := dbtest.NewTestName("rdb") + db, err := NewDB(name, dir) + defer dbtest.CleanupDB(db, name, dir) + require.NoError(t, err) + + dbtest.TestDBEmptyIterator(t, db) +} + +func TestRDBPrefixIterator(t *testing.T) { + name, dir := dbtest.NewTestName("rdb") + db, err := NewDB(name, dir) + defer dbtest.CleanupDB(db, name, dir) + require.NoError(t, err) + + dbtest.TestDBPrefixIterator(t, db) +} + +func TestRDBPrefixIteratorNoMatchNil(t *testing.T) { + name, dir := dbtest.NewTestName("rdb") + db, err := NewDB(name, dir) + defer dbtest.CleanupDB(db, name, dir) + require.NoError(t, err) + + dbtest.TestPrefixIteratorNoMatchNil(t, db) +} + +func TestRDBPrefixIteratorNoMatch1(t *testing.T) { + name, dir := dbtest.NewTestName("rdb") + db, err := NewDB(name, dir) + defer dbtest.CleanupDB(db, name, dir) + require.NoError(t, err) + + dbtest.TestPrefixIteratorNoMatch1(t, db) +} + +func TestRDBPrefixIteratorNoMatch2(t *testing.T) { + name, dir := dbtest.NewTestName("rdb") + db, err := NewDB(name, dir) + defer dbtest.CleanupDB(db, name, dir) + require.NoError(t, err) + + dbtest.TestPrefixIteratorNoMatch2(t, db) +} + +func TestRDBPrefixIteratorMatch1(t *testing.T) { + name, dir := dbtest.NewTestName("rdb") + db, err := NewDB(name, dir) + defer dbtest.CleanupDB(db, name, dir) + require.NoError(t, err) + + dbtest.TestPrefixIteratorMatch1(t, db) +} + +func TestRDBPrefixIteratorMatches1N(t *testing.T) { + name, dir := dbtest.NewTestName("rdb") + db, err := NewDB(name, dir) + defer dbtest.CleanupDB(db, name, dir) + require.NoError(t, err) + + dbtest.TestPrefixIteratorMatches1N(t, db) +} + +func TestRDBBatch(t *testing.T) { + name, dir := dbtest.NewTestName("rdb") + db, err := NewDB(name, dir) + defer dbtest.CleanupDB(db, name, dir) + require.NoError(t, err) + + dbtest.TestDBBatch(t, db) +} + +func BenchmarkRDBRangeScans1M(b *testing.B) { + name, dir := dbtest.NewTestName("rdb") + db, err := NewDB(name, dir) + defer dbtest.CleanupDB(db, name, dir) + require.NoError(b, err) + + dbtest.BenchmarkRangeScans(b, db, int64(1e6)) +} + +func BenchmarkRDBRangeScans10M(b *testing.B) { + name, dir := dbtest.NewTestName("rdb") + db, err := NewDB(name, dir) + defer dbtest.CleanupDB(db, name, dir) + require.NoError(b, err) + + dbtest.BenchmarkRangeScans(b, db, int64(10e6)) +} + +func BenchmarkRDBRandomReadsWrites(b *testing.B) { + name, dir := dbtest.NewTestName("rdb") + db, err := NewDB(name, dir) + defer dbtest.CleanupDB(db, name, dir) + require.NoError(b, err) + + dbtest.BenchmarkRandomReadsWrites(b, db) +} diff --git a/rdb/iterator.go b/rdb/iterator.go new file mode 100644 index 0000000..c68ee30 --- /dev/null +++ b/rdb/iterator.go @@ -0,0 +1,98 @@ +//go:build rocksdb +// +build rocksdb + +package rdb + +// #include +// #include "rocksdb/c.h" +import "C" + +import ( + "unsafe" +) + +func newRdbIterator(db *RDB, lowerBound, upperBound []byte, reverse bool) *rdbIterator { + ropts := C.rocksdb_readoptions_create() + if len(lowerBound) >= 0 { + C.rocksdb_readoptions_set_iterate_lower_bound(ropts, b2c(lowerBound), C.size_t(len(lowerBound))) + } + if len(upperBound) >= 0 { + C.rocksdb_readoptions_set_iterate_upper_bound(ropts, b2c(upperBound), C.size_t(len(upperBound))) + } + it := C.rocksdb_create_iterator(db.db, ropts) + if !reverse { + C.rocksdb_iter_seek_to_first(it) + } else { + C.rocksdb_iter_seek_to_last(it) + } + return &rdbIterator{ + db: db, + it: it, + ropts: ropts, + reverse: reverse, + lowerBound: lowerBound, + upperBound: upperBound, + } +} + +func (itr *rdbIterator) Valid() bool { + return C.rocksdb_iter_valid(itr.it) != 0 +} + +func (itr *rdbIterator) Key() []byte { + if C.rocksdb_iter_valid(itr.it) == 0 { + panic("iterator is invalid") + } + + var cvl C.size_t + cv := C.rocksdb_iter_key(itr.it, &cvl) + if cv == nil { + return nil + } + return C.GoBytes(unsafe.Pointer(cv), C.int(cvl)) +} + +func (itr *rdbIterator) Value() []byte { + if C.rocksdb_iter_valid(itr.it) == 0 { + panic("iterator is invalid") + } + var cvl C.size_t + cv := C.rocksdb_iter_value(itr.it, &cvl) + if cv == nil { + return nil + } + return C.GoBytes(unsafe.Pointer(cv), C.int(cvl)) +} + +func (itr *rdbIterator) Next() { + if C.rocksdb_iter_valid(itr.it) == 0 { + panic("iterator is invalid") + } + if !itr.reverse { + C.rocksdb_iter_next(itr.it) + } else { + C.rocksdb_iter_prev(itr.it) + } +} + +func (itr *rdbIterator) Error() error { + var cerr *C.char + if itr.it == nil { + return nil + } + C.rocksdb_iter_get_error(itr.it, &cerr) + if cerr != nil { + return cerror(cerr) + } + return nil +} + +func (itr *rdbIterator) Close() error { + if itr.it != nil { + C.rocksdb_iter_destroy(itr.it) + } + itr.it, itr.db, itr.lowerBound, itr.upperBound = nil, nil, nil, nil + return nil +} + +// EOF diff --git a/remotedb/batch.go b/remotedb/batch.go index 7a4176e..a43a07a 100644 --- a/remotedb/batch.go +++ b/remotedb/batch.go @@ -74,6 +74,11 @@ func (b *batch) WriteSync() error { return b.Close() } +// WriteLowPri implements Batch. +func (b *batch) WriteLowPri() error { + return b.Write() +} + // Close implements Batch. func (b *batch) Close() error { b.ops = nil diff --git a/remotedb/db.go b/remotedb/db.go index ae71f8d..20b5a01 100644 --- a/remotedb/db.go +++ b/remotedb/db.go @@ -12,19 +12,21 @@ import ( ) type RemoteDB struct { - ctx context.Context - dc protodb.DBClient + name string + ctx context.Context + dc protodb.DBClient } func NewDB(serverAddr string, serverKey string) (*RemoteDB, error) { - return newDB(grpcdb.NewClient(serverAddr, serverKey)) + gdc, err := grpcdb.NewClient(serverAddr, serverKey) + return newDB(fmt.Sprintf("%s://%s", serverAddr, serverKey), gdc, err) } -func newDB(gdc protodb.DBClient, err error) (*RemoteDB, error) { +func newDB(name string, gdc protodb.DBClient, err error) (*RemoteDB, error) { if err != nil { return nil, err } - return &RemoteDB{dc: gdc, ctx: context.Background()}, nil + return &RemoteDB{name: name, dc: gdc, ctx: context.Background()}, nil } type Init struct { @@ -33,6 +35,10 @@ type Init struct { Type string } +func (rd *RemoteDB) Name() string { + return rd.name +} + func (rd *RemoteDB) InitRemote(in *Init) error { _, err := rd.dc.Init(rd.ctx, &protodb.Init{Dir: in.Dir, Type: in.Type, Name: in.Name}) return err diff --git a/rocksdb/batch.go b/rocksdb/batch.go index 0c33caa..dcd6fbb 100644 --- a/rocksdb/batch.go +++ b/rocksdb/batch.go @@ -73,6 +73,20 @@ func (b *rocksDBBatch) WriteSync() error { return b.Close() } +// WriteLowPri implements Batch. +func (b *rocksDBBatch) WriteLowPri() error { + if b.batch == nil { + return tmdb.ErrBatchClosed + } + err := b.db.db.Write(b.db.woLowPri, b.batch) + if err != nil { + return err + } + // Make sure batch cannot be used afterwards. Callers should still call Close(), for errors. + b.Close() + return nil +} + // Close implements Batch. func (b *rocksDBBatch) Close() error { if b.batch != nil { diff --git a/rocksdb/db.go b/rocksdb/db.go index d0afedf..7b800b3 100644 --- a/rocksdb/db.go +++ b/rocksdb/db.go @@ -12,10 +12,12 @@ import ( // RocksDB is a RocksDB backend. type RocksDB struct { - db *gorocksdb.DB - ro *gorocksdb.ReadOptions - wo *gorocksdb.WriteOptions - woSync *gorocksdb.WriteOptions + name string + db *gorocksdb.DB + ro *gorocksdb.ReadOptions + wo *gorocksdb.WriteOptions + woSync *gorocksdb.WriteOptions + woLowPri *gorocksdb.WriteOptions } var _ tmdb.DB = (*RocksDB)(nil) @@ -55,15 +57,24 @@ func NewDBWithOptions(name string, dir string, opts *gorocksdb.Options) (*RocksD wo := gorocksdb.NewDefaultWriteOptions() woSync := gorocksdb.NewDefaultWriteOptions() woSync.SetSync(true) + woLowPri := gorocksdb.NewDefaultWriteOptions() + // TODO: gorocksdb doesn't have rocksdb_writeoptions_set_low_pri() yet. + // woLowPri.SetLowPri(true) database := &RocksDB{ - db: db, - ro: ro, - wo: wo, - woSync: woSync, + name: name, + db: db, + ro: ro, + wo: wo, + woSync: woSync, + woLowPri: woLowPri, } return database, nil } +func (db *RocksDB) Name() string { + return db.name +} + // Get implements DB. func (db *RocksDB) Get(key []byte) ([]byte, error) { if len(key) == 0 { diff --git a/rocksdb/iterator.go b/rocksdb/iterator.go index cd555fb..9742b31 100644 --- a/rocksdb/iterator.go +++ b/rocksdb/iterator.go @@ -3,6 +3,7 @@ package rocksdb import ( "github.com/line/gorocksdb" tmdb "github.com/line/tm-db/v2" + "github.com/line/tm-db/v2/internal/util" ) type rocksDBIterator struct { @@ -10,8 +11,8 @@ type rocksDBIterator struct { opts *gorocksdb.ReadOptions isReverse bool isInvalid bool - key *gorocksdb.Slice - value *gorocksdb.Slice + key []byte + value []byte } var _ tmdb.Iterator = (*rocksDBIterator)(nil) @@ -69,25 +70,26 @@ func (itr *rocksDBIterator) invalidate() { func (itr *rocksDBIterator) Key() []byte { itr.assertIsValid() if itr.key == nil { - itr.key = itr.source.Key() + itr.key = moveSliceToBytes(itr.source.Key()) } - return itr.key.Data() + return itr.key } // Value implements Iterator. func (itr *rocksDBIterator) Value() []byte { itr.assertIsValid() if itr.value == nil { - itr.value = itr.source.Value() + itr.value = moveSliceToBytes(itr.source.Value()) } - return itr.value.Data() + return itr.value } // Next implements Iterator. func (itr *rocksDBIterator) Next() { itr.assertIsValid() - itr.freeKeyValue() + itr.key = nil + itr.value = nil if !itr.isReverse { itr.source.Next() @@ -111,23 +113,23 @@ func (itr *rocksDBIterator) Close() error { itr.opts.Destroy() itr.opts = nil } - itr.freeKeyValue() return nil } -func (itr *rocksDBIterator) freeKeyValue() { - if itr.key != nil { - itr.key.Free() - itr.key = nil - } - if itr.value != nil { - itr.value.Free() - itr.value = nil - } -} - func (itr *rocksDBIterator) assertIsValid() { if itr.isInvalid { panic("iterator is invalid") } } + +// moveSliceToBytes will free the slice and copy out a go []byte +// This function can be applied on *Slice returned from Key() and Value() +// of an Iterator, because they are marked as freed. +func moveSliceToBytes(s *gorocksdb.Slice) []byte { + var bz []byte + if s.Exists() { + bz = util.Cp(s.Data()) + } + s.Free() + return bz +} diff --git a/types.go b/types.go index 1a18f96..53b3ccf 100644 --- a/types.go +++ b/types.go @@ -19,6 +19,8 @@ var ( // Keys cannot be nil or empty, while values cannot be nil. Keys and values should be considered // read-only, both when returned and when given, and must be copied before they are modified. type DB interface { + Name() string + // Get fetches the value of the given key, or nil if it does not exist. // CONTRACT: key, value readonly []byte Get([]byte) ([]byte, error) @@ -99,6 +101,9 @@ type Batch interface { // methods will error. WriteSync() error + // WriteLowPri write the batch with lower priority + WriteLowPri() error + // Close closes the batch. It is idempotent, but calls to other methods afterwards will error. Close() error }