Skip to content

Commit

Permalink
Implement 'batch mode' for persisting allocations on the client.
Browse files Browse the repository at this point in the history
Fixes #9047, see problem details there.

As a solution, we use BoltDB's 'Batch' mode that combines multiple
parallel writes into small number of transactions. See
https://github.com/boltdb/bolt#batch-read-write-transactions for
more information.
  • Loading branch information
ashtuchkin committed Oct 14, 2020
1 parent 71a022a commit 657a7d9
Show file tree
Hide file tree
Showing 12 changed files with 183 additions and 42 deletions.
15 changes: 8 additions & 7 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,7 @@ func (ar *allocRunner) destroyImpl() {

// Cleanup state db; while holding the lock to avoid
// a race periodic PersistState that may resurrect the alloc
if err := ar.stateDB.DeleteAllocationBucket(ar.id); err != nil {
if err := ar.stateDB.DeleteAllocationBucket(ar.id, false); err != nil {
ar.logger.Warn("failed to delete allocation state", "error", err)
}

Expand All @@ -877,36 +877,37 @@ func (ar *allocRunner) destroyImpl() {
ar.destroyedLock.Unlock()
}

func (ar *allocRunner) PersistState() error {
func (ar *allocRunner) PersistState(inBatch bool) error {
ar.destroyedLock.Lock()
defer ar.destroyedLock.Unlock()

if ar.destroyed {
err := ar.stateDB.DeleteAllocationBucket(ar.id)
err := ar.stateDB.DeleteAllocationBucket(ar.id, inBatch)
if err != nil {
ar.logger.Warn("failed to delete allocation bucket", "error", err)
}
return nil
}

// persist network status, wrapping in a func to release state lock as early as possible
if err := func() error {
persistNetworkStatus := func() error {
ar.stateLock.Lock()
defer ar.stateLock.Unlock()
if ar.state.NetworkStatus != nil {
if err := ar.stateDB.PutNetworkStatus(ar.id, ar.state.NetworkStatus); err != nil {
if err := ar.stateDB.PutNetworkStatus(ar.id, ar.state.NetworkStatus, inBatch); err != nil {
return err
}
}
return nil
}(); err != nil {
}
if err := persistNetworkStatus(); err != nil {
return err
}

// TODO: consider persisting deployment state along with task status.
// While we study why only the alloc is persisted, I opted to maintain current
// behavior and not risk adding yet more IO calls unnecessarily.
return ar.stateDB.PutAllocation(ar.Alloc())
return ar.stateDB.PutAllocation(ar.Alloc(), inBatch)
}

// Destroy the alloc runner by stopping it if it is still running and cleaning
Expand Down
4 changes: 2 additions & 2 deletions client/allocrunner/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1452,7 +1452,7 @@ func TestAllocRunner_PersistState_Destroyed(t *testing.T) {
}

// test final persisted state upon completion
require.NoError(t, ar.PersistState())
require.NoError(t, ar.PersistState(false))
allocs, _, err := conf.StateDB.GetAllAllocations()
require.NoError(t, err)
require.Len(t, allocs, 1)
Expand All @@ -1477,7 +1477,7 @@ func TestAllocRunner_PersistState_Destroyed(t *testing.T) {
require.Nil(t, ts)

// check that DB alloc is empty after persisting state of destroyed AR
ar.PersistState()
require.NoError(t, ar.PersistState(false))
allocs, _, err = conf.StateDB.GetAllAllocations()
require.NoError(t, err)
require.Empty(t, allocs)
Expand Down
8 changes: 4 additions & 4 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ type AllocRunner interface {
ShutdownCh() <-chan struct{}
Signal(taskName, signal string) error
GetTaskEventHandler(taskName string) drivermanager.EventHandler
PersistState() error
PersistState(inBatch bool) error

RestartTask(taskName string, taskEvent *structs.TaskEvent) error
RestartAll(taskEvent *structs.TaskEvent) error
Expand Down Expand Up @@ -1221,7 +1221,7 @@ func (c *Client) saveState() error {

for id, ar := range runners {
go func(id string, ar AllocRunner) {
err := ar.PersistState()
err := ar.PersistState(true) // Persist in 'batch mode', to minimize # of transactions.
if err != nil {
c.logger.Error("error saving alloc state", "error", err, "alloc_id", id)
l.Lock()
Expand Down Expand Up @@ -2333,7 +2333,7 @@ func (c *Client) updateAlloc(update *structs.Allocation) {
}

// Update local copy of alloc
if err := c.stateDB.PutAllocation(update); err != nil {
if err := c.stateDB.PutAllocation(update, false); err != nil {
c.logger.Error("error persisting updated alloc locally", "error", err, "alloc_id", update.ID)
}

Expand All @@ -2354,7 +2354,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error

// Initialize local copy of alloc before creating the alloc runner so
// we can't end up with an alloc runner that does not have an alloc.
if err := c.stateDB.PutAllocation(alloc); err != nil {
if err := c.stateDB.PutAllocation(alloc, false); err != nil {
return err
}

Expand Down
6 changes: 3 additions & 3 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1591,7 +1591,7 @@ func TestClient_hasLocalState(t *testing.T) {

t.Run("plain alloc", func(t *testing.T) {
alloc := mock.BatchAlloc()
c.stateDB.PutAllocation(alloc)
c.stateDB.PutAllocation(alloc, false)

require.False(t, c.hasLocalState(alloc))
})
Expand All @@ -1601,7 +1601,7 @@ func TestClient_hasLocalState(t *testing.T) {
taskName := alloc.Job.LookupTaskGroup(alloc.TaskGroup).Tasks[0].Name
ls := &trstate.LocalState{}

c.stateDB.PutAllocation(alloc)
c.stateDB.PutAllocation(alloc, false)
c.stateDB.PutTaskRunnerLocalState(alloc.ID, taskName, ls)

require.True(t, c.hasLocalState(alloc))
Expand All @@ -1614,7 +1614,7 @@ func TestClient_hasLocalState(t *testing.T) {
State: structs.TaskStateRunning,
}

c.stateDB.PutAllocation(alloc)
c.stateDB.PutAllocation(alloc, false)
c.stateDB.PutTaskState(alloc.ID, taskName, ts)

require.True(t, c.hasLocalState(alloc))
Expand Down
109 changes: 103 additions & 6 deletions client/state/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"io/ioutil"
"os"
"reflect"
"sync"
"testing"

trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
Expand Down Expand Up @@ -57,7 +58,7 @@ func testDB(t *testing.T, f func(*testing.T, StateDB)) {
}
}

// TestStateDB asserts the behavior of GetAllAllocations, PutAllocation, and
// TestStateDB_Allocations asserts the behavior of GetAllAllocations, PutAllocation, and
// DeleteAllocationBucket for all operational StateDB implementations.
func TestStateDB_Allocations(t *testing.T) {
t.Parallel()
Expand All @@ -77,8 +78,8 @@ func TestStateDB_Allocations(t *testing.T) {
alloc1 := mock.Alloc()
alloc2 := mock.BatchAlloc()

require.NoError(db.PutAllocation(alloc1))
require.NoError(db.PutAllocation(alloc2))
require.NoError(db.PutAllocation(alloc1, false))
require.NoError(db.PutAllocation(alloc2, false))

// Retrieve them
allocs, errs, err = db.GetAllAllocations()
Expand Down Expand Up @@ -106,7 +107,7 @@ func TestStateDB_Allocations(t *testing.T) {

// Add another
alloc3 := mock.SystemAlloc()
require.NoError(db.PutAllocation(alloc3))
require.NoError(db.PutAllocation(alloc3, false))
allocs, errs, err = db.GetAllAllocations()
require.NoError(err)
require.NotNil(allocs)
Expand All @@ -118,14 +119,14 @@ func TestStateDB_Allocations(t *testing.T) {
require.Empty(errs)

// Deleting a nonexistent alloc is a noop
require.NoError(db.DeleteAllocationBucket("asdf"))
require.NoError(db.DeleteAllocationBucket("asdf", false))
allocs, _, err = db.GetAllAllocations()
require.NoError(err)
require.NotNil(allocs)
require.Len(allocs, 3)

// Delete alloc1
require.NoError(db.DeleteAllocationBucket(alloc1.ID))
require.NoError(db.DeleteAllocationBucket(alloc1.ID, false))
allocs, errs, err = db.GetAllAllocations()
require.NoError(err)
require.NotNil(allocs)
Expand All @@ -137,6 +138,102 @@ func TestStateDB_Allocations(t *testing.T) {
})
}

// TestStateDB_Batch asserts the behavior of PutAllocation, PutNetworkStatus and
// DeleteAllocationBucket in batch mode, for all operational StateDB implementations.
func TestStateDB_Batch(t *testing.T) {
t.Parallel()

testDB(t, func(t *testing.T, db StateDB) {
require := require.New(t)

// For BoltDB, get initial tx_id
var getTxID func() int
var prevTxID int
if boltStateDB, ok := db.(*BoltStateDB); ok {
boltdb := boltStateDB.DB().BoltDB()
getTxID = func() int {
tx, err := boltdb.Begin(true)
require.NoError(err)
defer tx.Rollback()
return tx.ID()
}
prevTxID = getTxID()
}

// Write 1000 allocations and network statuses in batch mode
var allocs []*structs.Allocation
for i := 0; i < 1000; i++ {
allocs = append(allocs, mock.Alloc())
}
var wg sync.WaitGroup
for _, alloc := range allocs {
wg.Add(1)
go func(alloc *structs.Allocation) {
require.NoError(db.PutNetworkStatus(alloc.ID, mock.AllocNetworkStatus(), true))
require.NoError(db.PutAllocation(alloc, true))
wg.Done()
}(alloc)
}
wg.Wait()

// Check BoltDB actually combined PutAllocation calls into much fewer transactions.
// The actual number of transactions depends on how fast the goroutines are spawned,
// with every 10ms period saved in a separate transaction (see boltdb MaxBatchDelay
// and MaxBatchSize parameters).
if getTxID != nil {
numTransactions := getTxID() - prevTxID
require.Less(numTransactions, 10)
prevTxID = getTxID()
}

// Retrieve allocs and make sure they are the same (order can differ)
readAllocs, errs, err := db.GetAllAllocations()
require.NoError(err)
require.NotNil(readAllocs)
require.Len(readAllocs, len(allocs))
require.NotNil(errs)
require.Empty(errs)

readAllocsById := make(map[string]*structs.Allocation)
for _, readAlloc := range readAllocs {
readAllocsById[readAlloc.ID] = readAlloc
}
for _, alloc := range allocs {
readAlloc, ok := readAllocsById[alloc.ID]
if !ok {
t.Fatalf("no alloc with ID=%q", alloc.ID)
}
if !reflect.DeepEqual(readAlloc, alloc) {
pretty.Ldiff(t, readAlloc, alloc)
t.Fatalf("alloc %q unequal", alloc.ID)
}
}

// Delete all allocs in batch mode
for _, alloc := range allocs {
wg.Add(1)
go func(alloc *structs.Allocation) {
require.NoError(db.DeleteAllocationBucket(alloc.ID, true))
wg.Done()
}(alloc)
}
wg.Wait()

// Check BoltDB combined DeleteAllocationBucket calls into much fewer transactions.
if getTxID != nil {
numTransactions := getTxID() - prevTxID
require.Less(numTransactions, 10)
prevTxID = getTxID()
}

// Check all allocs were deleted.
readAllocs, errs, err = db.GetAllAllocations()
require.NoError(err)
require.Empty(readAllocs)
require.Empty(errs)
})
}

// TestStateDB_TaskState asserts the behavior of task state related StateDB
// methods.
func TestStateDB_TaskState(t *testing.T) {
Expand Down
18 changes: 15 additions & 3 deletions client/state/errdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
"github.com/hashicorp/nomad/client/dynamicplugins"
driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state"
"github.com/hashicorp/nomad/nomad/structs"
)
Expand All @@ -27,7 +28,7 @@ func (m *ErrDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, er
return m.Allocs, nil, nil
}

func (m *ErrDB) PutAllocation(alloc *structs.Allocation) error {
func (m *ErrDB) PutAllocation(alloc *structs.Allocation, inBatch bool) error {
return fmt.Errorf("Error!")
}

Expand All @@ -43,7 +44,7 @@ func (m *ErrDB) GetNetworkStatus(allocID string) (*structs.AllocNetworkStatus, e
return nil, fmt.Errorf("Error!")
}

func (m *ErrDB) PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus) error {
func (m *ErrDB) PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus, inBatch bool) error {
return fmt.Errorf("Error!")
}

Expand All @@ -63,14 +64,22 @@ func (m *ErrDB) DeleteTaskBucket(allocID, taskName string) error {
return fmt.Errorf("Error!")
}

func (m *ErrDB) DeleteAllocationBucket(allocID string) error {
func (m *ErrDB) DeleteAllocationBucket(allocID string, inBatch bool) error {
return fmt.Errorf("Error!")
}

func (m *ErrDB) PutDevicePluginState(ps *dmstate.PluginState) error {
return fmt.Errorf("Error!")
}

func (m *ErrDB) GetDynamicPluginRegistryState() (*dynamicplugins.RegistryState, error) {
return nil, fmt.Errorf("Error!")
}

func (m *ErrDB) PutDynamicPluginRegistryState(state *dynamicplugins.RegistryState) error {
return fmt.Errorf("Error!")
}

// GetDevicePluginState stores the device manager's plugin state or returns an
// error.
func (m *ErrDB) GetDevicePluginState() (*dmstate.PluginState, error) {
Expand All @@ -88,3 +97,6 @@ func (m *ErrDB) PutDriverPluginState(ps *driverstate.PluginState) error {
func (m *ErrDB) Close() error {
return fmt.Errorf("Error!")
}

// Ensure *ErrDB implements StateDB
var _ StateDB = (*ErrDB)(nil)
11 changes: 6 additions & 5 deletions client/state/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ type StateDB interface {
// If a single error is returned then both allocations and the map will be nil.
GetAllAllocations() ([]*structs.Allocation, map[string]error, error)

// PulAllocation stores an allocation or returns an error if it could
// not be stored.
PutAllocation(*structs.Allocation) error
// PutAllocation stores an allocation or returns an error if it could
// not be stored. inBatch parameter enables performance optimization when
// multiple allocations are expected to be saved in parallel.
PutAllocation(alloc *structs.Allocation, inBatch bool) error

// Get/Put DeploymentStatus get and put the allocation's deployment
// status. It may be nil.
Expand All @@ -36,7 +37,7 @@ type StateDB interface {
// Get/Put NetworkStatus get and put the allocation's network
// status. It may be nil.
GetNetworkStatus(allocID string) (*structs.AllocNetworkStatus, error)
PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus) error
PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus, inBatch bool) error

// GetTaskRunnerState returns the LocalState and TaskState for a
// TaskRunner. Either state may be nil if it is not found, but if an
Expand All @@ -57,7 +58,7 @@ type StateDB interface {

// DeleteAllocationBucket deletes an allocation's state bucket if it
// exists. No error is returned if it does not exist.
DeleteAllocationBucket(allocID string) error
DeleteAllocationBucket(allocID string, inBatch bool) error

// GetDevicePluginState is used to retrieve the device manager's plugin
// state.
Expand Down
Loading

0 comments on commit 657a7d9

Please sign in to comment.