Discard invalid versions of keys during compaction #464
This is what it looks like running with FileIO for TableLoadingMode.
More craziness. Test case: start up a writer, write TTL'd keys for 5 minutes, shut the writer off, and watch for compaction to clean everything up and purge.
Expected result:
Actual result:
It looks like as SSTs grow, the bloom filters take up more and more space, because they're always loaded into memory. We can probably use an LRU cache for blooms to avoid their memory usage growing forever. I'll run your code over the weekend and check whether I get the OOM, and what to do about it. Ping me if you don't hear back by Tuesday.
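For illustration only, a bounded bloom-filter cache along those lines might look roughly like the toy sketch below. This is not Badger's actual code; the `tableID` key, the `load` callback, and the capacity are all invented for the example.

```go
package main

import (
	"container/list"
	"fmt"
)

type cacheEntry struct {
	tableID uint64
	filter  []byte // serialized bloom filter; would be parsed before use
}

// bloomCache keeps at most cap filters in memory, evicting the least
// recently used one when a new filter has to be loaded.
type bloomCache struct {
	cap   int
	ll    *list.List               // front = most recently used
	items map[uint64]*list.Element // table ID -> list element
}

func newBloomCache(capacity int) *bloomCache {
	return &bloomCache{
		cap:   capacity,
		ll:    list.New(),
		items: make(map[uint64]*list.Element),
	}
}

// get returns the cached filter for a table, loading it on a miss (e.g. by
// reading it back from the .sst file) and evicting the LRU entry if needed.
func (c *bloomCache) get(tableID uint64, load func(uint64) []byte) []byte {
	if el, ok := c.items[tableID]; ok {
		c.ll.MoveToFront(el)
		return el.Value.(*cacheEntry).filter
	}
	f := load(tableID)
	c.items[tableID] = c.ll.PushFront(&cacheEntry{tableID: tableID, filter: f})
	if c.ll.Len() > c.cap {
		oldest := c.ll.Back()
		c.ll.Remove(oldest)
		delete(c.items, oldest.Value.(*cacheEntry).tableID)
	}
	return f
}

func main() {
	cache := newBloomCache(2)
	loads := 0
	load := func(id uint64) []byte {
		loads++
		return []byte{byte(id)} // stand-in for reading the filter from disk
	}
	for _, id := range []uint64{1, 2, 1, 3, 1} {
		cache.get(id, load)
	}
	fmt.Println("filters loaded from disk:", loads) // 3: table 2 was evicted, table 1 stayed hot
}
```

With a cap tied to available memory, filter usage stays bounded no matter how many SSTs accumulate, at the cost of an occasional re-read on a cache miss.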
Here is a complete working example with a main() func:
package main
import (
"github.com/dgraph-io/badger"
"github.com/dgraph-io/badger/options"
"time"
"fmt"
"encoding/binary"
"path/filepath"
"os"
"github.com/Sirupsen/logrus"
"encoding/json"
_ "net/http/pprof"
"net/http"
)
type writable struct {
key []byte
value []byte
}
var (
// store the first key we've seen so we can try and query it to see when it's expired from view
firstKey []byte
firstKeyStored bool //
stopWriting = false
)
func main() {
go func() {
// you can hit:
// go tool pprof http://localhost:8001/debug/pprof/heap
// go tool pprof http://localhost:8001/debug/pprof/profile
logrus.Infof("starting debug web server....")
logrus.Info(http.ListenAndServe("localhost:8001", nil))
}()
done := make(chan struct{})
bt := NewBadgerTest()
bt.Start()
<-done // wait
}
type BadgerTest struct {
db *badger.DB
}
func NewBadgerTest() *BadgerTest {
dir := "/raid0/badgertest"
opts := badger.DefaultOptions
opts.Dir = dir
opts.ValueDir = dir
opts.ValueLogLoadingMode = options.FileIO
opts.TableLoadingMode = options.FileIO
opts.NumCompactors = 20
opts.MaxTableSize = .25 * 1073741824 // .25GB
opts.NumMemtables = 2
opts.ValueLogFileSize = 1 * 1073741824 // 1GB
opts.SyncWrites = false
bytes, _ := json.Marshal(&opts)
logrus.Infof("BADGER OPTIONS=%s", string(bytes))
db, err := badger.Open(opts)
if err != nil {
panic(fmt.Sprintf("unable to open badger db; %s", err))
}
bt := &BadgerTest{
db: db,
}
go bt.filecounts(dir)
return bt
}
func (b *BadgerTest) Start() {
workers := 1
for i := 0; i < workers; i++ {
go b.write()
}
//go b.write()
go b.badgerGC()
go func() {
tick := time.NewTicker(1 * time.Minute)
for _ = range tick.C {
b.readKey()
}
}()
//go func() {
// tick := time.NewTicker(15 * time.Minute)
// for _ = range tick.C {
// stopWriting = true
// return
// }
//}()
}
func (b *BadgerTest) Stop() {
b.db.Close()
logrus.Infof("%s shut down badger test", time.Now())
time.Sleep(1 * time.Second)
}
func (b *BadgerTest) filecounts(dir string) {
ticker := time.NewTicker(60 * time.Second)
for _ = range ticker.C {
logFiles := int64(0)
sstFiles := int64(0)
_ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if filepath.Ext(path) == ".sst" {
sstFiles++
}
if filepath.Ext(path) == ".vlog" {
logFiles++
}
return nil
})
logrus.Infof("%s updated gauges vlog=%d sst=%d", time.Now(), logFiles, sstFiles)
}
}
func (b *BadgerTest) write() {
data := `{"randomstring":"6446D58D6DFAFD58586D3EA85A53F4A6B3CC057F933A22BB58E188A74AC8F663","refID":12495494,"testfield1234":"foo bar baz","date":"2018-01-01 12:00:00"}`
batchSize := 1000
rows := []writable{}
var cnt uint64
for {
if stopWriting {
logrus.Infof("%s stopping writing, let it catch up", time.Now())
return
}
cnt++
ts := time.Now().UnixNano()
buf := make([]byte, 16)
offset := 0
binary.BigEndian.PutUint64(buf[offset:], uint64(ts))
offset = offset + 8
binary.BigEndian.PutUint64(buf[offset:], cnt)
if !firstKeyStored {
firstKey = buf
firstKeyStored = true
logrus.Infof("%s firstkey stored %x", time.Now(), firstKey)
}
w := writable{key: buf, value: []byte(data)}
rows = append(rows, w)
if len(rows) > batchSize {
b.saveRows(rows)
rows = []writable{}
}
}
}
func (b *BadgerTest) saveRows(rows []writable) {
ttl := 30 * time.Minute
txn := b.db.NewTransaction(true)
for _, row := range rows {
if err := txn.SetWithTTL(row.key, row.value, ttl); err == badger.ErrTxnTooBig {
logrus.Infof("%s TX too big, committing...", time.Now())
_ = txn.Commit(nil)
txn = b.db.NewTransaction(true)
// retry the write in the fresh transaction and check the error this time
if err = txn.SetWithTTL(row.key, row.value, ttl); err != nil {
logrus.Errorf("unable to set key-val in fresh transaction; %s", err)
}
} else if err != nil {
logrus.Errorf("unable to set key-val in transaction; %s", err)
}
}
err := txn.Commit(nil)
if err != nil {
logrus.Errorf("unable to commit transaction; %s", err)
}
}
func (b *BadgerTest) readKey() {
// at some point our first key should be expired
err := b.db.View(func(txn *badger.Txn) error {
item, err := txn.Get([]byte(firstKey))
if err != nil {
return err
}
val, err := item.Value()
if err != nil {
return err
}
logrus.Infof("%s FIRSTKEY VALUE=%s", time.Now(), val)
return nil
})
if err != nil {
logrus.Errorf("%s FIRSTKEY unable to read first key from badger; %s", time.Now(), err)
}
}
func (b *BadgerTest) badgerGC() {
_, lastVlogSize := b.db.Size()
logrus.Infof("lastVLOGSize=%v", lastVlogSize)
ticker := time.NewTicker(2 * time.Minute)
const GB = int64(1 << 30)
for {
select {
case <-ticker.C:
_, currentVlogSize := b.db.Size()
if currentVlogSize < lastVlogSize+GB {
continue
}
logrus.Infof("%s CLEANUP starting to purge keys", time.Now())
err := b.db.PurgeOlderVersions()
if err != nil {
logrus.Errorf("%s badgerOps unable to purge older versions; %s", time.Now(), err)
}
logrus.Infof("%s CLEANUP PurgeOlderVersions completed", time.Now())
// If size increased by 3.5 GB, then we run this 3 times.
numTimes := (currentVlogSize - lastVlogSize) / GB
for i := 0; i < int(numTimes); i++ {
err := b.db.RunValueLogGC(0.5)
if err != nil {
logrus.Errorf("%s badgerOps unable to RunValueLogGC; %s", time.Now(), err)
}
logrus.Infof("%s CLEANUP RunValueLogGC completed iteration=%d", time.Now(), i)
}
_, lastVlogSize = b.db.Size()
}
}
}
I added a stop-write boolean to see if data would eventually get cleaned up. From that testing, it appears vlog files will eventually be cleaned up from expired keys, but SST tables stay forever. It also appears that under heavy write load, vlog cleanup never catches up; not sure if there's room for more concurrency there. The key for me is finding a way to sustain consistent, heavy write loads 24/7 while maintaining a stable memory profile.
Other big observation: MaxTableSize in the options acts funny. If I set it to 2GB, what I'd expect is that after a memtable flush a smallish level-0 file gets written (maybe 50MB); after 10 of those files, they get compacted into larger files, maybe 200MB, and at level 2, 500MB, and so on, i.e. each level has larger and larger SST files. What I actually observe is that no matter what I set the value to, Badger writes files of about 70% of MaxTableSize at every level. If I set it to 1GB, it writes ~700MB SST files at level 0.
Keys aren't really deleted from the LSM tree; they're only tombstoned (marked as deleted). I'll see if we can have a way for the keys to be removed too, but that'd be beyond the scope of this issue.
Hrm. That sounds like a pretty big problem? Deletes should write tombstones, but compaction should eventually physically remove those items from disk; otherwise you can't manage a growing dataset without sharding Badger DBs. Should I file a separate issue?
Deleting a key in an LSM tree isn't a common thing, precisely because of compactions. Say you're compacting a key from L1 to L2, a key which was deleted, so you just remove it altogether. An older version of that key might still be present in L4, so all reads would now fall back to that version, bringing the deleted key back. Therefore, in LSM trees newer versions of keys are tombstoned instead of removed. However, if there's an alternative approach used by other LSM trees (say, RocksDB), I'm happy to take a look.
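To make the resurrection argument concrete, here is a toy model (plain Go maps standing in for levels, not Badger's code) showing why dropping a tombstone at an intermediate level brings an older version back:

```go
// Toy model of why LSM trees tombstone instead of physically deleting during
// an intermediate-level compaction: if the deletion marker is dropped while
// an older version still lives in a deeper level, reads resurrect the value.
package main

import "fmt"

const tombstone = "<deleted>"

// levels[0] is newest; a read returns the first hit walking down the levels.
func get(levels []map[string]string, key string) (string, bool) {
	for _, lvl := range levels {
		if v, ok := lvl[key]; ok {
			if v == tombstone {
				return "", false
			}
			return v, true
		}
	}
	return "", false
}

func main() {
	l1 := map[string]string{"k": tombstone} // newer level: k was deleted
	l4 := map[string]string{"k": "old"}     // older version, deeper level
	levels := []map[string]string{l1, l4}

	if _, ok := get(levels, "k"); !ok {
		fmt.Println("before compaction: k correctly reads as deleted")
	}

	// "Compact" L1 by removing the key outright instead of keeping the tombstone:
	delete(l1, "k")

	if v, ok := get(levels, "k"); ok {
		fmt.Printf("after dropping the tombstone: k resurrects as %q\n", v)
	}
}
```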
Quick reference: you have the bloom filters that let you know whether a key is present anywhere else. I'll dig through RocksDB tomorrow, but I haven't seen an LSM that doesn't remove keys at some point; otherwise you just fill the disk.
https://github.com/google/leveldb/blob/master/doc/impl.md
http://cidrdb.org/cidr2017/papers/p82-dong-cidr17.pdf (RocksDB overview)
This is what we're doing as well.
I tried implementing this; however, the multi-version nature of Badger keys makes this hard (probably also why RocksDB does not remove keys either). I worked on logic to remove deleted or expired keys with zero overlap from lower levels: https://github.com/dgraph-io/badger/tree/mrjn/removekeys

However, this still has edge cases where a stale version of a key could resurrect after the removal of a tombstone. Consider level L, the lowest level in the LSM tree (no overlap from levels below). Consider a key K with many versions, spanning two or more SSTables, T1 and T2. Say T1 contains the latest versions of K, including the tombstone marker; this marker makes all the older versions discardable. Now, say T1 gets compacted to L+1 first. We remove the tombstoned K and all the versions present in T1, so T1 moves to L+1 with no version of K. However, when a Get(K) runs, in the absence of any tombstone marker, it would read the latest version of K still present in T2 and serve that, which would be wrong. Compaction runs only one table at a time; we can't go across tables at the same level to ensure that all versions of K are picked up together.

Instead, what you could do is take a backup of Badger and then reimport it. Or we could have a tool that only iterates over the LSM tree and rebuilds it, without touching the value log -- this would be an offline operation, would reclaim LSM space, and would be very fast.

Update: Another approach might be to ensure that a single key never spans multiple tables at the same level. That might fix this problem. I'll consider it further.
I've implemented the above-mentioned update and added the ability to discard all older versions starting from the first deleted or expired version of the key. Can you try the https://github.com/dgraph-io/badger/tree/mrjn/removekeys branch?

Update: I ran it with your code (plus modifications to limit the number of keys to 10), and the LSM tree is working pretty nicely. After 2.75 hours and 100GB of data, this is the info log:
Awesome, I'll give it a try today. I'd say your point about no key spanning multiple files at the same level is a good move, and it's what other LSM implementations do as well (re: http://www.benstopford.com/2015/02/14/log-structured-merge-trees/, section "Levelled Compaction"). I also came across a good post from Influx that describes the situation and what they've tried. TL;DR: for DBs like Badger, shard the actual DB by time; for RocksDB, use column families with time blocks and delete the column family after a period of time has passed, which efficiently removes the data. https://docs.influxdata.com/influxdb/v0.9/concepts/storage_engine/
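For reference, a rough sketch of what time-sharding Badger could look like, using the same v1.x-era API as the test code in this thread. The base path, the one-hour shard window, and the 24-hour retention are arbitrary choices for illustration, not something prescribed here.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"time"

	"github.com/dgraph-io/badger"
)

// shardDir returns the directory for the shard covering time t.
// One shard per hour is an arbitrary choice for the example.
func shardDir(base string, t time.Time) string {
	return filepath.Join(base, t.UTC().Format("2006010215"))
}

func openShard(dir string) (*badger.DB, error) {
	if err := os.MkdirAll(dir, 0755); err != nil {
		return nil, err
	}
	opts := badger.DefaultOptions
	opts.Dir = dir
	opts.ValueDir = dir
	return badger.Open(opts)
}

func main() {
	base := "/raid0/badgershards" // hypothetical base path
	// Writes go to the shard for the current window.
	db, err := openShard(shardDir(base, time.Now()))
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// Dropping data older than the retention period is just deleting whole
	// shard directories: no tombstones and no compaction required.
	retention := 24 * time.Hour
	cutoff := shardDir(base, time.Now().Add(-retention))
	shards, _ := filepath.Glob(filepath.Join(base, "*"))
	for _, s := range shards {
		if s < cutoff { // fixed-width names make lexicographic order = time order
			fmt.Println("dropping expired shard", s)
			_ = os.RemoveAll(s)
		}
	}
}
```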
Tested with that branch. It looks like nothing got cleaned up in my case. Memory did drop, though.
Test:
Result:
Complete test code:
package main
import (
"github.com/dgraph-io/badger"
"github.com/dgraph-io/badger/options"
"time"
"fmt"
"encoding/binary"
"path/filepath"
"os"
"github.com/Sirupsen/logrus"
"encoding/json"
_ "net/http/pprof"
"net/http"
)
type writable struct {
key []byte
value []byte
}
var (
// store the first key we've seen so we can try and query it to see when it's expired from view
firstKey []byte
firstKeyStored bool //
stopWriting = false
)
func main() {
go func() {
// you can hit:
// go tool pprof http://localhost:8001/debug/pprof/heap
// go tool pprof http://localhost:8001/debug/pprof/profile
logrus.Infof("starting debug web server....")
logrus.Info(http.ListenAndServe("localhost:8001", nil))
}()
done := make(chan struct{})
bt := NewBadgerTest()
bt.Start()
<-done // wait
}
type BadgerTest struct {
db *badger.DB
}
func NewBadgerTest() *BadgerTest {
dir := "/raid0/badgertest"
opts := badger.DefaultOptions
opts.Dir = dir
opts.ValueDir = dir
opts.ValueLogLoadingMode = options.FileIO
opts.TableLoadingMode = options.FileIO
opts.NumCompactors = 20
opts.MaxTableSize = .25 * 1073741824 // .25GB
opts.NumMemtables = 2
opts.ValueLogFileSize = 1 * 1073741824 // 1GB
opts.SyncWrites = false
bytes, _ := json.Marshal(&opts)
logrus.Infof("BADGER OPTIONS=%s", string(bytes))
db, err := badger.Open(opts)
if err != nil {
panic(fmt.Sprintf("unable to open badger db; %s", err))
}
bt := &BadgerTest{
db: db,
}
go bt.filecounts(dir)
return bt
}
func (b *BadgerTest) Start() {
workers := 1
for i := 0; i < workers; i++ {
go b.write()
}
//go b.write()
go b.badgerGC()
go func() {
tick := time.NewTicker(1 * time.Minute)
for _ = range tick.C {
b.readKey()
}
}()
go func() {
tick := time.NewTicker(15 * time.Minute)
for _ = range tick.C {
stopWriting = true
return
}
}()
}
func (b *BadgerTest) Stop() {
b.db.Close()
logrus.Infof("%s shut down badger test", time.Now())
time.Sleep(1 * time.Second)
}
func (b *BadgerTest) filecounts(dir string) {
ticker := time.NewTicker(60 * time.Second)
for _ = range ticker.C {
logFiles := int64(0)
sstFiles := int64(0)
_ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if filepath.Ext(path) == ".sst" {
sstFiles++
}
if filepath.Ext(path) == ".vlog" {
logFiles++
}
return nil
})
logrus.Infof("%s updated gauges vlog=%d sst=%d", time.Now(), logFiles, sstFiles)
}
}
func (b *BadgerTest) write() {
data := `{"randomstring":"6446D58D6DFAFD58586D3EA85A53F4A6B3CC057F933A22BB58E188A74AC8F663","refID":12495494,"testfield1234":"foo bar baz","date":"2018-01-01 12:00:00"}`
batchSize := 1000
rows := []writable{}
var cnt uint64
for {
if stopWriting {
logrus.Infof("%s stopping writing, let's see if compaction removes TTL'd data", time.Now())
return
}
cnt++
ts := time.Now().UnixNano()
buf := make([]byte, 16)
offset := 0
binary.BigEndian.PutUint64(buf[offset:], uint64(ts))
offset = offset + 8
binary.BigEndian.PutUint64(buf[offset:], cnt)
if !firstKeyStored {
firstKey = buf
firstKeyStored = true
logrus.Infof("%s firstkey stored %x", time.Now(), firstKey)
}
w := writable{key: buf, value: []byte(data)}
rows = append(rows, w)
if len(rows) > batchSize {
b.saveRows(rows)
rows = []writable{}
}
}
}
func (b *BadgerTest) saveRows(rows []writable) {
ttl := 20 * time.Minute
txn := b.db.NewTransaction(true)
for _, row := range rows {
if err := txn.SetWithTTL(row.key, row.value, ttl); err == badger.ErrTxnTooBig {
logrus.Infof("%s TX too big, committing...", time.Now())
_ = txn.Commit(nil)
txn = b.db.NewTransaction(true)
// retry the write in the fresh transaction and check the error this time
if err = txn.SetWithTTL(row.key, row.value, ttl); err != nil {
logrus.Errorf("unable to set key-val in fresh transaction; %s", err)
}
} else if err != nil {
logrus.Errorf("unable to set key-val in transaction; %s", err)
}
}
err := txn.Commit(nil)
if err != nil {
logrus.Errorf("unable to commit transaction; %s", err)
}
}
func (b *BadgerTest) readKey() {
// at some point our first key should be expired
err := b.db.View(func(txn *badger.Txn) error {
item, err := txn.Get([]byte(firstKey))
if err != nil {
return err
}
val, err := item.Value()
if err != nil {
return err
}
logrus.Infof("%s FIRSTKEY VALUE=%s", time.Now(), val)
return nil
})
if err != nil {
logrus.Errorf("%s FIRSTKEY unable to read first key from badger; %s", time.Now(), err)
}
}
func (b *BadgerTest) badgerGC() {
_, lastVlogSize := b.db.Size()
logrus.Infof("lastVLOGSize=%v", lastVlogSize)
ticker := time.NewTicker(2 * time.Minute)
const GB = int64(1 << 30)
for {
select {
case <-ticker.C:
_, currentVlogSize := b.db.Size()
//if currentVlogSize < lastVlogSize+GB {
// continue
//}
logrus.Infof("%s CLEANUP starting to purge keys size=%v", time.Now(), currentVlogSize)
err := b.db.PurgeOlderVersions()
if err != nil {
logrus.Errorf("%s badgerOps unable to purge older versions; %s", time.Now(), err)
}
logrus.Infof("%s CLEANUP PurgeOlderVersions completed", time.Now())
// If size increased by 3.5 GB, then we run this 3 times.
numTimes := (currentVlogSize - lastVlogSize) / GB
for i := 0; i < int(numTimes); i++ {
err := b.db.RunValueLogGC(0.5)
if err != nil {
logrus.Errorf("%s badgerOps unable to RunValueLogGC; %s", time.Now(), err)
}
logrus.Infof("%s CLEANUP RunValueLogGC completed iteration=%d", time.Now(), i)
}
_, lastVlogSize = b.db.Size()
}
}
}
Compaction won't happen if it isn't required. That means if you're not writing new things and causing the levels to overflow, they won't automatically compact away, and therefore the SST files won't get a chance to discard keys as they expire. However, as keys expire, value log GC will still churn away on them and cause value log files to be discarded or moved. Note that if you do close the DB, it will cause at least L0 to be fully compacted away.

P.S. For code samples, instead of putting them here, can you put them in a gist? Thanks!

Update: During my close-to-3-hour run with a 90s TTL, the SST files were reduced to a few KBs at L1 (only 10 keys, with many versions), with the value logs hovering close to 100GB. You can also see compaction skipping keys and versions via:
Yeah, I assumed that, so I'm reworking the test now to do continuous writes at a slower pace and see if it maintains a consistent memory/disk profile. I'll have some data shortly, thanks again.
- In response to #464.
- If a key version has a deletion marker or is expired, we can safely drop all the older versions of that key.
- If there's no overlap from lower levels, we can even drop the first such version.
- To avoid an edge-case bug, we need to ensure that all versions of the same key are contained in the same SSTable. This is reflected by not closing a table as long as more versions are found, and by picking key ranges that include all versions of the keys. Both measures ensure that all versions at the same level get compacted together.
- To avoid another edge-case bug, we need to ensure that we don't drop versions above the current `readTs`. Note that a managed DB would therefore not have any versions discarded; handling that is an open problem.

Badger Info:
- Print out the key ranges for each SSTable via `badger info`.
- Open an available port when running the `badger` tool, so one can view the `/debug/request` and `/debug/events` links to understand what's going on behind the scenes.
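A minimal toy sketch of the "all versions of a key stay in one table" rule described in the commit message above. This is not Badger's compaction code; the `kv` type and `targetSize` parameter are invented for the example, which only shows the table-closing rule itself.

```go
package main

import "fmt"

type kv struct {
	userKey string
	version int // higher = newer; a real entry would also carry value/meta
}

// buildTables packs sorted entries into tables of roughly targetSize entries,
// but never closes a table while the next entry is another version of the
// same user key, so every version of a key lands in exactly one table.
func buildTables(entries []kv, targetSize int) [][]kv {
	var tables [][]kv
	var cur []kv
	for i, e := range entries {
		cur = append(cur, e)
		last := i == len(entries)-1
		sameKeyNext := !last && entries[i+1].userKey == e.userKey
		if last || (len(cur) >= targetSize && !sameKeyNext) {
			tables = append(tables, cur)
			cur = nil
		}
	}
	return tables
}

func main() {
	// Sorted by key, newest version first, as in an LSM compaction stream.
	entries := []kv{{"a", 3}, {"a", 2}, {"a", 1}, {"b", 2}, {"b", 1}, {"c", 1}}
	for i, t := range buildTables(entries, 2) {
		fmt.Println("table", i, "->", t)
	}
	// All versions of "a" land in the same table, so dropping "a"'s tombstone
	// during compaction can't leave a stale version behind in a sibling table.
}
```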
Submitted as e597fb7. Based on my testing, this PR works. It would be good to hear from your end as well, considering you're running tests for a lot longer. BTW, I modified your code a bit to allow Ctrl+C to be captured so the DB closes gracefully, plus a couple of other changes. You can see my version here: https://gist.github.com/manishrjain/137b9849f2672a5fc3a106815d9d41d7
Were there any changes in that commit compared to the /removekeys branch I was testing against? I did a 12-hour soak test, and the good news is that memory stayed pretty constant (I slowed writes down to give compaction a chance to keep up), but load and disk usage climbed steadily over that time. I'll pull latest and do another long-running test with your gist above. Thanks again, Manish!

Edit: I'm going to change the test so that it writes at full speed for 15 minutes, then slows writes down, to observe space reclamation while writes are still coming in to kick off compactions.
I think you could try running value log GC more often -- most likely it's falling behind in reclaiming space as fast as writes are coming in.
Using the latest from master looks much better; you can clearly see where space was reclaimed. Test: write as fast as you can for 15 minutes with 15-minute TTL'd keys (200K+ per second), then slow down to 10-25K per second to give compaction time to catch up. It's on an uptrend right now, so I'm going to let it run a while longer to see if we reach a steady state or if it continues to fall behind. I'll then retest with a 30-second ticker for running value log GC.
Another way to figure out when to run value log GC is to do it every time the size of the value log increases by a gig or so (in addition to running it periodically). You can use this function to get the sizes: Line 1108 in e1ead00
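A minimal sketch of that suggestion, assuming the same v1.x API used in the test code above (`db.Size()` returning LSM and value-log sizes, and `RunValueLogGC`). The 30-second polling interval and 1GB threshold are arbitrary knobs for the example.

```go
package main

import (
	"time"

	"github.com/dgraph-io/badger"
)

// valueLogGCLoop polls db.Size() and runs value log GC whenever the value
// log has grown by roughly a gigabyte since the last pass, in addition to
// whatever periodic schedule the application already has.
func valueLogGCLoop(db *badger.DB) {
	const gb = int64(1 << 30)
	_, lastVlogSize := db.Size()
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		_, vlogSize := db.Size()
		if vlogSize-lastVlogSize < gb {
			continue
		}
		// Keep collecting until a pass reports nothing worth rewriting
		// (RunValueLogGC returns an error in that case).
		for db.RunValueLogGC(0.5) == nil {
		}
		_, lastVlogSize = db.Size()
	}
}

func main() {
	opts := badger.DefaultOptions
	opts.Dir = "/raid0/badgertest" // same directory as the test code above
	opts.ValueDir = opts.Dir
	db, err := badger.Open(opts)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	go valueLogGCLoop(db)
	select {} // block; the write workload would run elsewhere
}
```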
I'm hoping this is a configuration-related issue, but I've played around with the settings and I keep getting the same behavior. Tested on an i3.4xlarge in AWS, with RAID 0 across the two SSD drives.
Expected behavior of the code below:
Actual behavior seen:
Please advise on what is wrong in the code below, thanks!
3 hours of runtime; you can see just linear growth:
https://imgur.com/a/2UUfIrG
UPDATE:
I've also tried with these settings; memory doesn't grow as fast, but it still climbs linearly until OOM, with the same behavior as above.