From 78d099ed1ffce8e9fa4a65f92ae8d1fe6a0a08e8 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Sun, 21 May 2017 10:16:20 -0700 Subject: [PATCH] Garbage collect pages allocated after minimum txid Read txns would lock pages allocated after the txn, keeping those pages off the free list until closing the read txn. Instead, track allocating txid to compute page lifetime, freeing pages if all txns between page allocation and page free are closed. --- db.go | 36 ++++++++++---- freelist.go | 121 +++++++++++++++++++++++++++++++++++++---------- freelist_test.go | 36 +++++++------- tx.go | 2 +- 4 files changed, 143 insertions(+), 52 deletions(-) diff --git a/db.go b/db.go index f352ff14f..17b14a5af 100644 --- a/db.go +++ b/db.go @@ -8,6 +8,7 @@ import ( "os" "runtime" "runtime/debug" + "sort" "strings" "sync" "time" @@ -526,21 +527,36 @@ func (db *DB) beginRWTx() (*Tx, error) { t := &Tx{writable: true} t.init(db) db.rwtx = t + db.freePages() + return t, nil +} - // Free any pages associated with closed read-only transactions. - var minid txid = 0xFFFFFFFFFFFFFFFF - for _, t := range db.txs { - if t.meta.txid < minid { - minid = t.meta.txid - } +// freePages releases any pages associated with closed read-only transactions. +func (db *DB) freePages() { + // Free all pending pages prior to earliest open transaction. + sort.Sort(txsById(db.txs)) + minid := txid(0xFFFFFFFFFFFFFFFF) + if len(db.txs) > 0 { + minid = db.txs[0].meta.txid } if minid > 0 { db.freelist.release(minid - 1) } - - return t, nil + // Release unused txid extents. + for _, t := range db.txs { + db.freelist.releaseRange(minid, t.meta.txid-1) + minid = t.meta.txid + 1 + } + db.freelist.releaseRange(minid, txid(0xFFFFFFFFFFFFFFFF)) + // Any page both allocated and freed in an extent is safe to release. } +type txsById []*Tx + +func (t txsById) Len() int { return len(t) } +func (t txsById) Swap(i, j int) { t[i], t[j] = t[j], t[i] } +func (t txsById) Less(i, j int) bool { return t[i].meta.txid < t[j].meta.txid } + // removeTx removes a transaction from the database. func (db *DB) removeTx(tx *Tx) { // Release the read lock on the mmap. @@ -826,7 +842,7 @@ func (db *DB) meta() *meta { } // allocate returns a contiguous block of memory starting at a given page. -func (db *DB) allocate(count int) (*page, error) { +func (db *DB) allocate(txid txid, count int) (*page, error) { // Allocate a temporary buffer for the page. var buf []byte if count == 1 { @@ -838,7 +854,7 @@ func (db *DB) allocate(count int) (*page, error) { p.overflow = uint32(count - 1) // Use pages from the freelist if they are available. - if p.id = db.freelist.allocate(count); p.id != 0 { + if p.id = db.freelist.allocate(txid, count); p.id != 0 { return p, nil } diff --git a/freelist.go b/freelist.go index aba48f58c..e0892a6c9 100644 --- a/freelist.go +++ b/freelist.go @@ -6,18 +6,29 @@ import ( "unsafe" ) + +// txPending holds a list of pgids and corresponding allocation txns +// that are pending to be freed. +type txPending struct { + ids []pgid + alloctx []txid // txids allocating the ids + lastReleaseBegin txid // beginning txid of last matching releaseRange +} + // freelist represents a list of all pages that are available for allocation. // It also tracks pages that have been freed but are still in use by open transactions. type freelist struct { - ids []pgid // all free and available free page ids. - pending map[txid][]pgid // mapping of soon-to-be free page ids by tx. - cache map[pgid]bool // fast lookup of all free and pending page ids. + ids []pgid // all free and available free page ids. + allocs map[pgid]txid // mapping of txid that allocated a pgid. + pending map[txid]*txPending // mapping of soon-to-be free page ids by tx. + cache map[pgid]bool // fast lookup of all free and pending page ids. } // newFreelist returns an empty, initialized freelist. func newFreelist() *freelist { return &freelist{ - pending: make(map[txid][]pgid), + allocs: make(map[pgid]txid), + pending: make(map[txid]*txPending), cache: make(map[pgid]bool), } } @@ -45,8 +56,8 @@ func (f *freelist) free_count() int { // pending_count returns count of pending pages func (f *freelist) pending_count() int { var count int - for _, list := range f.pending { - count += len(list) + for _, txp := range f.pending { + count += len(txp.ids) } return count } @@ -55,8 +66,8 @@ func (f *freelist) pending_count() int { // f.count returns the minimum length required for dst. func (f *freelist) copyall(dst []pgid) { m := make(pgids, 0, f.pending_count()) - for _, list := range f.pending { - m = append(m, list...) + for _, txp := range f.pending { + m = append(m, txp.ids...) } sort.Sort(m) mergepgids(dst, f.ids, m) @@ -64,7 +75,7 @@ func (f *freelist) copyall(dst []pgid) { // allocate returns the starting page id of a contiguous list of pages of a given size. // If a contiguous block cannot be found then 0 is returned. -func (f *freelist) allocate(n int) pgid { +func (f *freelist) allocate(txid txid, n int) pgid { if len(f.ids) == 0 { return 0 } @@ -97,7 +108,7 @@ func (f *freelist) allocate(n int) pgid { for i := pgid(0); i < pgid(n); i++ { delete(f.cache, initial+i) } - + f.allocs[initial] = txid return initial } @@ -114,28 +125,73 @@ func (f *freelist) free(txid txid, p *page) { } // Free page and all its overflow pages. - var ids = f.pending[txid] + txp := f.pending[txid] + if txp == nil { + txp = &txPending{} + f.pending[txid] = txp + } + allocTxid, ok := f.allocs[p.id] + if ok { + delete(f.allocs, p.id) + } else if (p.flags & (freelistPageFlag | metaPageFlag)) != 0 { + // Safe to claim txid as allocating since these types are private to txid. + allocTxid = txid + } + for id := p.id; id <= p.id+pgid(p.overflow); id++ { // Verify that page is not already free. if f.cache[id] { panic(fmt.Sprintf("page %d already freed", id)) } - // Add to the freelist and cache. - ids = append(ids, id) + txp.ids = append(txp.ids, id) + txp.alloctx = append(txp.alloctx, allocTxid) f.cache[id] = true } - f.pending[txid] = ids } // release moves all page ids for a transaction id (or older) to the freelist. func (f *freelist) release(txid txid) { m := make(pgids, 0) - for tid, ids := range f.pending { + for tid, txp := range f.pending { if tid <= txid { // Move transaction's pending pages to the available freelist. // Don't remove from the cache since the page is still free. - m = append(m, ids...) + m = append(m, txp.ids...) + delete(f.pending, tid) + } + } + sort.Sort(m) + f.ids = pgids(f.ids).merge(m) +} + +// releaseRange moves pending pages allocated within an extent [begin,end] to the free list. +func (f *freelist) releaseRange(begin, end txid) { + if begin > end { + return + } + var m pgids + for tid, txp := range f.pending { + if tid < begin || tid > end { + continue + } + // Don't recompute freed pages if ranges haven't updated. + if txp.lastReleaseBegin == begin { + continue + } + for i := 0; i < len(txp.ids); i++ { + if atx := txp.alloctx[i]; atx < begin || atx > end { + continue + } + m = append(m, txp.ids[i]) + txp.ids[i] = txp.ids[len(txp.ids)-1] + txp.ids = txp.ids[:len(txp.ids)-1] + txp.alloctx[i] = txp.alloctx[len(txp.alloctx)-1] + txp.alloctx = txp.alloctx[:len(txp.alloctx)-1] + i-- + } + txp.lastReleaseBegin = begin + if len(txp.ids) == 0 { delete(f.pending, tid) } } @@ -146,12 +202,29 @@ func (f *freelist) release(txid txid) { // rollback removes the pages from a given pending tx. func (f *freelist) rollback(txid txid) { // Remove page ids from cache. - for _, id := range f.pending[txid] { - delete(f.cache, id) + txp := f.pending[txid] + if txp == nil { + return } - - // Remove pages from pending list. + var m pgids + for i, pgid := range txp.ids { + delete(f.cache, pgid) + tx := txp.alloctx[i] + if tx == 0 { + continue + } + if tx != txid { + // Pending free aborted; restore page back to alloc list. + f.allocs[pgid] = tx + } else { + // Freed page was allocated by this txn; OK to throw away. + m = append(m, pgid) + } + } + // Remove pages from pending list and mark as free if allocated by txid. delete(f.pending, txid) + sort.Sort(m) + f.ids = pgids(f.ids).merge(m) } // freed returns whether a given page is in the free list. @@ -217,8 +290,8 @@ func (f *freelist) reload(p *page) { // Build a cache of only pending pages. pcache := make(map[pgid]bool) - for _, pendingIDs := range f.pending { - for _, pendingID := range pendingIDs { + for _, txp := range f.pending { + for _, pendingID := range txp.ids { pcache[pendingID] = true } } @@ -244,8 +317,8 @@ func (f *freelist) reindex() { for _, id := range f.ids { f.cache[id] = true } - for _, pendingIDs := range f.pending { - for _, pendingID := range pendingIDs { + for _, txp := range f.pending { + for _, pendingID := range txp.ids { f.cache[pendingID] = true } } diff --git a/freelist_test.go b/freelist_test.go index 4e9b3a8db..4259bedcb 100644 --- a/freelist_test.go +++ b/freelist_test.go @@ -12,7 +12,7 @@ import ( func TestFreelist_free(t *testing.T) { f := newFreelist() f.free(100, &page{id: 12}) - if !reflect.DeepEqual([]pgid{12}, f.pending[100]) { + if !reflect.DeepEqual([]pgid{12}, f.pending[100].ids) { t.Fatalf("exp=%v; got=%v", []pgid{12}, f.pending[100]) } } @@ -21,7 +21,7 @@ func TestFreelist_free(t *testing.T) { func TestFreelist_free_overflow(t *testing.T) { f := newFreelist() f.free(100, &page{id: 12, overflow: 3}) - if exp := []pgid{12, 13, 14, 15}; !reflect.DeepEqual(exp, f.pending[100]) { + if exp := []pgid{12, 13, 14, 15}; !reflect.DeepEqual(exp, f.pending[100].ids) { t.Fatalf("exp=%v; got=%v", exp, f.pending[100]) } } @@ -46,39 +46,40 @@ func TestFreelist_release(t *testing.T) { // Ensure that a freelist can find contiguous blocks of pages. func TestFreelist_allocate(t *testing.T) { - f := &freelist{ids: []pgid{3, 4, 5, 6, 7, 9, 12, 13, 18}} - if id := int(f.allocate(3)); id != 3 { + f := newFreelist() + f.ids = []pgid{3, 4, 5, 6, 7, 9, 12, 13, 18} + if id := int(f.allocate(1, 3)); id != 3 { t.Fatalf("exp=3; got=%v", id) } - if id := int(f.allocate(1)); id != 6 { + if id := int(f.allocate(1, 1)); id != 6 { t.Fatalf("exp=6; got=%v", id) } - if id := int(f.allocate(3)); id != 0 { + if id := int(f.allocate(1, 3)); id != 0 { t.Fatalf("exp=0; got=%v", id) } - if id := int(f.allocate(2)); id != 12 { + if id := int(f.allocate(1, 2)); id != 12 { t.Fatalf("exp=12; got=%v", id) } - if id := int(f.allocate(1)); id != 7 { + if id := int(f.allocate(1, 1)); id != 7 { t.Fatalf("exp=7; got=%v", id) } - if id := int(f.allocate(0)); id != 0 { + if id := int(f.allocate(1, 0)); id != 0 { t.Fatalf("exp=0; got=%v", id) } - if id := int(f.allocate(0)); id != 0 { + if id := int(f.allocate(1, 0)); id != 0 { t.Fatalf("exp=0; got=%v", id) } if exp := []pgid{9, 18}; !reflect.DeepEqual(exp, f.ids) { t.Fatalf("exp=%v; got=%v", exp, f.ids) } - if id := int(f.allocate(1)); id != 9 { + if id := int(f.allocate(1, 1)); id != 9 { t.Fatalf("exp=9; got=%v", id) } - if id := int(f.allocate(1)); id != 18 { + if id := int(f.allocate(1, 1)); id != 18 { t.Fatalf("exp=18; got=%v", id) } - if id := int(f.allocate(1)); id != 0 { + if id := int(f.allocate(1, 1)); id != 0 { t.Fatalf("exp=0; got=%v", id) } if exp := []pgid{}; !reflect.DeepEqual(exp, f.ids) { @@ -113,9 +114,9 @@ func TestFreelist_read(t *testing.T) { func TestFreelist_write(t *testing.T) { // Create a freelist and write it to a page. var buf [4096]byte - f := &freelist{ids: []pgid{12, 39}, pending: make(map[txid][]pgid)} - f.pending[100] = []pgid{28, 11} - f.pending[101] = []pgid{3} + f := &freelist{ids: []pgid{12, 39}, pending: make(map[txid]*txPending)} + f.pending[100] = &txPending{ids: []pgid{28, 11}} + f.pending[101] = &txPending{ids: []pgid{3}} p := (*page)(unsafe.Pointer(&buf[0])) if err := f.write(p); err != nil { t.Fatal(err) @@ -142,7 +143,8 @@ func benchmark_FreelistRelease(b *testing.B, size int) { pending := randomPgids(len(ids) / 400) b.ResetTimer() for i := 0; i < b.N; i++ { - f := &freelist{ids: ids, pending: map[txid][]pgid{1: pending}} + txp := &txPending{ids: pending} + f := &freelist{ids: ids, pending: map[txid]*txPending{1: txp}} f.release(1) } } diff --git a/tx.go b/tx.go index 6700308a2..f8cae6b18 100644 --- a/tx.go +++ b/tx.go @@ -453,7 +453,7 @@ func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, freed map[pgid]bo // allocate returns a contiguous block of memory starting at a given page. func (tx *Tx) allocate(count int) (*page, error) { - p, err := tx.db.allocate(count) + p, err := tx.db.allocate(tx.meta.txid, count) if err != nil { return nil, err }