diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 7aa3a487eb1f..df4e17638e88 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -862,7 +862,7 @@ func (ar *allocRunner) destroyImpl() { // Cleanup state db; while holding the lock to avoid // a race periodic PersistState that may resurrect the alloc - if err := ar.stateDB.DeleteAllocationBucket(ar.id); err != nil { + if err := ar.stateDB.DeleteAllocationBucket(ar.id, false); err != nil { ar.logger.Warn("failed to delete allocation state", "error", err) } @@ -877,12 +877,12 @@ func (ar *allocRunner) destroyImpl() { ar.destroyedLock.Unlock() } -func (ar *allocRunner) PersistState() error { +func (ar *allocRunner) PersistState(inBatch bool) error { ar.destroyedLock.Lock() defer ar.destroyedLock.Unlock() if ar.destroyed { - err := ar.stateDB.DeleteAllocationBucket(ar.id) + err := ar.stateDB.DeleteAllocationBucket(ar.id, inBatch) if err != nil { ar.logger.Warn("failed to delete allocation bucket", "error", err) } @@ -890,23 +890,24 @@ func (ar *allocRunner) PersistState() error { } // persist network status, wrapping in a func to release state lock as early as possible - if err := func() error { + persistNetworkStatus := func() error { ar.stateLock.Lock() defer ar.stateLock.Unlock() if ar.state.NetworkStatus != nil { - if err := ar.stateDB.PutNetworkStatus(ar.id, ar.state.NetworkStatus); err != nil { + if err := ar.stateDB.PutNetworkStatus(ar.id, ar.state.NetworkStatus, inBatch); err != nil { return err } } return nil - }(); err != nil { + } + if err := persistNetworkStatus(); err != nil { return err } // TODO: consider persisting deployment state along with task status. // While we study why only the alloc is persisted, I opted to maintain current // behavior and not risk adding yet more IO calls unnecessarily. 
- return ar.stateDB.PutAllocation(ar.Alloc()) + return ar.stateDB.PutAllocation(ar.Alloc(), inBatch) } // Destroy the alloc runner by stopping it if it is still running and cleaning diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index b64990e0497d..6201a909ac0c 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -1452,7 +1452,7 @@ func TestAllocRunner_PersistState_Destroyed(t *testing.T) { } // test final persisted state upon completion - require.NoError(t, ar.PersistState()) + require.NoError(t, ar.PersistState(false)) allocs, _, err := conf.StateDB.GetAllAllocations() require.NoError(t, err) require.Len(t, allocs, 1) @@ -1477,7 +1477,7 @@ func TestAllocRunner_PersistState_Destroyed(t *testing.T) { require.Nil(t, ts) // check that DB alloc is empty after persisting state of destroyed AR - ar.PersistState() + require.NoError(t, ar.PersistState(false)) allocs, _, err = conf.StateDB.GetAllAllocations() require.NoError(t, err) require.Empty(t, allocs) diff --git a/client/client.go b/client/client.go index c1bd1afcdb8b..26a9e093bf5e 100644 --- a/client/client.go +++ b/client/client.go @@ -148,7 +148,7 @@ type AllocRunner interface { ShutdownCh() <-chan struct{} Signal(taskName, signal string) error GetTaskEventHandler(taskName string) drivermanager.EventHandler - PersistState() error + PersistState(inBatch bool) error RestartTask(taskName string, taskEvent *structs.TaskEvent) error RestartAll(taskEvent *structs.TaskEvent) error @@ -1221,7 +1221,7 @@ func (c *Client) saveState() error { for id, ar := range runners { go func(id string, ar AllocRunner) { - err := ar.PersistState() + err := ar.PersistState(true) // Persist in 'batch mode', to minimize # of transactions. 
if err != nil { c.logger.Error("error saving alloc state", "error", err, "alloc_id", id) l.Lock() @@ -2333,7 +2333,7 @@ func (c *Client) updateAlloc(update *structs.Allocation) { } // Update local copy of alloc - if err := c.stateDB.PutAllocation(update); err != nil { + if err := c.stateDB.PutAllocation(update, false); err != nil { c.logger.Error("error persisting updated alloc locally", "error", err, "alloc_id", update.ID) } @@ -2354,7 +2354,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error // Initialize local copy of alloc before creating the alloc runner so // we can't end up with an alloc runner that does not have an alloc. - if err := c.stateDB.PutAllocation(alloc); err != nil { + if err := c.stateDB.PutAllocation(alloc, false); err != nil { return err } diff --git a/client/client_test.go b/client/client_test.go index 813738e04691..e9016a4a94c4 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -1591,7 +1591,7 @@ func TestClient_hasLocalState(t *testing.T) { t.Run("plain alloc", func(t *testing.T) { alloc := mock.BatchAlloc() - c.stateDB.PutAllocation(alloc) + c.stateDB.PutAllocation(alloc, false) require.False(t, c.hasLocalState(alloc)) }) @@ -1601,7 +1601,7 @@ func TestClient_hasLocalState(t *testing.T) { taskName := alloc.Job.LookupTaskGroup(alloc.TaskGroup).Tasks[0].Name ls := &trstate.LocalState{} - c.stateDB.PutAllocation(alloc) + c.stateDB.PutAllocation(alloc, false) c.stateDB.PutTaskRunnerLocalState(alloc.ID, taskName, ls) require.True(t, c.hasLocalState(alloc)) @@ -1614,7 +1614,7 @@ func TestClient_hasLocalState(t *testing.T) { State: structs.TaskStateRunning, } - c.stateDB.PutAllocation(alloc) + c.stateDB.PutAllocation(alloc, false) c.stateDB.PutTaskState(alloc.ID, taskName, ts) require.True(t, c.hasLocalState(alloc)) diff --git a/client/state/db_test.go b/client/state/db_test.go index bb63507a296e..8fd152c9c352 100644 --- a/client/state/db_test.go +++ b/client/state/db_test.go @@ -4,6 +4,7 @@ import 
( "io/ioutil" "os" "reflect" + "sync" "testing" trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" @@ -57,7 +58,7 @@ func testDB(t *testing.T, f func(*testing.T, StateDB)) { } } -// TestStateDB asserts the behavior of GetAllAllocations, PutAllocation, and +// TestStateDB_Allocations asserts the behavior of GetAllAllocations, PutAllocation, and // DeleteAllocationBucket for all operational StateDB implementations. func TestStateDB_Allocations(t *testing.T) { t.Parallel() @@ -77,8 +78,8 @@ func TestStateDB_Allocations(t *testing.T) { alloc1 := mock.Alloc() alloc2 := mock.BatchAlloc() - require.NoError(db.PutAllocation(alloc1)) - require.NoError(db.PutAllocation(alloc2)) + require.NoError(db.PutAllocation(alloc1, false)) + require.NoError(db.PutAllocation(alloc2, false)) // Retrieve them allocs, errs, err = db.GetAllAllocations() @@ -106,7 +107,7 @@ func TestStateDB_Allocations(t *testing.T) { // Add another alloc3 := mock.SystemAlloc() - require.NoError(db.PutAllocation(alloc3)) + require.NoError(db.PutAllocation(alloc3, false)) allocs, errs, err = db.GetAllAllocations() require.NoError(err) require.NotNil(allocs) @@ -118,14 +119,14 @@ func TestStateDB_Allocations(t *testing.T) { require.Empty(errs) // Deleting a nonexistent alloc is a noop - require.NoError(db.DeleteAllocationBucket("asdf")) + require.NoError(db.DeleteAllocationBucket("asdf", false)) allocs, _, err = db.GetAllAllocations() require.NoError(err) require.NotNil(allocs) require.Len(allocs, 3) // Delete alloc1 - require.NoError(db.DeleteAllocationBucket(alloc1.ID)) + require.NoError(db.DeleteAllocationBucket(alloc1.ID, false)) allocs, errs, err = db.GetAllAllocations() require.NoError(err) require.NotNil(allocs) @@ -137,6 +138,102 @@ func TestStateDB_Allocations(t *testing.T) { }) } +// TestStateDB_Batch asserts the behavior of PutAllocation, PutNetworkStatus and +// DeleteAllocationBucket in batch mode, for all operational StateDB implementations. 
+func TestStateDB_Batch(t *testing.T) { + t.Parallel() + + testDB(t, func(t *testing.T, db StateDB) { + require := require.New(t) + + // For BoltDB, get initial tx_id + var getTxID func() int + var prevTxID int + if boltStateDB, ok := db.(*BoltStateDB); ok { + boltdb := boltStateDB.DB().BoltDB() + getTxID = func() int { + tx, err := boltdb.Begin(true) + require.NoError(err) + defer tx.Rollback() + return tx.ID() + } + prevTxID = getTxID() + } + + // Write 1000 allocations and network statuses in batch mode + var allocs []*structs.Allocation + for i := 0; i < 1000; i++ { + allocs = append(allocs, mock.Alloc()) + } + var wg sync.WaitGroup + for _, alloc := range allocs { + wg.Add(1) + go func(alloc *structs.Allocation) { + require.NoError(db.PutNetworkStatus(alloc.ID, mock.AllocNetworkStatus(), true)) + require.NoError(db.PutAllocation(alloc, true)) + wg.Done() + }(alloc) + } + wg.Wait() + + // Check BoltDB actually combined the PutNetworkStatus and PutAllocation calls into far fewer transactions. + // The actual number of transactions depends on how fast the goroutines are spawned, + // with every 10ms period saved in a separate transaction (see boltdb MaxBatchDelay + // and MaxBatchSize parameters).
+ if getTxID != nil { + numTransactions := getTxID() - prevTxID + require.Less(numTransactions, 10) + prevTxID = getTxID() + } + + // Retrieve allocs and make sure they are the same (order can differ) + readAllocs, errs, err := db.GetAllAllocations() + require.NoError(err) + require.NotNil(readAllocs) + require.Len(readAllocs, len(allocs)) + require.NotNil(errs) + require.Empty(errs) + + readAllocsById := make(map[string]*structs.Allocation) + for _, readAlloc := range readAllocs { + readAllocsById[readAlloc.ID] = readAlloc + } + for _, alloc := range allocs { + readAlloc, ok := readAllocsById[alloc.ID] + if !ok { + t.Fatalf("no alloc with ID=%q", alloc.ID) + } + if !reflect.DeepEqual(readAlloc, alloc) { + pretty.Ldiff(t, readAlloc, alloc) + t.Fatalf("alloc %q unequal", alloc.ID) + } + } + + // Delete all allocs in batch mode + for _, alloc := range allocs { + wg.Add(1) + go func(alloc *structs.Allocation) { + require.NoError(db.DeleteAllocationBucket(alloc.ID, true)) + wg.Done() + }(alloc) + } + wg.Wait() + + // Check BoltDB combined the DeleteAllocationBucket calls into far fewer transactions. + if getTxID != nil { + numTransactions := getTxID() - prevTxID + require.Less(numTransactions, 10) + prevTxID = getTxID() + } + + // Check all allocs were deleted. + readAllocs, errs, err = db.GetAllAllocations() + require.NoError(err) + require.Empty(readAllocs) + require.Empty(errs) + }) +} + // TestStateDB_TaskState asserts the behavior of task state related StateDB // methods.
func TestStateDB_TaskState(t *testing.T) { diff --git a/client/state/errdb.go b/client/state/errdb.go index 1e6270f994aa..6ae3726ec5a3 100644 --- a/client/state/errdb.go +++ b/client/state/errdb.go @@ -5,6 +5,7 @@ import ( "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" dmstate "github.com/hashicorp/nomad/client/devicemanager/state" + "github.com/hashicorp/nomad/client/dynamicplugins" driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state" "github.com/hashicorp/nomad/nomad/structs" ) @@ -27,7 +28,7 @@ func (m *ErrDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, er return m.Allocs, nil, nil } -func (m *ErrDB) PutAllocation(alloc *structs.Allocation) error { +func (m *ErrDB) PutAllocation(alloc *structs.Allocation, inBatch bool) error { return fmt.Errorf("Error!") } @@ -43,7 +44,7 @@ func (m *ErrDB) GetNetworkStatus(allocID string) (*structs.AllocNetworkStatus, e return nil, fmt.Errorf("Error!") } -func (m *ErrDB) PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus) error { +func (m *ErrDB) PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus, inBatch bool) error { return fmt.Errorf("Error!") } @@ -63,7 +64,7 @@ func (m *ErrDB) DeleteTaskBucket(allocID, taskName string) error { return fmt.Errorf("Error!") } -func (m *ErrDB) DeleteAllocationBucket(allocID string) error { +func (m *ErrDB) DeleteAllocationBucket(allocID string, inBatch bool) error { return fmt.Errorf("Error!") } @@ -71,6 +72,14 @@ func (m *ErrDB) PutDevicePluginState(ps *dmstate.PluginState) error { return fmt.Errorf("Error!") } +func (m *ErrDB) GetDynamicPluginRegistryState() (*dynamicplugins.RegistryState, error) { + return nil, fmt.Errorf("Error!") +} + +func (m *ErrDB) PutDynamicPluginRegistryState(state *dynamicplugins.RegistryState) error { + return fmt.Errorf("Error!") +} + // GetDevicePluginState stores the device manager's plugin state or returns an // error. 
func (m *ErrDB) GetDevicePluginState() (*dmstate.PluginState, error) { @@ -88,3 +97,6 @@ func (m *ErrDB) PutDriverPluginState(ps *driverstate.PluginState) error { func (m *ErrDB) Close() error { return fmt.Errorf("Error!") } + +// Ensure *ErrDB implements StateDB +var _ StateDB = (*ErrDB)(nil) \ No newline at end of file diff --git a/client/state/interface.go b/client/state/interface.go index b29ba5daff03..a830bf04910a 100644 --- a/client/state/interface.go +++ b/client/state/interface.go @@ -24,9 +24,10 @@ type StateDB interface { // If a single error is returned then both allocations and the map will be nil. GetAllAllocations() ([]*structs.Allocation, map[string]error, error) - // PulAllocation stores an allocation or returns an error if it could - // not be stored. - PutAllocation(*structs.Allocation) error + // PutAllocation stores an allocation or returns an error if it could + // not be stored. The inBatch parameter enables a performance optimization for when + // multiple allocations are expected to be saved in parallel. + PutAllocation(alloc *structs.Allocation, inBatch bool) error // Get/Put DeploymentStatus get and put the allocation's deployment // status. It may be nil. @@ -36,7 +37,7 @@ type StateDB interface { // Get/Put NetworkStatus get and put the allocation's network // status. It may be nil. GetNetworkStatus(allocID string) (*structs.AllocNetworkStatus, error) - PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus) error + PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus, inBatch bool) error // GetTaskRunnerState returns the LocalState and TaskState for a // TaskRunner. Either state may be nil if it is not found, but if an @@ -57,7 +58,7 @@ type StateDB interface { // DeleteAllocationBucket deletes an allocation's state bucket if it // exists. No error is returned if it does not exist.
- DeleteAllocationBucket(allocID string) error + DeleteAllocationBucket(allocID string, inBatch bool) error // GetDevicePluginState is used to retrieve the device manager's plugin // state. diff --git a/client/state/memdb.go b/client/state/memdb.go index b47994780170..f122d0cea971 100644 --- a/client/state/memdb.go +++ b/client/state/memdb.go @@ -73,7 +73,7 @@ func (m *MemDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, er return allocs, map[string]error{}, nil } -func (m *MemDB) PutAllocation(alloc *structs.Allocation) error { +func (m *MemDB) PutAllocation(alloc *structs.Allocation, inBatch bool) error { m.mu.Lock() defer m.mu.Unlock() m.allocs[alloc.ID] = alloc @@ -99,7 +99,7 @@ func (m *MemDB) GetNetworkStatus(allocID string) (*structs.AllocNetworkStatus, e return m.networkStatus[allocID], nil } -func (m *MemDB) PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus) error { +func (m *MemDB) PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus, inBatch bool) error { m.mu.Lock() m.networkStatus[allocID] = ns defer m.mu.Unlock() @@ -175,7 +175,7 @@ func (m *MemDB) DeleteTaskBucket(allocID, taskName string) error { return nil } -func (m *MemDB) DeleteAllocationBucket(allocID string) error { +func (m *MemDB) DeleteAllocationBucket(allocID string, inBatch bool) error { m.mu.Lock() defer m.mu.Unlock() diff --git a/client/state/noopdb.go b/client/state/noopdb.go index 2c35cc6693bc..6e3034039b5c 100644 --- a/client/state/noopdb.go +++ b/client/state/noopdb.go @@ -23,7 +23,7 @@ func (n NoopDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, er return nil, nil, nil } -func (n NoopDB) PutAllocation(*structs.Allocation) error { +func (n NoopDB) PutAllocation(alloc *structs.Allocation, inBatch bool) error { return nil } @@ -39,7 +39,7 @@ func (n NoopDB) GetNetworkStatus(allocID string) (*structs.AllocNetworkStatus, e return nil, nil } -func (n NoopDB) PutNetworkStatus(allocID string, ds *structs.AllocNetworkStatus) error { 
+func (n NoopDB) PutNetworkStatus(allocID string, ds *structs.AllocNetworkStatus, inBatch bool) error { return nil } @@ -59,7 +59,7 @@ func (n NoopDB) DeleteTaskBucket(allocID, taskName string) error { return nil } -func (n NoopDB) DeleteAllocationBucket(allocID string) error { +func (n NoopDB) DeleteAllocationBucket(allocID string, inBatch bool) error { return nil } diff --git a/client/state/state_database.go b/client/state/state_database.go index 4c0ad4fe07fb..5ccb30d535a4 100644 --- a/client/state/state_database.go +++ b/client/state/state_database.go @@ -234,8 +234,8 @@ func (s *BoltStateDB) getAllAllocations(tx *boltdd.Tx) ([]*structs.Allocation, m } // PutAllocation stores an allocation or returns an error. -func (s *BoltStateDB) PutAllocation(alloc *structs.Allocation) error { - return s.db.Update(func(tx *boltdd.Tx) error { +func (s *BoltStateDB) PutAllocation(alloc *structs.Allocation, inBatch bool) error { + return s.updateOrBatch(inBatch, func(tx *boltdd.Tx) error { // Retrieve the root allocations bucket allocsBkt, err := tx.CreateBucketIfNotExists(allocationsBucketName) if err != nil { @@ -321,8 +321,8 @@ type networkStatusEntry struct { // PutDeploymentStatus stores an allocation's DeploymentStatus or returns an // error. -func (s *BoltStateDB) PutNetworkStatus(allocID string, ds *structs.AllocNetworkStatus) error { - return s.db.Update(func(tx *boltdd.Tx) error { +func (s *BoltStateDB) PutNetworkStatus(allocID string, ds *structs.AllocNetworkStatus, inBatch bool) error { + return s.updateOrBatch(inBatch, func(tx *boltdd.Tx) error { return putNetworkStatusImpl(tx, allocID, ds) }) } @@ -493,8 +493,8 @@ func (s *BoltStateDB) DeleteTaskBucket(allocID, taskName string) error { } // DeleteAllocationBucket is used to delete an allocation bucket if it exists. 
-func (s *BoltStateDB) DeleteAllocationBucket(allocID string) error { - return s.db.Update(func(tx *boltdd.Tx) error { +func (s *BoltStateDB) DeleteAllocationBucket(allocID string, inBatch bool) error { + return s.updateOrBatch(inBatch, func(tx *boltdd.Tx) error { // Retrieve the root allocations bucket allocations := tx.Bucket(allocationsBucketName) if allocations == nil { @@ -725,6 +725,17 @@ func (s *BoltStateDB) init() error { }) } +// Some write operations in BoltStateDB support "batch mode". In this mode, BoltDB +// opportunistically combines multiple concurrent updates into one or several transactions. +// See boltdb.Batch() documentation for details. +func (s *BoltStateDB) updateOrBatch(inBatch bool, updateFn func(tx *boltdd.Tx) error) error { + if inBatch { + return s.db.Batch(updateFn) + } else { + return s.db.Update(updateFn) + } +} + // Upgrade bolt state db from 0.8 schema to 0.9 schema. Noop if already using // 0.9 schema. Creates a backup before upgrading. func (s *BoltStateDB) Upgrade() error { diff --git a/helper/boltdd/boltdd.go b/helper/boltdd/boltdd.go index 2c221c10a8fc..ea337047de7f 100644 --- a/helper/boltdd/boltdd.go +++ b/helper/boltdd/boltdd.go @@ -141,6 +141,13 @@ func (db *DB) Update(fn func(*Tx) error) error { }) } +func (db *DB) Batch(fn func(*Tx) error) error { + return db.bdb.Batch(func(btx *bolt.Tx) error { + tx := newTx(db, btx) + return fn(tx) + }) +} + func (db *DB) View(fn func(*Tx) error) error { return db.bdb.View(func(btx *bolt.Tx) error { tx := newTx(db, btx) diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 765ca0730fae..a2bda9e57255 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -1493,3 +1493,15 @@ func Events(index uint64) *structs.Events { }, } } + +func AllocNetworkStatus() *structs.AllocNetworkStatus { + return &structs.AllocNetworkStatus{ + InterfaceName: "eth0", + Address: "192.168.0.100", + DNS: &structs.DNSConfig{ + Servers: []string{"1.1.1.1"}, + Searches: []string{"localdomain"}, + Options: 
[]string{"ndots:5"}, + }, + } +}