Skip to content

Commit

Permalink
Merge pull request #1236 from nats-io/fix_1235
Browse files Browse the repository at this point in the history
[FIXED] Possible panic on startup when monitoring endpoint inspected
  • Loading branch information
kozlovic authored Jan 31, 2022
2 parents a17a5b6 + c6f1e64 commit aacb792
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 12 deletions.
31 changes: 19 additions & 12 deletions server/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,21 +172,21 @@ func (s *StanServer) startMonitoring(nOpts *natsd.Options) error {
// HandleRootz writes a small static HTML index page to w. The page shows the
// NATS logo and relative links to the server, store, clients and channels
// monitoring endpoints, whose paths are substituted from ServerPath,
// StorePath, ClientsPath and ChannelsPath, followed by a "help" link.
// The request r is not inspected.
//
// NOTE(review): the favicon, logo and help lines each appear twice below, once
// with an http:// URL and once with an https:// URL. This looks like the old
// and new sides of a diff shown together - confirm which set should remain.
func (s *StanServer) HandleRootz(w http.ResponseWriter, r *http.Request) {
// The four %s verbs are filled, in order, by the path constants passed after
// the template; each link is emitted as "." + path, i.e. a relative href.
fmt.Fprintf(w, `<html lang="en">
<head>
<link rel="shortcut icon" href="http://nats.io/img/favicon.ico">
<link rel="shortcut icon" href="https://nats.io/img/favicon.ico">
<style type="text/css">
body { font-family: "Century Gothic", CenturyGothic, AppleGothic, sans-serif; font-size: 22; }
a { margin-left: 32px; }
</style>
</head>
<body>
<img src="http://nats.io/img/logo.png" alt="NATS Streaming">
<img src="https://nats.io/img/logo.png" alt="NATS Streaming">
<br/>
<a href=.%s>server</a><br/>
<a href=.%s>store</a><br/>
<a href=.%s>clients</a><br/>
<a href=.%s>channels</a><br/>
<br/>
<a href=http://nats.io/documentation/server/gnatsd-monitoring/>help</a>
<a href=https://docs.nats.io/legacy/stan/intro/monitoring/>help</a>
</body>
</html>`, ServerPath, StorePath, ClientsPath, ChannelsPath)
}
Expand All @@ -198,15 +198,6 @@ func (s *StanServer) HandleServerz(w http.ResponseWriter, r *http.Request) {
http.Error(w, fmt.Sprintf("Error getting information about channels state: %v", err), http.StatusInternalServerError)
return
}
var role string
var nodeID string
s.mu.RLock()
state := s.state
if s.raft != nil {
role = s.raft.State().String()
nodeID = s.info.NodeID
}
s.mu.RUnlock()

numSubs := s.numSubs()
now := time.Now()
Expand All @@ -227,6 +218,15 @@ func (s *StanServer) HandleServerz(w http.ResponseWriter, r *http.Request) {
maxFDs = int(limits.OpenFiles)
}

var role string
var nodeID string

s.mu.RLock()
state := s.state
if s.raft != nil {
role = s.raft.State().String()
nodeID = s.info.NodeID
}
serverz := &Serverz{
ClusterID: s.info.ClusterID,
ServerID: s.serverID,
Expand All @@ -250,6 +250,7 @@ func (s *StanServer) HandleServerz(w http.ResponseWriter, r *http.Request) {
OpenFDs: fds,
MaxFDs: maxFDs,
}
s.mu.RUnlock()
s.sendResponse(w, r, serverz)
}

Expand Down Expand Up @@ -293,6 +294,7 @@ func (s *StanServer) HandleStorez(w http.ResponseWriter, r *http.Request) {
http.Error(w, fmt.Sprintf("Error getting information about channels state: %v", err), http.StatusInternalServerError)
return
}
s.mu.RLock()
storez := &Storez{
ClusterID: s.info.ClusterID,
ServerID: s.serverID,
Expand All @@ -302,6 +304,7 @@ func (s *StanServer) HandleStorez(w http.ResponseWriter, r *http.Request) {
TotalMsgs: count,
TotalBytes: bytes,
}
s.mu.RUnlock()
s.sendResponse(w, r, storez)
}

Expand Down Expand Up @@ -353,6 +356,7 @@ func (s *StanServer) HandleClientsz(w http.ResponseWriter, r *http.Request) {
}
}
carr = carr[0:carrSize]
s.mu.RLock()
clientsz := &Clientsz{
ClusterID: s.info.ClusterID,
ServerID: s.serverID,
Expand All @@ -363,6 +367,7 @@ func (s *StanServer) HandleClientsz(w http.ResponseWriter, r *http.Request) {
Count: len(carr),
Clients: carr,
}
s.mu.RUnlock()
s.sendResponse(w, r, clientsz)
}
}
Expand Down Expand Up @@ -499,6 +504,7 @@ func (s *StanServer) HandleChannelsz(w http.ResponseWriter, r *http.Request) {
channels := s.channels.getAll()
totalChannels := len(channels)
minoff, maxoff := getMinMaxOffset(offset, limit, totalChannels)
s.mu.RLock()
channelsz := &Channelsz{
ClusterID: s.info.ClusterID,
ServerID: s.serverID,
Expand All @@ -507,6 +513,7 @@ func (s *StanServer) HandleChannelsz(w http.ResponseWriter, r *http.Request) {
Limit: limit,
Total: totalChannels,
}
s.mu.RUnlock()
if subsOption == 1 {
carr := make([]*Channelz, 0, totalChannels)
for cn := range channels {
Expand Down
65 changes: 65 additions & 0 deletions server/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"reflect"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -1448,3 +1449,67 @@ func TestMonitorInOutMsgs(t *testing.T) {
t.Fatalf("Expected 15 outbound messages, got %v - %v", sz.InMsgs, sz.InBytes)
}
}

// TestMonitorNoPanicOnServerRestart restarts the streaming server once per
// monitoring endpoint while a background goroutine polls that endpoint in a
// tight loop. The poller is started before the server is brought back up, so
// requests are issued while the server is still starting. The test makes no
// assertion on the responses; it only requires that the restarts and the
// polling complete.
func TestMonitorNoPanicOnServerRestart(t *testing.T) {
	resetPreviousHTTPConnections()
	cleanupDatastore(t)
	defer cleanupDatastore(t)
	opts := getTestDefaultOptsForPersistentStore()

	ns := natsdTest.RunDefaultServer()
	defer ns.Shutdown()

	opts.NATSServerURL = "nats://127.0.0.1:4222"
	s := runMonitorServer(t, opts)
	// One deferred closure shuts down whichever server s refers to when the
	// test exits. Earlier instances are shut down explicitly at the top of
	// each loop iteration, so there is no need to stack a defer per restart.
	defer func() { s.Shutdown() }()

	sc := NewDefaultConnection(t)
	defer sc.Close()

	// Create 100 channels, each with a durable subscription, before the
	// restarts below.
	for i := 0; i < 100; i++ {
		if _, err := sc.Subscribe(fmt.Sprintf("foo.%d", i+1),
			func(_ *stan.Msg) {}, stan.DurableName("dur")); err != nil {
			t.Fatalf("Error on subscribe: %v", err)
		}
	}

	endpoints := []string{
		"channelsz?subs=1",
		"serverz",
		"storez",
		"clientsz",
		"isFTActive",
	}
	for _, e := range endpoints {
		s.Shutdown()

		wg := sync.WaitGroup{}
		wg.Add(1)
		done := make(chan struct{})
		// The endpoint is passed as an argument so the goroutine does not
		// depend on the loop variable.
		go func(endpoint string) {
			defer wg.Done()
			url := fmt.Sprintf("http://%s:%d/streaming/", monitorHost, monitorPort) + endpoint
			for {
				// Errors are expected while the server is down; keep
				// hammering so a request lands as soon as it listens.
				if resp, err := http.DefaultClient.Get(url); err == nil {
					// Drain and close the body; the content is irrelevant.
					ioutil.ReadAll(resp.Body)
					resp.Body.Close()
				}
				// Check for the stop signal after every attempt, failed or
				// not, so the goroutine always terminates once done is
				// closed even if the endpoint is unreachable.
				select {
				case <-done:
					return
				default:
				}
			}
		}(e)

		s = runMonitorServer(t, opts)

		// Let the poller run against the restarted server for a short while.
		time.Sleep(100 * time.Millisecond)
		close(done)
		wg.Wait()
	}
	sc.Close()
}
2 changes: 2 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2525,7 +2525,9 @@ func (s *StanServer) processRecoveredChannels(channels map[string]*stores.Recove
allSubs := make([]*subState, 0, 16)

for channelName, recoveredChannel := range channels {
s.channels.Lock()
channel, err := s.channels.create(s, channelName, recoveredChannel.Channel)
s.channels.Unlock()
if err != nil {
return nil, err
}
Expand Down

0 comments on commit aacb792

Please sign in to comment.