Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Adding] Accountz monitoring endpoint and INFO monitoring req subject #1611

Merged
merged 2 commits into from
Sep 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -2618,6 +2618,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
a.jsLimits = nil
}
}
a.updated = time.Now()
a.mu.Unlock()

clients := gatherClients()
Expand Down
25 changes: 25 additions & 0 deletions server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,10 @@ func (s *Server) initEventTracking() {
optz := &LeafzEventOptions{}
s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Leafz(&optz.LeafzOptions) })
},
"ACCOUNTZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) {
optz := &AccountzEventOptions{}
s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Accountz(&optz.AccountzOptions) })
},
}
for name, req := range monSrvc {
subject = fmt.Sprintf(serverDirectReqSubj, s.info.ID, name)
Expand Down Expand Up @@ -681,6 +685,16 @@ func (s *Server) initEventTracking() {
}
})
},
"INFO": func(sub *subscription, _ *client, subject, reply string, msg []byte) {
optz := &AccInfoEventOptions{}
s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(subject); err != nil {
return nil, err
} else {
return s.accountInfo(acc)
}
})
},
"CONNS": s.connsRequest,
}
for name, req := range monAccSrvc {
Expand Down Expand Up @@ -918,7 +932,12 @@ type EventFilterOptions struct {
// StatszEventOptions are options passed to Statsz
type StatszEventOptions struct {
// No actual options yet
EventFilterOptions
}

// Options for account Info
type AccInfoEventOptions struct {
// No actual options yet
EventFilterOptions
}

Expand Down Expand Up @@ -958,6 +977,12 @@ type LeafzEventOptions struct {
EventFilterOptions
}

// In the context of system events, AccountzEventOptions are options passed to Accountz
type AccountzEventOptions struct {
AccountzOptions
EventFilterOptions
}

// returns true if the request does NOT apply to this server and can be ignored.
// DO NOT hold the server lock when
func (s *Server) filterRequest(fOpts *EventFilterOptions) bool {
Expand Down
114 changes: 112 additions & 2 deletions server/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1258,6 +1258,112 @@ func TestAccountReqMonitoring(t *testing.T) {
}
}

func TestAccountReqInfo(t *testing.T) {
s, opts := runTrustedServer(t)
defer s.Shutdown()
sacc, sakp := createAccount(s)
s.setSystemAccount(sacc)
// Let's create an account with service export.
akp, _ := nkeys.CreateAccount()
pub1, _ := akp.PublicKey()
nac1 := jwt.NewAccountClaims(pub1)
nac1.Exports.Add(&jwt.Export{Subject: "req.*", Type: jwt.Service})
ajwt1, _ := nac1.Encode(oKp)
addAccountToMemResolver(s, pub1, ajwt1)
s.LookupAccount(pub1)
info1 := fmt.Sprintf(accReqSubj, pub1, "INFO")
// Now add an account with service imports.
akp2, _ := nkeys.CreateAccount()
pub2, _ := akp2.PublicKey()
nac2 := jwt.NewAccountClaims(pub2)
nac2.Imports.Add(&jwt.Import{Account: pub1, Subject: "req.1", Type: jwt.Service})
ajwt2, _ := nac2.Encode(oKp)
addAccountToMemResolver(s, pub2, ajwt2)
s.LookupAccount(pub2)
info2 := fmt.Sprintf(accReqSubj, pub2, "INFO")
// Create system account connection to query
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
ncSys, err := nats.Connect(url, createUserCreds(t, s, sakp))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer ncSys.Close()
checkCommon := func(info *AccountInfo, srv *ServerInfo, pub, jwt string) {
if info.Complete != true {
t.Fatalf("Unexpected value: %v", info.Complete)
} else if info.Expired != false {
t.Fatalf("Unexpected value: %v", info.Expired)
} else if info.JetStream != false {
t.Fatalf("Unexpected value: %v", info.JetStream)
} else if info.ClientCnt != 0 {
t.Fatalf("Unexpected value: %v", info.ClientCnt)
} else if info.AccountName != pub {
t.Fatalf("Unexpected value: %v", info.AccountName)
} else if info.LeafCnt != 0 {
t.Fatalf("Unexpected value: %v", info.LeafCnt)
} else if info.Jwt != jwt {
t.Fatalf("Unexpected value: %v", info.Jwt)
} else if srv.Cluster != "abc" {
t.Fatalf("Unexpected value: %v", srv.Cluster)
} else if srv.Name != s.Name() {
t.Fatalf("Unexpected value: %v", srv.Name)
} else if srv.Host != opts.Host {
t.Fatalf("Unexpected value: %v", srv.Host)
} else if srv.Seq < 1 {
t.Fatalf("Unexpected value: %v", srv.Seq)
}
}
info := AccountInfo{}
srv := ServerInfo{}
msg := struct {
Data *AccountInfo `json:"data"`
Srv *ServerInfo `json:"server"`
}{
&info,
&srv,
}
if resp, err := ncSys.Request(info1, nil, time.Second); err != nil {
t.Fatalf("Error on request: %v", err)
} else if err := json.Unmarshal(resp.Data, &msg); err != nil {
t.Fatalf("Unmarshalling failed: %v", err)
} else if len(info.Exports) != 1 {
t.Fatalf("Unexpected value: %v", info.Exports)
} else if len(info.Imports) != 0 {
t.Fatalf("Unexpected value: %v", info.Imports)
} else if info.Exports[0].Subject != "req.*" {
t.Fatalf("Unexpected value: %v", info.Exports)
} else if info.Exports[0].Type != jwt.Service {
t.Fatalf("Unexpected value: %v", info.Exports)
} else if info.Exports[0].ResponseType != jwt.ResponseTypeSingleton {
t.Fatalf("Unexpected value: %v", info.Exports)
} else if info.SubCnt != 0 {
t.Fatalf("Unexpected value: %v", info.SubCnt)
} else {
checkCommon(&info, &srv, pub1, ajwt1)
}
info = AccountInfo{}
srv = ServerInfo{}
if resp, err := ncSys.Request(info2, nil, time.Second); err != nil {
t.Fatalf("Error on request: %v", err)
} else if err := json.Unmarshal(resp.Data, &msg); err != nil {
t.Fatalf("Unmarshalling failed: %v", err)
} else if len(info.Exports) != 0 {
t.Fatalf("Unexpected value: %v", info.Exports)
} else if len(info.Imports) != 1 {
t.Fatalf("Unexpected value: %v", info.Imports)
} else if info.Imports[0].Subject != "req.1" {
t.Fatalf("Unexpected value: %v", info.Exports)
} else if info.Imports[0].Type != jwt.Service {
t.Fatalf("Unexpected value: %v", info.Exports)
} else if info.Imports[0].Account != pub1 {
t.Fatalf("Unexpected value: %v", info.Exports)
} else if info.SubCnt != 1 {
t.Fatalf("Unexpected value: %v", info.SubCnt)
} else {
checkCommon(&info, &srv, pub2, ajwt2)
}
}

func TestAccountClaimsUpdatesWithServiceImports(t *testing.T) {
s, opts := runTrustedServer(t)
defer s.Shutdown()
Expand Down Expand Up @@ -1453,7 +1559,7 @@ func TestSystemAccountWithGateways(t *testing.T) {

// If this tests fails with wrong number after 10 seconds we may have
// added a new inititial subscription for the eventing system.
checkExpectedSubs(t, 30, sa)
checkExpectedSubs(t, 33, sa)

// Create a client on B and see if we receive the event
urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port)
Expand Down Expand Up @@ -1775,7 +1881,7 @@ func TestServerEventsPingMonitorz(t *testing.T) {
sa, _, sb, optsB, akp := runTrustedCluster(t)
defer sa.Shutdown()
defer sb.Shutdown()

sysAcc, _ := akp.PublicKey()
url := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
nc, err := nats.Connect(url, createUserCreds(t, sb, akp))
if err != nil {
Expand Down Expand Up @@ -1814,6 +1920,8 @@ func TestServerEventsPingMonitorz(t *testing.T) {
[]string{"now", "outbound_gateways", "inbound_gateways"}},
{"LEAFZ", &LeafzOptions{}, &Leafz{},
[]string{"now", "leafs"}},
{"ACCOUNTZ", &AccountzOptions{}, &Accountz{},
[]string{"now", "accounts"}},

{"SUBSZ", &SubszOptions{Limit: 5}, &Subsz{},
[]string{"num_subscriptions", "num_cache"}},
Expand All @@ -1825,6 +1933,8 @@ func TestServerEventsPingMonitorz(t *testing.T) {
[]string{"now", "outbound_gateways", "inbound_gateways"}},
{"LEAFZ", &LeafzOptions{Subscriptions: true}, &Leafz{},
[]string{"now", "leafs"}},
{"ACCOUNTZ", &AccountzOptions{Account: sysAcc}, &Accountz{},
[]string{"now", "account_detail"}},

{"ROUTEZ", json.RawMessage(`{"cluster":""}`), &Routez{},
[]string{"now", "routes"}},
Expand Down
1 change: 1 addition & 0 deletions server/jwt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2994,6 +2994,7 @@ func TestJWTAccountLimitsMaxConnsAfterExpired(t *testing.T) {
acc, _ := s.LookupAccount(fooPub)
acc.mu.Lock()
acc.expired = true
acc.updated = time.Now().Add(-2 * time.Second) // work around updating to quickly
acc.mu.Unlock()

// Now update with new expiration and max connections lowered to 2
Expand Down
Loading