fsm: one-time token expiration should be deterministic #13737

Merged 2 commits on Jul 18, 2022
3 changes: 3 additions & 0 deletions .changelog/13737.txt
@@ -0,0 +1,3 @@
```release-note:bug
acl: Fixed a bug where the timestamp for expiring one-time tokens was not deterministic between servers
```
2 changes: 2 additions & 0 deletions nomad/acl_endpoint.go
@@ -1013,6 +1013,8 @@ func (a *ACL) ExpireOneTimeTokens(args *structs.OneTimeTokenExpireRequest, reply
 }
 }

+args.Timestamp = time.Now() // use the leader's timestamp
+
 // Expire token via raft; because this is the only write in the RPC the
 // caller can safely retry with the same token if the raft write fails
 _, index, err := a.srv.raftApply(structs.OneTimeTokenExpireRequestType, args)
2 changes: 1 addition & 1 deletion nomad/fsm.go
@@ -1229,7 +1229,7 @@ func (n *nomadFSM) applyOneTimeTokenExpire(msgType structs.MessageType, buf []by
 panic(fmt.Errorf("failed to decode request: %v", err))
 }

-if err := n.state.ExpireOneTimeTokens(msgType, index); err != nil {
+if err := n.state.ExpireOneTimeTokens(msgType, index, req.Timestamp); err != nil {
 n.logger.Error("ExpireOneTimeTokens failed", "error", err)
 return err
 }
8 changes: 4 additions & 4 deletions nomad/state/state_store.go
@@ -5852,11 +5852,11 @@ func (s *StateStore) DeleteOneTimeTokens(msgType structs.MessageType, index uint
 }

 // ExpireOneTimeTokens deletes tokens that have expired
-func (s *StateStore) ExpireOneTimeTokens(msgType structs.MessageType, index uint64) error {
+func (s *StateStore) ExpireOneTimeTokens(msgType structs.MessageType, index uint64, timestamp time.Time) error {
 txn := s.db.WriteTxnMsgT(msgType, index)
 defer txn.Abort()

-iter, err := s.oneTimeTokensExpiredTxn(txn, nil)
+iter, err := s.oneTimeTokensExpiredTxn(txn, nil, timestamp)
 if err != nil {
 return err
 }
@@ -5887,14 +5887,14 @@ func (s *StateStore) ExpireOneTimeTokens(msgType structs.MessageType, index uint
 }

 // oneTimeTokensExpiredTxn returns an iterator over all expired one-time tokens
-func (s *StateStore) oneTimeTokensExpiredTxn(txn *txn, ws memdb.WatchSet) (memdb.ResultIterator, error) {
+func (s *StateStore) oneTimeTokensExpiredTxn(txn *txn, ws memdb.WatchSet, timestamp time.Time) (memdb.ResultIterator, error) {
 iter, err := txn.Get("one_time_token", "id")
 if err != nil {
 return nil, fmt.Errorf("one-time token lookup failed: %v", err)
 }

 ws.Add(iter.WatchCh())
-iter = memdb.NewFilterIterator(iter, expiredOneTimeTokenFilter(time.Now()))
+iter = memdb.NewFilterIterator(iter, expiredOneTimeTokenFilter(timestamp))
 return iter, nil
}

12 changes: 6 additions & 6 deletions nomad/state/state_store_test.go
@@ -8912,9 +8912,9 @@ func TestStateStore_OneTimeTokens(t *testing.T) {

 // now verify expiration

-getExpiredTokens := func() []*structs.OneTimeToken {
+getExpiredTokens := func(now time.Time) []*structs.OneTimeToken {
 txn := state.db.ReadTxn()
-iter, err := state.oneTimeTokensExpiredTxn(txn, nil)
+iter, err := state.oneTimeTokensExpiredTxn(txn, nil, now)
 require.NoError(t, err)

 results := []*structs.OneTimeToken{}
@@ -8930,7 +8930,7 @@ func TestStateStore_OneTimeTokens(t *testing.T) {
 return results
 }

-results = getExpiredTokens()
+results = getExpiredTokens(time.Now())
 require.Len(t, results, 2)

 // results aren't ordered
@@ -8942,10 +8942,10 @@ func TestStateStore_OneTimeTokens(t *testing.T) {

 // clear the expired tokens and verify they're gone
 index++
-require.NoError(t,
-state.ExpireOneTimeTokens(structs.MsgTypeTestSetup, index))
+require.NoError(t, state.ExpireOneTimeTokens(
+structs.MsgTypeTestSetup, index, time.Now()))

-results = getExpiredTokens()
+results = getExpiredTokens(time.Now())
 require.Len(t, results, 0)

 // query the unexpired token
1 change: 1 addition & 0 deletions nomad/structs/structs.go
@@ -12077,6 +12077,7 @@ type OneTimeTokenDeleteRequest struct {

 // OneTimeTokenExpireRequest is a request to delete all expired one-time tokens
 type OneTimeTokenExpireRequest struct {
+Timestamp time.Time

Member

Doesn't this need to get set in CoreScheduler.expiredOneTimeTokenGC? I don't see where this gets set, and core_sched.go is where the other GC operations compute their threshold based on the timetable/local clock before submitting the deletions via raft.

Member Author

@tgross tgross Jul 14, 2022

It should be set on the ACL.ExpireOneTimeTokens RPC, which I missed b/c the tests passed.

Most of the core jobs use the timetable block specifically because they don't submit the deletions via raft, but via the RPC. While the leader creates the GC eval, it can be processed on any worker (ref worker.go#L565-L569).

So when I wrote the expire OTT job originally, I hit the same "quiet clusters will have wonky timestamps" problem @jrasell discovered, and I also wanted to keep the transaction logic simple. I wanted to use the wall clock on the leader to get a more accurate expiration, but I messed that up by setting the timestamp in the FSM instead of in the leader's RPC handler. So we should be setting that on the leader's handler for the RPC and we'll be all set.

Member Author

Fixed in 9d8138c

WriteRequest
}
