Skip to content

Commit

Permalink
back out blocking query in lieu of limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
tgross committed Nov 16, 2022
1 parent b3f8ff5 commit 01dc833
Showing 1 changed file with 10 additions and 39 deletions.
49 changes: 10 additions & 39 deletions nomad/encrypter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ import (
log "github.com/hashicorp/go-hclog"
kms "github.com/hashicorp/go-kms-wrapping/v2"
"github.com/hashicorp/go-kms-wrapping/v2/aead"
memdb "github.com/hashicorp/go-memdb"
"golang.org/x/time/rate"

"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/crypto"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
)

Expand Down Expand Up @@ -432,46 +431,32 @@ func (krr *KeyringReplicator) stop() {
krr.stopFn()
}

const keyringReplicationRate = 10
const keyringReplicationRate = 5

func (krr *KeyringReplicator) run(ctx context.Context) {
krr.logger.Debug("starting encryption key replication")
defer krr.logger.Debug("exiting key replication")

store := krr.srv.fsm.State()
abandonCh := store.AbandonCh()
minIndex := uint64(0)
limiter := rate.NewLimiter(keyringReplicationRate, keyringReplicationRate)

for {
select {
case <-krr.srv.shutdownCtx.Done():
return
case <-ctx.Done():
return
case <-abandonCh:
// reset the blocking query to make sure we get all the keys from a
// restored snapshot
store = krr.srv.fsm.State()
abandonCh = store.AbandonCh()
minIndex = uint64(0)
default:
// ensure that we abandon the blocking query periodically to give
// other contexts a chance to fire
queryCtx, queryCancel := context.WithTimeout(ctx, time.Second)
defer queryCancel()

var rawIter any
var err error
rawIter, minIndex, err = store.BlockingQuery(getRootKeyMetas, minIndex, queryCtx)
err := limiter.Wait(ctx)
if err != nil {
if queryCtx.Err() == nil {
// we get errors for closed context but don't want to log on that
krr.logger.Error("failed to fetch keyring", "error", err)
}
continue
}
iter := rawIter.(memdb.ResultIterator)

store := krr.srv.fsm.State()
iter, err := store.RootKeyMetas(nil)
if err != nil {
krr.logger.Error("failed to fetch keyring", "error", err)
continue
}
for {
raw := iter.Next()
if raw == nil {
Expand All @@ -498,20 +483,6 @@ func (krr *KeyringReplicator) run(ctx context.Context) {

}

// getRootKeyMetas implements state.QueryFn and is run as a blocking query to
// detect new key metadata
func getRootKeyMetas(ws memdb.WatchSet, store *state.StateStore) (interface{}, uint64, error) {
iter, err := store.RootKeyMetas(ws)
if err != nil {
return nil, 0, err
}
index, err := store.Index(state.TableRootKeyMeta)
if err != nil {
return nil, 0, err
}
return iter, index, nil
}

// replicateKey replicates a single key from peer servers that was present in
// the state store but missing from the keyring. Returns an error only if no
// peers have this key.
Expand Down

0 comments on commit 01dc833

Please sign in to comment.