Skip to content

Commit

Permalink
Merge pull request #4015 from bhandras/kv-etcd
Browse files Browse the repository at this point in the history
kvdb wrapper for etcd
  • Loading branch information
bhandras committed May 22, 2020
2 parents a8d5c32 + b53475d commit 7f1a450
Show file tree
Hide file tree
Showing 35 changed files with 3,975 additions and 97 deletions.
11 changes: 10 additions & 1 deletion channeldb/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,22 @@ func makeTestDB() (*DB, func(), error) {
}

// Next, create channeldb for the first time.
cdb, err := Open(tempDirName, OptionClock(testClock))
backend, backendCleanup, err := kvdb.GetTestBackend(tempDirName, "cdb")
if err != nil {
backendCleanup()
return nil, nil, err
}

cdb, err := CreateWithBackend(backend, OptionClock(testClock))
if err != nil {
backendCleanup()
os.RemoveAll(tempDirName)
return nil, nil, err
}

cleanUp := func() {
cdb.Close()
backendCleanup()
os.RemoveAll(tempDirName)
}

Expand Down
122 changes: 78 additions & 44 deletions channeldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"fmt"
"net"
"os"
"path/filepath"
"time"

"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcwallet/walletdb"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/channeldb/kvdb"
"github.com/lightningnetwork/lnd/channeldb/migration12"
Expand Down Expand Up @@ -148,38 +148,79 @@ var (
// schedules, and reputation data.
type DB struct {
kvdb.Backend

dbPath string
graph *ChannelGraph
clock clock.Clock
dryRun bool
}

// Open opens an existing channeldb. Any necessary schemas migrations due to
// updates will take place as necessary.
func Open(dbPath string, modifiers ...OptionModifier) (*DB, error) {
path := filepath.Join(dbPath, dbName)
// Update is a wrapper around walletdb.Update which calls into the extended
// backend when available. This call is needed to be able to cast DB to
// ExtendedBackend.
func (db *DB) Update(f func(tx walletdb.ReadWriteTx) error) error {
if v, ok := db.Backend.(kvdb.ExtendedBackend); ok {
return v.Update(f)
}
return walletdb.Update(db, f)
}

if !fileExists(path) {
if err := createChannelDB(dbPath); err != nil {
return nil, err
}
// View is a wrapper around walletdb.View which calls into the extended
// backend when available. This call is needed to be able to cast DB to
// ExtendedBackend.
func (db *DB) View(f func(tx walletdb.ReadTx) error) error {
if v, ok := db.Backend.(kvdb.ExtendedBackend); ok {
return v.View(f)
}

return walletdb.View(db, f)
}

// PrintStats calls into the extended backend if available. This call is needed
// to be able to cast DB to ExtendedBackend.
func (db *DB) PrintStats() string {
if v, ok := db.Backend.(kvdb.ExtendedBackend); ok {
return v.PrintStats()
}

return "unimplemented"
}

// Open opens or creates channeldb. Any necessary schemas migrations due
// to updates will take place as necessary.
// TODO(bhandras): deprecate this function.
func Open(dbPath string, modifiers ...OptionModifier) (*DB, error) {
opts := DefaultOptions()
for _, modifier := range modifiers {
modifier(&opts)
}

// Specify bbolt freelist options to reduce heap pressure in case the
// freelist grows to be very large.
bdb, err := kvdb.Open(kvdb.BoltBackendName, path, opts.NoFreelistSync)
backend, err := kvdb.GetBoltBackend(dbPath, dbName, opts.NoFreelistSync)
if err != nil {
return nil, err
}

db, err := CreateWithBackend(backend, modifiers...)
if err == nil {
db.dbPath = dbPath
}
return db, err
}

// CreateWithBackend creates channeldb instance using the passed kvdb.Backend.
// Any necessary schemas migrations due to updates will take place as necessary.
func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB, error) {
if err := initChannelDB(backend); err != nil {
return nil, err
}

opts := DefaultOptions()
for _, modifier := range modifiers {
modifier(&opts)
}

chanDB := &DB{
Backend: bdb,
dbPath: dbPath,
Backend: backend,
clock: opts.clock,
dryRun: opts.dryRun,
}
Expand All @@ -189,7 +230,7 @@ func Open(dbPath string, modifiers ...OptionModifier) (*DB, error) {

// Synchronize the version of database and apply migrations if needed.
if err := chanDB.syncVersions(dbVersions); err != nil {
bdb.Close()
backend.Close()
return nil, err
}

Expand Down Expand Up @@ -251,20 +292,15 @@ func (d *DB) Wipe() error {
// the case that the target path has not yet been created or doesn't yet exist,
// then the path is created. Additionally, all required top-level buckets used
// within the database are created.
func createChannelDB(dbPath string) error {
if !fileExists(dbPath) {
if err := os.MkdirAll(dbPath, 0700); err != nil {
return err
func initChannelDB(db kvdb.Backend) error {
err := kvdb.Update(db, func(tx kvdb.RwTx) error {
meta := &Meta{}
// Check if DB is already initialized.
err := fetchMeta(meta, tx)
if err == nil {
return nil
}
}

path := filepath.Join(dbPath, dbName)
bdb, err := kvdb.Create(kvdb.BoltBackendName, path, true)
if err != nil {
return err
}

err = kvdb.Update(bdb, func(tx kvdb.RwTx) error {
if _, err := tx.CreateTopLevelBucket(openChannelBucket); err != nil {
return err
}
Expand Down Expand Up @@ -331,16 +367,14 @@ func createChannelDB(dbPath string) error {
return err
}

meta := &Meta{
DbVersionNumber: getLatestDBVersion(dbVersions),
}
meta.DbVersionNumber = getLatestDBVersion(dbVersions)
return putMeta(meta, tx)
})
if err != nil {
return fmt.Errorf("unable to create new channeldb")
return fmt.Errorf("unable to create new channeldb: %v", err)
}

return bdb.Close()
return nil
}

// fileExists returns true if the file exists, and false otherwise.
Expand Down Expand Up @@ -373,7 +407,7 @@ func (d *DB) FetchOpenChannels(nodeID *btcec.PublicKey) ([]*OpenChannel, error)
// stored currently active/open channels associated with the target nodeID. In
// the case that no active channels are known to have been created with this
// node, then a zero-length slice is returned.
func (d *DB) fetchOpenChannels(tx kvdb.ReadTx,
func (db *DB) fetchOpenChannels(tx kvdb.ReadTx,
nodeID *btcec.PublicKey) ([]*OpenChannel, error) {

// Get the bucket dedicated to storing the metadata for open channels.
Expand Down Expand Up @@ -409,7 +443,7 @@ func (d *DB) fetchOpenChannels(tx kvdb.ReadTx,

// Finally, we both of the necessary buckets retrieved, fetch
// all the active channels related to this node.
nodeChannels, err := d.fetchNodeChannels(chainBucket)
nodeChannels, err := db.fetchNodeChannels(chainBucket)
if err != nil {
return fmt.Errorf("unable to read channel for "+
"chain_hash=%x, node_key=%x: %v",
Expand All @@ -426,7 +460,7 @@ func (d *DB) fetchOpenChannels(tx kvdb.ReadTx,
// fetchNodeChannels retrieves all active channels from the target chainBucket
// which is under a node's dedicated channel bucket. This function is typically
// used to fetch all the active channels related to a particular node.
func (d *DB) fetchNodeChannels(chainBucket kvdb.ReadBucket) ([]*OpenChannel, error) {
func (db *DB) fetchNodeChannels(chainBucket kvdb.ReadBucket) ([]*OpenChannel, error) {

var channels []*OpenChannel

Expand All @@ -452,7 +486,7 @@ func (d *DB) fetchNodeChannels(chainBucket kvdb.ReadBucket) ([]*OpenChannel, err
return fmt.Errorf("unable to read channel data for "+
"chan_point=%v: %v", outPoint, err)
}
oChannel.Db = d
oChannel.Db = db

channels = append(channels, oChannel)

Expand Down Expand Up @@ -906,8 +940,8 @@ func (d *DB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
// pruneLinkNode determines whether we should garbage collect a link node from
// the database due to no longer having any open channels with it. If there are
// any left, then this acts as a no-op.
func (d *DB) pruneLinkNode(tx kvdb.RwTx, remotePub *btcec.PublicKey) error {
openChannels, err := d.fetchOpenChannels(tx, remotePub)
func (db *DB) pruneLinkNode(tx kvdb.RwTx, remotePub *btcec.PublicKey) error {
openChannels, err := db.fetchOpenChannels(tx, remotePub)
if err != nil {
return fmt.Errorf("unable to fetch open channels for peer %x: "+
"%v", remotePub.SerializeCompressed(), err)
Expand All @@ -920,7 +954,7 @@ func (d *DB) pruneLinkNode(tx kvdb.RwTx, remotePub *btcec.PublicKey) error {
log.Infof("Pruning link node %x with zero open channels from database",
remotePub.SerializeCompressed())

return d.deleteLinkNode(tx, remotePub)
return db.deleteLinkNode(tx, remotePub)
}

// PruneLinkNodes attempts to prune all link nodes found within the databse with
Expand Down Expand Up @@ -1132,16 +1166,16 @@ func (d *DB) AddrsForNode(nodePub *btcec.PublicKey) ([]net.Addr, error) {
// database. If the channel was already removed (has a closed channel entry),
// then we'll return a nil error. Otherwise, we'll insert a new close summary
// into the database.
func (d *DB) AbandonChannel(chanPoint *wire.OutPoint, bestHeight uint32) error {
func (db *DB) AbandonChannel(chanPoint *wire.OutPoint, bestHeight uint32) error {
// With the chanPoint constructed, we'll attempt to find the target
// channel in the database. If we can't find the channel, then we'll
// return the error back to the caller.
dbChan, err := d.FetchChannel(*chanPoint)
dbChan, err := db.FetchChannel(*chanPoint)
switch {
// If the channel wasn't found, then it's possible that it was already
// abandoned from the database.
case err == ErrChannelNotFound:
_, closedErr := d.FetchClosedChannel(chanPoint)
_, closedErr := db.FetchClosedChannel(chanPoint)
if closedErr != nil {
return closedErr
}
Expand Down Expand Up @@ -1304,9 +1338,9 @@ func fetchHistoricalChanBucket(tx kvdb.ReadTx,

// FetchHistoricalChannel fetches open channel data from the historical channel
// bucket.
func (d *DB) FetchHistoricalChannel(outPoint *wire.OutPoint) (*OpenChannel, error) {
func (db *DB) FetchHistoricalChannel(outPoint *wire.OutPoint) (*OpenChannel, error) {
var channel *OpenChannel
err := kvdb.View(d, func(tx kvdb.ReadTx) error {
err := kvdb.View(db, func(tx kvdb.ReadTx) error {
chanBucket, err := fetchHistoricalChanBucket(tx, outPoint)
if err != nil {
return err
Expand Down
17 changes: 15 additions & 2 deletions channeldb/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/channeldb/kvdb"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/shachain"
Expand All @@ -33,7 +34,13 @@ func TestOpenWithCreate(t *testing.T) {

// Next, open thereby creating channeldb for the first time.
dbPath := filepath.Join(tempDirName, "cdb")
cdb, err := Open(dbPath)
backend, cleanup, err := kvdb.GetTestBackend(dbPath, "cdb")
if err != nil {
t.Fatalf("unable to get test db backend: %v", err)
}
defer cleanup()

cdb, err := CreateWithBackend(backend)
if err != nil {
t.Fatalf("unable to create channeldb: %v", err)
}
Expand Down Expand Up @@ -73,7 +80,13 @@ func TestWipe(t *testing.T) {

// Next, open thereby creating channeldb for the first time.
dbPath := filepath.Join(tempDirName, "cdb")
cdb, err := Open(dbPath)
backend, cleanup, err := kvdb.GetTestBackend(dbPath, "cdb")
if err != nil {
t.Fatalf("unable to get test db backend: %v", err)
}
defer cleanup()

cdb, err := CreateWithBackend(backend)
if err != nil {
t.Fatalf("unable to create channeldb: %v", err)
}
Expand Down
70 changes: 70 additions & 0 deletions channeldb/kvdb/backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package kvdb

import (
"fmt"
"os"
"path/filepath"

_ "github.com/btcsuite/btcwallet/walletdb/bdb" // Import to register backend.
)

// fileExists returns true if the file exists, and false otherwise.
func fileExists(path string) bool {
if _, err := os.Stat(path); err != nil {
if os.IsNotExist(err) {
return false
}
}

return true
}

// GetBoltBackend opens (or creates if doesn't exits) a bbolt
// backed database and returns a kvdb.Backend wrapping it.
func GetBoltBackend(path, name string, noFreeListSync bool) (Backend, error) {
dbFilePath := filepath.Join(path, name)
var (
db Backend
err error
)

if !fileExists(dbFilePath) {
if !fileExists(path) {
if err := os.MkdirAll(path, 0700); err != nil {
return nil, err
}
}

db, err = Create(BoltBackendName, dbFilePath, noFreeListSync)
} else {
db, err = Open(BoltBackendName, dbFilePath, noFreeListSync)
}

if err != nil {
return nil, err
}

return db, nil
}

// GetTestBackend opens (or creates if doesn't exist) a bbolt or etcd
// backed database (for testing), and returns a kvdb.Backend and a cleanup
// func. Whether to create/open bbolt or embedded etcd database is based
// on the TestBackend constant which is conditionally compiled with build tag.
// The passed path is used to hold all db files, while the name is only used
// for bbolt.
func GetTestBackend(path, name string) (Backend, func(), error) {
empty := func() {}

if TestBackend == BoltBackendName {
db, err := GetBoltBackend(path, name, true)
if err != nil {
return nil, nil, err
}
return db, empty, nil
} else if TestBackend == EtcdBackendName {
return GetEtcdTestBackend(path, name)
}

return nil, nil, fmt.Errorf("unknown backend")
}
10 changes: 0 additions & 10 deletions channeldb/kvdb/bbolt.go

This file was deleted.

Loading

0 comments on commit 7f1a450

Please sign in to comment.