Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Garbage collect pages allocated after minimum txid #3

Merged
merged 1 commit into from
Jun 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 26 additions & 10 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"runtime"
"runtime/debug"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -526,21 +527,36 @@ func (db *DB) beginRWTx() (*Tx, error) {
t := &Tx{writable: true}
t.init(db)
db.rwtx = t
db.freePages()
return t, nil
}

// Free any pages associated with closed read-only transactions.
var minid txid = 0xFFFFFFFFFFFFFFFF
for _, t := range db.txs {
if t.meta.txid < minid {
minid = t.meta.txid
}
// freePages releases any pages associated with closed read-only transactions.
func (db *DB) freePages() {
// Free all pending pages prior to earliest open transaction.
sort.Sort(txsById(db.txs))
minid := txid(0xFFFFFFFFFFFFFFFF)
if len(db.txs) > 0 {
minid = db.txs[0].meta.txid
}
if minid > 0 {
db.freelist.release(minid - 1)
}

return t, nil
// Release unused txid extents.
for _, t := range db.txs {
db.freelist.releaseRange(minid, t.meta.txid-1)
minid = t.meta.txid + 1
}
db.freelist.releaseRange(minid, txid(0xFFFFFFFFFFFFFFFF))
// Any page both allocated and freed in an extent is safe to release.
}

type txsById []*Tx

func (t txsById) Len() int { return len(t) }
func (t txsById) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
func (t txsById) Less(i, j int) bool { return t[i].meta.txid < t[j].meta.txid }

// removeTx removes a transaction from the database.
func (db *DB) removeTx(tx *Tx) {
// Release the read lock on the mmap.
Expand Down Expand Up @@ -826,7 +842,7 @@ func (db *DB) meta() *meta {
}

// allocate returns a contiguous block of memory starting at a given page.
func (db *DB) allocate(count int) (*page, error) {
func (db *DB) allocate(txid txid, count int) (*page, error) {
// Allocate a temporary buffer for the page.
var buf []byte
if count == 1 {
Expand All @@ -838,7 +854,7 @@ func (db *DB) allocate(count int) (*page, error) {
p.overflow = uint32(count - 1)

// Use pages from the freelist if they are available.
if p.id = db.freelist.allocate(count); p.id != 0 {
if p.id = db.freelist.allocate(txid, count); p.id != 0 {
return p, nil
}

Expand Down
121 changes: 97 additions & 24 deletions freelist.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,29 @@ import (
"unsafe"
)


// txPending holds a list of pgids and corresponding allocation txns
// that are pending to be freed.
type txPending struct {
ids []pgid
alloctx []txid // txids allocating the ids
lastReleaseBegin txid // beginning txid of last matching releaseRange
}

// freelist represents a list of all pages that are available for allocation.
// It also tracks pages that have been freed but are still in use by open transactions.
type freelist struct {
ids []pgid // all free and available free page ids.
pending map[txid][]pgid // mapping of soon-to-be free page ids by tx.
cache map[pgid]bool // fast lookup of all free and pending page ids.
ids []pgid // all free and available free page ids.
allocs map[pgid]txid // mapping of txid that allocated a pgid.
pending map[txid]*txPending // mapping of soon-to-be free page ids by tx.
cache map[pgid]bool // fast lookup of all free and pending page ids.
}

// newFreelist returns an empty, initialized freelist.
func newFreelist() *freelist {
return &freelist{
pending: make(map[txid][]pgid),
allocs: make(map[pgid]txid),
pending: make(map[txid]*txPending),
cache: make(map[pgid]bool),
}
}
Expand Down Expand Up @@ -45,8 +56,8 @@ func (f *freelist) free_count() int {
// pending_count returns count of pending pages
func (f *freelist) pending_count() int {
var count int
for _, list := range f.pending {
count += len(list)
for _, txp := range f.pending {
count += len(txp.ids)
}
return count
}
Expand All @@ -55,16 +66,16 @@ func (f *freelist) pending_count() int {
// f.count returns the minimum length required for dst.
func (f *freelist) copyall(dst []pgid) {
m := make(pgids, 0, f.pending_count())
for _, list := range f.pending {
m = append(m, list...)
for _, txp := range f.pending {
m = append(m, txp.ids...)
}
sort.Sort(m)
mergepgids(dst, f.ids, m)
}

// allocate returns the starting page id of a contiguous list of pages of a given size.
// If a contiguous block cannot be found then 0 is returned.
func (f *freelist) allocate(n int) pgid {
func (f *freelist) allocate(txid txid, n int) pgid {
if len(f.ids) == 0 {
return 0
}
Expand Down Expand Up @@ -97,7 +108,7 @@ func (f *freelist) allocate(n int) pgid {
for i := pgid(0); i < pgid(n); i++ {
delete(f.cache, initial+i)
}

f.allocs[initial] = txid
return initial
}

Expand All @@ -114,28 +125,73 @@ func (f *freelist) free(txid txid, p *page) {
}

// Free page and all its overflow pages.
var ids = f.pending[txid]
txp := f.pending[txid]
if txp == nil {
txp = &txPending{}
f.pending[txid] = txp
}
allocTxid, ok := f.allocs[p.id]
if ok {
delete(f.allocs, p.id)
} else if (p.flags & (freelistPageFlag | metaPageFlag)) != 0 {
// Safe to claim txid as allocating since these types are private to txid.
allocTxid = txid
}

for id := p.id; id <= p.id+pgid(p.overflow); id++ {
// Verify that page is not already free.
if f.cache[id] {
panic(fmt.Sprintf("page %d already freed", id))
}

// Add to the freelist and cache.
ids = append(ids, id)
txp.ids = append(txp.ids, id)
txp.alloctx = append(txp.alloctx, allocTxid)
f.cache[id] = true
}
f.pending[txid] = ids
}

// release moves all page ids for a transaction id (or older) to the freelist.
func (f *freelist) release(txid txid) {
m := make(pgids, 0)
for tid, ids := range f.pending {
for tid, txp := range f.pending {
if tid <= txid {
// Move transaction's pending pages to the available freelist.
// Don't remove from the cache since the page is still free.
m = append(m, ids...)
m = append(m, txp.ids...)
delete(f.pending, tid)
}
}
sort.Sort(m)
f.ids = pgids(f.ids).merge(m)
}

// releaseRange moves pending pages allocated within an extent [begin,end] to the free list.
func (f *freelist) releaseRange(begin, end txid) {
if begin > end {
return
}
var m pgids
for tid, txp := range f.pending {
if tid < begin || tid > end {
continue
}
// Don't recompute freed pages if ranges haven't updated.
if txp.lastReleaseBegin == begin {
continue
}
for i := 0; i < len(txp.ids); i++ {
if atx := txp.alloctx[i]; atx < begin || atx > end {
continue
}
m = append(m, txp.ids[i])
txp.ids[i] = txp.ids[len(txp.ids)-1]
txp.ids = txp.ids[:len(txp.ids)-1]
txp.alloctx[i] = txp.alloctx[len(txp.alloctx)-1]
txp.alloctx = txp.alloctx[:len(txp.alloctx)-1]
i--
}
txp.lastReleaseBegin = begin
if len(txp.ids) == 0 {
delete(f.pending, tid)
}
}
Expand All @@ -146,12 +202,29 @@ func (f *freelist) release(txid txid) {
// rollback removes the pages from a given pending tx.
func (f *freelist) rollback(txid txid) {
// Remove page ids from cache.
for _, id := range f.pending[txid] {
delete(f.cache, id)
txp := f.pending[txid]
if txp == nil {
return
}

// Remove pages from pending list.
var m pgids
for i, pgid := range txp.ids {
delete(f.cache, pgid)
tx := txp.alloctx[i]
if tx == 0 {
continue
}
if tx != txid {
// Pending free aborted; restore page back to alloc list.
f.allocs[pgid] = tx
} else {
// Freed page was allocated by this txn; OK to throw away.
m = append(m, pgid)
}
}
// Remove pages from pending list and mark as free if allocated by txid.
delete(f.pending, txid)
sort.Sort(m)
f.ids = pgids(f.ids).merge(m)
}

// freed returns whether a given page is in the free list.
Expand Down Expand Up @@ -217,8 +290,8 @@ func (f *freelist) reload(p *page) {

// Build a cache of only pending pages.
pcache := make(map[pgid]bool)
for _, pendingIDs := range f.pending {
for _, pendingID := range pendingIDs {
for _, txp := range f.pending {
for _, pendingID := range txp.ids {
pcache[pendingID] = true
}
}
Expand All @@ -244,8 +317,8 @@ func (f *freelist) reindex() {
for _, id := range f.ids {
f.cache[id] = true
}
for _, pendingIDs := range f.pending {
for _, pendingID := range pendingIDs {
for _, txp := range f.pending {
for _, pendingID := range txp.ids {
f.cache[pendingID] = true
}
}
Expand Down
36 changes: 19 additions & 17 deletions freelist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
func TestFreelist_free(t *testing.T) {
f := newFreelist()
f.free(100, &page{id: 12})
if !reflect.DeepEqual([]pgid{12}, f.pending[100]) {
if !reflect.DeepEqual([]pgid{12}, f.pending[100].ids) {
t.Fatalf("exp=%v; got=%v", []pgid{12}, f.pending[100])
}
}
Expand All @@ -21,7 +21,7 @@ func TestFreelist_free(t *testing.T) {
func TestFreelist_free_overflow(t *testing.T) {
f := newFreelist()
f.free(100, &page{id: 12, overflow: 3})
if exp := []pgid{12, 13, 14, 15}; !reflect.DeepEqual(exp, f.pending[100]) {
if exp := []pgid{12, 13, 14, 15}; !reflect.DeepEqual(exp, f.pending[100].ids) {
t.Fatalf("exp=%v; got=%v", exp, f.pending[100])
}
}
Expand All @@ -46,39 +46,40 @@ func TestFreelist_release(t *testing.T) {

// Ensure that a freelist can find contiguous blocks of pages.
func TestFreelist_allocate(t *testing.T) {
f := &freelist{ids: []pgid{3, 4, 5, 6, 7, 9, 12, 13, 18}}
if id := int(f.allocate(3)); id != 3 {
f := newFreelist()
f.ids = []pgid{3, 4, 5, 6, 7, 9, 12, 13, 18}
if id := int(f.allocate(1, 3)); id != 3 {
t.Fatalf("exp=3; got=%v", id)
}
if id := int(f.allocate(1)); id != 6 {
if id := int(f.allocate(1, 1)); id != 6 {
t.Fatalf("exp=6; got=%v", id)
}
if id := int(f.allocate(3)); id != 0 {
if id := int(f.allocate(1, 3)); id != 0 {
t.Fatalf("exp=0; got=%v", id)
}
if id := int(f.allocate(2)); id != 12 {
if id := int(f.allocate(1, 2)); id != 12 {
t.Fatalf("exp=12; got=%v", id)
}
if id := int(f.allocate(1)); id != 7 {
if id := int(f.allocate(1, 1)); id != 7 {
t.Fatalf("exp=7; got=%v", id)
}
if id := int(f.allocate(0)); id != 0 {
if id := int(f.allocate(1, 0)); id != 0 {
t.Fatalf("exp=0; got=%v", id)
}
if id := int(f.allocate(0)); id != 0 {
if id := int(f.allocate(1, 0)); id != 0 {
t.Fatalf("exp=0; got=%v", id)
}
if exp := []pgid{9, 18}; !reflect.DeepEqual(exp, f.ids) {
t.Fatalf("exp=%v; got=%v", exp, f.ids)
}

if id := int(f.allocate(1)); id != 9 {
if id := int(f.allocate(1, 1)); id != 9 {
t.Fatalf("exp=9; got=%v", id)
}
if id := int(f.allocate(1)); id != 18 {
if id := int(f.allocate(1, 1)); id != 18 {
t.Fatalf("exp=18; got=%v", id)
}
if id := int(f.allocate(1)); id != 0 {
if id := int(f.allocate(1, 1)); id != 0 {
t.Fatalf("exp=0; got=%v", id)
}
if exp := []pgid{}; !reflect.DeepEqual(exp, f.ids) {
Expand Down Expand Up @@ -113,9 +114,9 @@ func TestFreelist_read(t *testing.T) {
func TestFreelist_write(t *testing.T) {
// Create a freelist and write it to a page.
var buf [4096]byte
f := &freelist{ids: []pgid{12, 39}, pending: make(map[txid][]pgid)}
f.pending[100] = []pgid{28, 11}
f.pending[101] = []pgid{3}
f := &freelist{ids: []pgid{12, 39}, pending: make(map[txid]*txPending)}
f.pending[100] = &txPending{ids: []pgid{28, 11}}
f.pending[101] = &txPending{ids: []pgid{3}}
p := (*page)(unsafe.Pointer(&buf[0]))
if err := f.write(p); err != nil {
t.Fatal(err)
Expand All @@ -142,7 +143,8 @@ func benchmark_FreelistRelease(b *testing.B, size int) {
pending := randomPgids(len(ids) / 400)
b.ResetTimer()
for i := 0; i < b.N; i++ {
f := &freelist{ids: ids, pending: map[txid][]pgid{1: pending}}
txp := &txPending{ids: pending}
f := &freelist{ids: ids, pending: map[txid]*txPending{1: txp}}
f.release(1)
}
}
Expand Down
2 changes: 1 addition & 1 deletion tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, freed map[pgid]bo

// allocate returns a contiguous block of memory starting at a given page.
func (tx *Tx) allocate(count int) (*page, error) {
p, err := tx.db.allocate(count)
p, err := tx.db.allocate(tx.meta.txid, count)
if err != nil {
return nil, err
}
Expand Down