Skip to content

Commit

Permalink
state store upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
tgross committed Feb 18, 2022
1 parent 1f095de commit 0cfe94e
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 66 deletions.
8 changes: 0 additions & 8 deletions client/dynamicplugins/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,6 @@ type RegistryState struct {
Plugins map[string]map[string]*list.List
}

// TODO(tgross): LegacyRegistryState is the v1 state persisted in the
// client state store. We need to do a data migration in the boltdb
// store on restart.
//
// type LegacyRegistryState struct {
// Plugins map[string]map[string]*PluginInfo
// }

type PluginDispenser func(info *PluginInfo) (interface{}, error)

// NewRegistry takes a map of `plugintype` to PluginDispenser functions
Expand Down
9 changes: 9 additions & 0 deletions client/state/12types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package state

import "github.com/hashicorp/nomad/client/dynamicplugins"

// RegistryState12 is the dynamic plugin registry state persisted
// before 1.3.0.
type RegistryState12 struct {
Plugins map[string]map[string]*dynamicplugins.PluginInfo
}
27 changes: 18 additions & 9 deletions client/state/state_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ var (
// metaVersion is the value of the state schema version to detect when
// an upgrade is needed. It skips the usual boltdd/msgpack backend to
// be as portable and futureproof as possible.
metaVersion = []byte{'2'}
metaVersion = []byte{'3'}

// metaUpgradedKey is the key that stores the timestamp of the last
// time the schema was upgraded.
Expand Down Expand Up @@ -90,9 +90,9 @@ var (
// stored at
managerPluginStateKey = []byte("plugin_state")

// dynamicPluginBucket is the bucket name containing all dynamic plugin
// dynamicPluginBucketName is the bucket name containing all dynamic plugin
// registry data. each dynamic plugin registry will have its own subbucket.
dynamicPluginBucket = []byte("dynamicplugins")
dynamicPluginBucketName = []byte("dynamicplugins")

// registryStateKey is the key at which dynamic plugin registry state is stored
registryStateKey = []byte("registry_state")
Expand Down Expand Up @@ -677,7 +677,7 @@ func (s *BoltStateDB) GetDriverPluginState() (*driverstate.PluginState, error) {
func (s *BoltStateDB) PutDynamicPluginRegistryState(ps *dynamicplugins.RegistryState) error {
return s.db.Update(func(tx *boltdd.Tx) error {
// Retrieve the root dynamic plugin manager bucket
dynamicBkt, err := tx.CreateBucketIfNotExists(dynamicPluginBucket)
dynamicBkt, err := tx.CreateBucketIfNotExists(dynamicPluginBucketName)
if err != nil {
return err
}
Expand All @@ -691,7 +691,7 @@ func (s *BoltStateDB) GetDynamicPluginRegistryState() (*dynamicplugins.RegistryS
var ps *dynamicplugins.RegistryState

err := s.db.View(func(tx *boltdd.Tx) error {
dynamicBkt := tx.Bucket(dynamicPluginBucket)
dynamicBkt := tx.Bucket(dynamicPluginBucketName)
if dynamicBkt == nil {
// No state, return
return nil
Expand Down Expand Up @@ -742,11 +742,11 @@ func (s *BoltStateDB) updateWithOptions(opts []WriteOption, updateFn func(tx *bo
// 0.9 schema. Creates a backup before upgrading.
func (s *BoltStateDB) Upgrade() error {
// Check to see if the underlying DB needs upgrading.
upgrade, err := NeedsUpgrade(s.db.BoltDB())
upgrade09, upgrade13, err := NeedsUpgrade(s.db.BoltDB())
if err != nil {
return err
}
if !upgrade {
if !upgrade09 && !upgrade13 {
// No upgrade needed!
return nil
}
Expand All @@ -759,8 +759,16 @@ func (s *BoltStateDB) Upgrade() error {

// Perform the upgrade
if err := s.db.Update(func(tx *boltdd.Tx) error {
if err := UpgradeAllocs(s.logger, tx); err != nil {
return err

if upgrade09 {
if err := UpgradeAllocs(s.logger, tx); err != nil {
return err
}
}
if upgrade13 {
if err := UpgradeDynamicPluginRegistry(s.logger, tx); err != nil {
return err
}
}

// Add standard metadata
Expand All @@ -773,6 +781,7 @@ func (s *BoltStateDB) Upgrade() error {
if err != nil {
return err
}

return bkt.Put(metaUpgradedKey, time.Now().Format(time.RFC3339))
}); err != nil {
return err
Expand Down
Binary file added client/state/testdata/state-1.2.6.db.gz
Binary file not shown.
60 changes: 48 additions & 12 deletions client/state/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,24 @@ package state

import (
"bytes"
"container/list"
"fmt"
"os"

"github.com/boltdb/bolt"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/client/dynamicplugins"
"github.com/hashicorp/nomad/helper/boltdd"
"github.com/hashicorp/nomad/nomad/structs"
)

// NeedsUpgrade returns true if the BoltDB needs upgrading or false if it is
// already up to date.
func NeedsUpgrade(bdb *bolt.DB) (bool, error) {
needsUpgrade := true
err := bdb.View(func(tx *bolt.Tx) error {
func NeedsUpgrade(bdb *bolt.DB) (upgradeTo09, upgradeTo13 bool, err error) {
upgradeTo09 = true
upgradeTo13 = true
err = bdb.View(func(tx *bolt.Tx) error {
b := tx.Bucket(metaBucketName)
if b == nil {
// No meta bucket; upgrade
Expand All @@ -29,18 +32,23 @@ func NeedsUpgrade(bdb *bolt.DB) (bool, error) {
return nil
}

if !bytes.Equal(v, metaVersion) {
// Version exists but does not match. Abort.
return fmt.Errorf("incompatible state version. expected %q but found %q",
metaVersion, v)
if bytes.Equal(v, []byte{'2'}) {
upgradeTo09 = false
return nil
}
if bytes.Equal(v, metaVersion) {
upgradeTo09 = false
upgradeTo13 = false
return nil
}

// Version matches! Assume migrated!
needsUpgrade = false
return nil
// Version exists but does not match. Abort.
return fmt.Errorf("incompatible state version. expected %q but found %q",
metaVersion, v)

})

return needsUpgrade, err
return
}

// addMeta adds version metadata to BoltDB to mark it as upgraded and
Expand All @@ -51,7 +59,6 @@ func addMeta(tx *bolt.Tx) error {
if err != nil {
return err
}

return bkt.Put(metaVersionKey, metaVersion)
}

Expand Down Expand Up @@ -312,3 +319,32 @@ func upgradeOldAllocMutable(tx *boltdd.Tx, allocID string, oldBytes []byte) erro

return nil
}

func UpgradeDynamicPluginRegistry(logger hclog.Logger, tx *boltdd.Tx) error {

dynamicBkt := tx.Bucket(dynamicPluginBucketName)
if dynamicBkt == nil {
return nil // no previous plugins upgrade
}

oldState := &RegistryState12{}
if err := dynamicBkt.Get(registryStateKey, oldState); err != nil {
if !boltdd.IsErrNotFound(err) {
return fmt.Errorf("failed to read dynamic plugin registry state: %v", err)
}
}

newState := &dynamicplugins.RegistryState{
Plugins: make(map[string]map[string]*list.List),
}

for ptype, plugins := range oldState.Plugins {
newState.Plugins[ptype] = make(map[string]*list.List)
for pname, pluginInfo := range plugins {
newState.Plugins[ptype][pname] = list.New()
entry := list.Element{Value: pluginInfo}
newState.Plugins[ptype][pname].PushFront(entry)
}
}
return dynamicBkt.Put(registryStateKey, newState)
}
121 changes: 92 additions & 29 deletions client/state/upgrade_int_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package state_test

import (
"bytes"
"compress/gzip"
"fmt"
"io"
Expand All @@ -10,6 +11,7 @@ import (
"strings"
"testing"

"github.com/boltdb/bolt"
"github.com/hashicorp/nomad/client/allocrunner"
"github.com/hashicorp/nomad/client/allocwatcher"
clientconfig "github.com/hashicorp/nomad/client/config"
Expand All @@ -32,40 +34,49 @@ import (
func TestBoltStateDB_UpgradeOld_Ok(t *testing.T) {
t.Parallel()

files, err := filepath.Glob("testdata/*.db*")
require.NoError(t, err)
dbFromTestFile := func(t *testing.T, dir, fn string) *BoltStateDB {

for _, fn := range files {
t.Run(fn, func(t *testing.T) {
dir, err := ioutil.TempDir("", "nomadtest")
require.NoError(t, err)
defer os.RemoveAll(dir)
var src io.ReadCloser
src, err := os.Open(fn)
require.NoError(t, err)
defer src.Close()

var src io.ReadCloser
src, err = os.Open(fn)
// testdata may be gzip'd; decode on copy
if strings.HasSuffix(fn, ".gz") {
src, err = gzip.NewReader(src)
require.NoError(t, err)
defer src.Close()
}

// testdata may be gzip'd; decode on copy
if strings.HasSuffix(fn, ".gz") {
src, err = gzip.NewReader(src)
require.NoError(t, err)
}
dst, err := os.Create(filepath.Join(dir, "state.db"))
require.NoError(t, err)

dst, err := os.Create(filepath.Join(dir, "state.db"))
require.NoError(t, err)
// Copy test files before testing them for safety
_, err = io.Copy(dst, src)
require.NoError(t, err)

// Copy test files before testing them for safety
_, err = io.Copy(dst, src)
require.NoError(t, err)
require.NoError(t, src.Close())

dbI, err := NewBoltStateDB(testlog.HCLogger(t), dir)
require.NoError(t, err)

db := dbI.(*BoltStateDB)
return db
}

require.NoError(t, src.Close())
pre09files := []string{
"testdata/state-0.7.1.db.gz",
"testdata/state-0.8.6-empty.db.gz",
"testdata/state-0.8.6-no-deploy.db.gz"}

dbI, err := NewBoltStateDB(testlog.HCLogger(t), dir)
for _, fn := range pre09files {
t.Run(fn, func(t *testing.T) {

dir, err := ioutil.TempDir("", "nomadtest")
require.NoError(t, err)
defer dbI.Close()
defer os.RemoveAll(dir)

db := dbI.(*BoltStateDB)
db := dbFromTestFile(t, dir, fn)
defer db.Close()

// Simply opening old files should *not* alter them
require.NoError(t, db.DB().View(func(tx *boltdd.Tx) error {
Expand All @@ -76,16 +87,18 @@ func TestBoltStateDB_UpgradeOld_Ok(t *testing.T) {
return nil
}))

needsUpgrade, err := NeedsUpgrade(db.DB().BoltDB())
to09, to12, err := NeedsUpgrade(db.DB().BoltDB())
require.NoError(t, err)
require.True(t, needsUpgrade)
require.True(t, to09)
require.True(t, to12)

// Attept the upgrade
// Attempt the upgrade
require.NoError(t, db.Upgrade())

needsUpgrade, err = NeedsUpgrade(db.DB().BoltDB())
to09, to12, err = NeedsUpgrade(db.DB().BoltDB())
require.NoError(t, err)
require.False(t, needsUpgrade)
require.False(t, to09)
require.False(t, to12)

// Ensure Allocations can be restored and
// NewAR/AR.Restore do not error.
Expand All @@ -109,9 +122,59 @@ func TestBoltStateDB_UpgradeOld_Ok(t *testing.T) {
}
require.NoError(t, db.PutDevicePluginState(ps))

registry, err := db.GetDynamicPluginRegistryState()
require.Nil(t, registry)

require.NoError(t, err)
require.NoError(t, db.Close())
})
}

t.Run("testdata/state-1.2.6.db.gz", func(t *testing.T) {
fn := "testdata/state-1.2.6.db.gz"
dir, err := ioutil.TempDir("", "nomadtest")
require.NoError(t, err)
defer os.RemoveAll(dir)

db := dbFromTestFile(t, dir, fn)
defer db.Close()

// Simply opening old files should *not* alter them
db.DB().BoltDB().View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("meta"))
if b == nil {
return fmt.Errorf("meta bucket should exist")
}
v := b.Get([]byte("version"))
if len(v) == 0 {
return fmt.Errorf("meta version is missing")
}
if !bytes.Equal(v, []byte{'2'}) {
return fmt.Errorf("meta version should not have changed")
}
return nil
})

to09, to12, err := NeedsUpgrade(db.DB().BoltDB())
require.NoError(t, err)
require.False(t, to09)
require.True(t, to12)

// Attempt the upgrade
require.NoError(t, db.Upgrade())

to09, to12, err = NeedsUpgrade(db.DB().BoltDB())
require.NoError(t, err)
require.False(t, to09)
require.False(t, to12)

registry, err := db.GetDynamicPluginRegistryState()
require.NoError(t, err)
require.NotNil(t, registry)
require.Len(t, registry.Plugins["csi-node"], 2)

require.NoError(t, db.Close())
})
}

// checkUpgradedAlloc creates and restores an AllocRunner from an upgraded
Expand Down
Loading

0 comments on commit 0cfe94e

Please sign in to comment.