Skip to content

Commit

Permalink
Cherry picks for v2.10.17-RC.8 (#5590)
Browse files Browse the repository at this point in the history
Includes:

- #5584
- #5585
- #5586
- #5587
- #5589
  • Loading branch information
wallyqs authored Jun 25, 2024
2 parents 6189eae + e019d7a commit 7238011
Show file tree
Hide file tree
Showing 15 changed files with 657 additions and 59 deletions.
66 changes: 37 additions & 29 deletions server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -2822,9 +2822,12 @@ func (a *Account) isIssuerClaimTrusted(claims *jwt.ActivationClaims) bool {
// check is done with the account's name, not the pointer. This is used
// during config reload where we are comparing current and new config
// in which pointers are different.
// No lock is acquired in this function, so it is assumed that the
// import maps are not changed while this executes.
// Acquires `a` read lock, but `b` is assumed to not be accessed
// by anyone but the caller (`b` is not registered anywhere).
func (a *Account) checkStreamImportsEqual(b *Account) bool {
a.mu.RLock()
defer a.mu.RUnlock()

if len(a.imports.streams) != len(b.imports.streams) {
return false
}
Expand Down Expand Up @@ -3192,6 +3195,9 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
a.nameTag = ac.Name
a.tags = ac.Tags

// Grab trace label under lock.
tl := a.traceLabel()

// Check for external authorization.
if ac.HasExternalAuthorization() {
a.extAuth = &jwt.ExternalAuthorization{}
Expand All @@ -3212,10 +3218,10 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
}
if a.imports.services != nil {
old.imports.services = make(map[string]*serviceImport, len(a.imports.services))
}
for k, v := range a.imports.services {
old.imports.services[k] = v
delete(a.imports.services, k)
for k, v := range a.imports.services {
old.imports.services[k] = v
delete(a.imports.services, k)
}
}

alteredScope := map[string]struct{}{}
Expand Down Expand Up @@ -3285,13 +3291,13 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
for _, e := range ac.Exports {
switch e.Type {
case jwt.Stream:
s.Debugf("Adding stream export %q for %s", e.Subject, a.traceLabel())
s.Debugf("Adding stream export %q for %s", e.Subject, tl)
if err := a.addStreamExportWithAccountPos(
string(e.Subject), authAccounts(e.TokenReq), e.AccountTokenPosition); err != nil {
s.Debugf("Error adding stream export to account [%s]: %v", a.traceLabel(), err.Error())
s.Debugf("Error adding stream export to account [%s]: %v", tl, err.Error())
}
case jwt.Service:
s.Debugf("Adding service export %q for %s", e.Subject, a.traceLabel())
s.Debugf("Adding service export %q for %s", e.Subject, tl)
rt := Singleton
switch e.ResponseType {
case jwt.ResponseTypeStream:
Expand All @@ -3301,7 +3307,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
}
if err := a.addServiceExportWithResponseAndAccountPos(
string(e.Subject), rt, authAccounts(e.TokenReq), e.AccountTokenPosition); err != nil {
s.Debugf("Error adding service export to account [%s]: %v", a.traceLabel(), err)
s.Debugf("Error adding service export to account [%s]: %v", tl, err)
continue
}
sub := string(e.Subject)
Expand All @@ -3311,13 +3317,13 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
if e.Latency.Sampling == jwt.Headers {
hdrNote = " (using headers)"
}
s.Debugf("Error adding latency tracking%s for service export to account [%s]: %v", hdrNote, a.traceLabel(), err)
s.Debugf("Error adding latency tracking%s for service export to account [%s]: %v", hdrNote, tl, err)
}
}
if e.ResponseThreshold != 0 {
// Response threshold was set in options.
if err := a.SetServiceExportResponseThreshold(sub, e.ResponseThreshold); err != nil {
s.Debugf("Error adding service export response threshold for [%s]: %v", a.traceLabel(), err)
s.Debugf("Error adding service export response threshold for [%s]: %v", tl, err)
}
}
}
Expand Down Expand Up @@ -3362,44 +3368,41 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
}
var incompleteImports []*jwt.Import
for _, i := range ac.Imports {
// check tmpAccounts with priority
var acc *Account
var err error
if v, ok := s.tmpAccounts.Load(i.Account); ok {
acc = v.(*Account)
} else {
acc, err = s.lookupAccount(i.Account)
}
acc, err := s.lookupAccount(i.Account)
if acc == nil || err != nil {
s.Errorf("Can't locate account [%s] for import of [%v] %s (err=%v)", i.Account, i.Subject, i.Type, err)
incompleteImports = append(incompleteImports, i)
continue
}
from := string(i.Subject)
to := i.GetTo()
// Capture trace labels.
acc.mu.RLock()
atl := acc.traceLabel()
acc.mu.RUnlock()
// Grab from and to
from, to := string(i.Subject), i.GetTo()
switch i.Type {
case jwt.Stream:
if i.LocalSubject != _EMPTY_ {
// set local subject implies to is empty
to = string(i.LocalSubject)
s.Debugf("Adding stream import %s:%q for %s:%q", acc.traceLabel(), from, a.traceLabel(), to)
s.Debugf("Adding stream import %s:%q for %s:%q", atl, from, tl, to)
err = a.AddMappedStreamImportWithClaim(acc, from, to, i)
} else {
s.Debugf("Adding stream import %s:%q for %s:%q", acc.traceLabel(), from, a.traceLabel(), to)
s.Debugf("Adding stream import %s:%q for %s:%q", atl, from, tl, to)
err = a.AddStreamImportWithClaim(acc, from, to, i)
}
if err != nil {
s.Debugf("Error adding stream import to account [%s]: %v", a.traceLabel(), err.Error())
s.Debugf("Error adding stream import to account [%s]: %v", tl, err.Error())
incompleteImports = append(incompleteImports, i)
}
case jwt.Service:
if i.LocalSubject != _EMPTY_ {
from = string(i.LocalSubject)
to = string(i.Subject)
}
s.Debugf("Adding service import %s:%q for %s:%q", acc.traceLabel(), from, a.traceLabel(), to)
s.Debugf("Adding service import %s:%q for %s:%q", atl, from, tl, to)
if err := a.AddServiceImportWithClaim(acc, from, to, i); err != nil {
s.Debugf("Error adding service import to account [%s]: %v", a.traceLabel(), err.Error())
s.Debugf("Error adding service import to account [%s]: %v", tl, err.Error())
incompleteImports = append(incompleteImports, i)
}
}
Expand Down Expand Up @@ -3570,7 +3573,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
// regardless of enabled or disabled. It handles both cases.
if jsEnabled {
if err := s.configJetStream(a); err != nil {
s.Errorf("Error configuring jetstream for account [%s]: %v", a.traceLabel(), err.Error())
s.Errorf("Error configuring jetstream for account [%s]: %v", tl, err.Error())
a.mu.Lock()
// Absent reload of js server cfg, this is going to be broken until js is disabled
a.incomplete = true
Expand Down Expand Up @@ -3707,8 +3710,13 @@ func (s *Server) buildInternalAccount(ac *jwt.AccountClaims) *Account {
// We don't want to register an account that is in the process of
// being built, however, to solve circular import dependencies, we
// need to store it here.
s.tmpAccounts.Store(ac.Subject, acc)
if v, loaded := s.tmpAccounts.LoadOrStore(ac.Subject, acc); loaded {
return v.(*Account)
}

// Update based on claims.
s.UpdateAccountClaims(acc, ac)

return acc
}

Expand Down
5 changes: 4 additions & 1 deletion server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2914,8 +2914,11 @@ func (c *client) addShadowSubscriptions(acc *Account, sub *subscription, enact b

// Add in the shadow subscription.
func (c *client) addShadowSub(sub *subscription, ime *ime, enact bool) (*subscription, error) {
im := ime.im
c.mu.Lock()
nsub := *sub // copy
c.mu.Unlock()

im := ime.im
nsub.im = im

if !im.usePub && ime.dyn && im.tr != nil {
Expand Down
30 changes: 20 additions & 10 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2596,28 +2596,38 @@ func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState {

// This is used to see if we can selectively jump start blocks based on filter subject and a floor block index.
// Will return -1 if no matches at all.
func (fs *fileStore) checkSkipFirstBlock(filter string, wc bool) int {
start := uint32(math.MaxUint32)
func (fs *fileStore) checkSkipFirstBlock(filter string, wc bool) (int, int) {
start, stop := uint32(math.MaxUint32), uint32(0)
if wc {
fs.psim.Match(stringToBytes(filter), func(_ []byte, psi *psi) {
if psi.fblk < start {
start = psi.fblk
}
if psi.lblk > stop {
stop = psi.lblk
}
})
} else if psi, ok := fs.psim.Find(stringToBytes(filter)); ok {
start = psi.fblk
start, stop = psi.fblk, psi.lblk
}
// Nothing found.
if start == uint32(math.MaxUint32) {
return -1
return -1, -1
}
// Here we need to translate this to index into fs.blks.
// Here we need to translate this to index into fs.blks properly.
mb := fs.bim[start]
if mb == nil {
return -1
return -1, -1
}
bi, _ := fs.selectMsgBlockWithIndex(atomic.LoadUint64(&mb.last.seq))
return bi
fi, _ := fs.selectMsgBlockWithIndex(atomic.LoadUint64(&mb.last.seq))

mb = fs.bim[stop]
if mb == nil {
return -1, -1
}
li, _ := fs.selectMsgBlockWithIndex(atomic.LoadUint64(&mb.last.seq))

return fi, li
}

// Optimized way for getting all num pending matching a filter subject.
Expand Down Expand Up @@ -6362,9 +6372,9 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store
// Similar to above if start <= first seq.
// TODO(dlc) - For v2 track these by filter subject since they will represent filtered consumers.
if i == bi {
nbi := fs.checkSkipFirstBlock(filter, wc)
nbi, lbi := fs.checkSkipFirstBlock(filter, wc)
// Nothing available.
if nbi < 0 {
if nbi < 0 || lbi <= bi {
return nil, fs.state.LastSeq, ErrStoreEOF
}
// See if we can jump ahead here.
Expand Down
81 changes: 81 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7162,6 +7162,58 @@ func TestFileStoreFilteredPendingPSIMFirstBlockUpdateNextBlock(t *testing.T) {
require_Equal(t, psi.lblk, 4)
}

func TestFileStoreLargeSparseMsgsDoNotLoadAfterLast(t *testing.T) {
sd := t.TempDir()
fs, err := newFileStore(
FileStoreConfig{StoreDir: sd, BlockSize: 128},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

msg := []byte("hello")
// Create 2 blocks with each, each block holds 2 msgs
for i := 0; i < 2; i++ {
fs.StoreMsg("foo.22.bar", nil, msg)
fs.StoreMsg("foo.22.baz", nil, msg)
}
// Now create 8 more blocks with just baz. So no matches for these 8 blocks
// for "foo.22.bar".
for i := 0; i < 8; i++ {
fs.StoreMsg("foo.22.baz", nil, msg)
fs.StoreMsg("foo.22.baz", nil, msg)
}
require_Equal(t, fs.numMsgBlocks(), 10)

// Remove all blk cache and fss.
fs.mu.RLock()
for _, mb := range fs.blks {
mb.mu.Lock()
mb.fss, mb.cache = nil, nil
mb.mu.Unlock()
}
fs.mu.RUnlock()

// "foo.22.bar" is at sequence 1 and 3.
// Make sure if we do a LoadNextMsg() starting at 4 that we do not load
// all the tail blocks.
_, _, err = fs.LoadNextMsg("foo.*.bar", true, 4, nil)
require_Error(t, err, ErrStoreEOF)

// Now make sure we did not load fss and cache.
var loaded int
fs.mu.RLock()
for _, mb := range fs.blks {
mb.mu.RLock()
if mb.cache != nil || mb.fss != nil {
loaded++
}
mb.mu.RUnlock()
}
fs.mu.RUnlock()
// We will load first block for starting seq 4, but no others should have loaded.
require_Equal(t, loaded, 1)
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -7419,3 +7471,32 @@ func Benchmark_FileStoreLoadNextMsgVerySparseMsgsInBetweenWithWildcard(b *testin
require_NoError(b, err)
}
}

func Benchmark_FileStoreLoadNextMsgVerySparseMsgsLargeTail(b *testing.B) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: b.TempDir()},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage})
require_NoError(b, err)
defer fs.Stop()

// Small om purpose.
msg := []byte("ok")

// Make first msg one that would match as well.
fs.StoreMsg("foo.1.baz", nil, msg)
// Add in a bunch of msgs.
// We need to make sure we have a range of subjects that could kick in a linear scan.
for i := 0; i < 1_000_000; i++ {
subj := fmt.Sprintf("foo.%d.bar", rand.Intn(100_000)+2)
fs.StoreMsg(subj, nil, msg)
}

b.ResetTimer()

var smv StoreMsg
for i := 0; i < b.N; i++ {
// Make sure not first seq.
_, _, err := fs.LoadNextMsg("foo.*.baz", true, 2, &smv)
require_Error(b, err, ErrStoreEOF)
}
}
Loading

0 comments on commit 7238011

Please sign in to comment.