Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nsqadmin: add message rate #143

Merged
merged 2 commits into from
Feb 15, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 51 additions & 3 deletions nsqadmin/graph_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package main

import (
"../util"
"encoding/json"
"errors"
"fmt"
"github.com/bitly/go-simplejson"
"html/template"
"log"
"net/http"
Expand Down Expand Up @@ -134,7 +137,7 @@ func startEndForTimeframe(t time.Duration) (string, string) {

func (t *Topic) Target(g *GraphOptions, key string) (string, string) {
color := "blue"
if key == "depth" {
if key == "depth" || key == "requeue_count" {
color = "red"
}
target := fmt.Sprintf("%snsq.*.topic.%s.%s", g.Prefix(metricType(key)), t.TopicName, key)
Expand All @@ -150,13 +153,18 @@ func (t *Topic) LargeGraph(g *GraphOptions, key string) template.URL {
return g.LargeGraph(target, color)
}

func (t *Topic) Rate(g *GraphOptions) string {
target, _ := t.Target(g, "message_count")
return target
}

func (t *TopicHostStats) Target(g *GraphOptions, key string) (string, string) {
h := graphiteHostKey(t.HostAddress)
if t.Aggregate {
h = "*"
}
color := "blue"
if key == "depth" {
if key == "depth" || key == "requeue_count" {
color = "red"
}
target := fmt.Sprintf("%snsq.%s.topic.%s.%s", g.Prefix(metricType(key)), h, t.Topic, key)
Expand All @@ -170,6 +178,10 @@ func (t *TopicHostStats) LargeGraph(g *GraphOptions, key string) template.URL {
target, color := t.Target(g, key)
return g.LargeGraph(target, color)
}
func (t *TopicHostStats) Rate(g *GraphOptions) string {
target, _ := t.Target(g, "message_count")
return target
}

func metricType(key string) string {
metricType := "counter"
Expand All @@ -186,7 +198,7 @@ func (c *ChannelStats) Target(g *GraphOptions, key string) (string, string) {
h = graphiteHostKey(c.HostAddress)
}
color := "blue"
if key == "depth" {
if key == "depth" || key == "requeue_count" {
color = "red"
}
target := fmt.Sprintf("%snsq.%s.topic.%s.channel.%s.%s", g.Prefix(metricType(key)), h, c.Topic, c.ChannelName, key)
Expand All @@ -200,6 +212,10 @@ func (c *ChannelStats) LargeGraph(g *GraphOptions, key string) template.URL {
target, color := c.Target(g, key)
return g.LargeGraph(target, color)
}
func (t *ChannelStats) Rate(g *GraphOptions) string {
target, _ := t.Target(g, "message_count")
return target
}

func (g *GraphOptions) Sparkline(target string, color string) template.URL {
params := url.Values{}
Expand Down Expand Up @@ -233,6 +249,38 @@ func (g *GraphOptions) LargeGraph(target string, color string) template.URL {
return template.URL(fmt.Sprintf("%s/render?%s", g.GraphiteUrl, params.Encode()))
}

func rateQuery(target string) string {
params := url.Values{}
params.Set("from", "-2min")
params.Set("until", "-1min")
params.Set("format", "json")
params.Set("target", fmt.Sprintf("sumSeries(%s)", target))
return fmt.Sprintf("/render?%s", params.Encode())
}

func parseRateResponse(body []byte) (string, error) {
js, err := simplejson.NewJson([]byte(body))
if err != nil {
log.Printf("ERROR: failed to parse metadata - %s", err.Error())
return "", err
}

js, ok := js.GetIndex(0).CheckGet("datapoints")
if !ok {
return "", errors.New("datapoints not found")
}

rate, err := js.GetIndex(0).GetIndex(0).Float64()
rate_str := fmt.Sprintf("%.2f", rate/60)
response := map[string]string{"datapoint": rate_str}
byte_response, err := json.Marshal(response)
if err != nil {
return "", errors.New("marshal failed")
}

return string(byte_response), nil
}

func graphiteHostKey(h string) string {
s := strings.Replace(h, ".", "_", -1)
return strings.Replace(s, ":", "_", -1)
Expand Down
76 changes: 76 additions & 0 deletions nsqadmin/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"html/template"
"io"
"io/ioutil"
"log"
"net"
"net/http"
Expand Down Expand Up @@ -72,6 +73,7 @@ func httpServer(listener net.Listener) {
} else {
proxy := NewSingleHostReverseProxy(url, 20*time.Second)
handler.Handle("/render", proxy)
handler.HandleFunc("/graphite_data", graphiteDataHandler)
}
}

Expand Down Expand Up @@ -683,3 +685,77 @@ func counterDataHandler(w http.ResponseWriter, req *http.Request) {
func faviconHandler(w http.ResponseWriter, req *http.Request) {
http.NotFound(w, req)
}

func graphiteDataHandler(w http.ResponseWriter, req *http.Request) {
reqParams, err := util.NewReqParams(req)
if err != nil {
log.Printf("ERROR: failed to parse request params - %s", err.Error())
http.Error(w, "INVALID_REQUEST", 500)
return
}

metric, err := reqParams.Get("metric")
if err != nil {
log.Printf("ERROR: missing metric param - %s", err.Error())
http.Error(w, "MISSING_METRIC_PARAM", 500)
return
}

target, err := reqParams.Get("target")
if err != nil {
log.Printf("ERROR: missing target param - %s", err.Error())
http.Error(w, "MISSING_TARGET_PARAM", 500)
return
}

var queryFunc func(string) string
var formatJsonResponseFunc func([]byte) (string, error)

switch metric {
case "rate":
queryFunc = rateQuery
formatJsonResponseFunc = parseRateResponse
default:
log.Printf("ERROR: unknown metric value %s", metric)
http.Error(w, "INVALID_METRIC_PARAM", 500)
return
}

query := queryFunc(target)
response, err := GraphiteGet(*graphiteUrl + query)
if err != nil {
log.Printf("ERROR: graphite request failed %s", err.Error())
http.Error(w, "GRAPHITE_FAILED", 500)
return
}

formated_response, err := formatJsonResponseFunc(response)
if err != nil {
log.Printf("ERROR: response formating failed - %s", err.Error())
http.Error(w, "INVALID_GRAPHITE_RESPONSE", 500)
return
}

w.Header().Set("Content-Type", "application/json")
io.WriteString(w, formated_response)
return
}

func GraphiteGet(request_url string) ([]byte, error) {
response, err := http.Get(request_url)

var contents []byte

if err != nil {
log.Printf("ERROR: GET request to graphite failed %s", err)
return nil, err
}

defer response.Body.Close()
contents, err = ioutil.ReadAll(response.Body)
if err != nil {
log.Printf("ERROR: reading GET body failed %s", err)
return nil, err
}
return contents, nil
}
21 changes: 14 additions & 7 deletions nsqadmin/lookupd_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ func getLookupdProducers(lookupdHTTPAddrs []string) ([]*Producer, error) {
address := producer.Get("address").MustString() //TODO: remove for 1.0
hostname := producer.Get("hostname").MustString()
broadcastAddress := producer.Get("broadcast_address").MustString()

if broadcastAddress == "" {
broadcastAddress = address
}
httpPort := producer.Get("http_port").MustInt()
tcpPort := producer.Get("tcp_port").MustInt()
key := fmt.Sprintf("%s:%d:%d", broadcastAddress, httpPort, tcpPort)
Expand Down Expand Up @@ -183,12 +185,17 @@ func getLookupdTopicProducers(topic string, lookupdHTTPAddrs []string) ([]string
return
}
success = true
producers, _ := data.Get("producers").Array()
for _, producer := range producers {
producer := producer.(map[string]interface{})
broadcastAddress := producer["broadcast_address"].(string)
port := int(producer["http_port"].(float64))
key := fmt.Sprintf("%s:%d", broadcastAddress, port)
producers := data.Get("producers")
producersArray, _ := producers.Array()
for i, _ := range producersArray {
producer := producers.GetIndex(i)
address := producer.Get("address").MustString() //TODO: remove for 1.0
broadcastAddress := producer.Get("broadcast_address").MustString()
if broadcastAddress == "" {
broadcastAddress = address
}
httpPort := producer.Get("http_port").MustInt()
key := fmt.Sprintf("%s:%d", broadcastAddress, httpPort)
allSources = util.StringAdd(allSources, key)
}
}(endpoint)
Expand Down
3 changes: 3 additions & 0 deletions nsqadmin/templates/channel.html
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ <h4>Channel Message Queue</h4>
<th>Requeued</th>
<th>Timed Out</th>
<th>Messages</th>
{{if $g.Enabled}}<th>Rate</th>{{end}}
<th>Connections</th>
</tr>

Expand All @@ -83,6 +84,7 @@ <h4>Channel Message Queue</h4>
<td>{{$c.RequeueCount | commafy}}</td>
<td>{{$c.TimeoutCount | commafy}}</td>
<td>{{$c.MessageCount | commafy}}</td>
{{if $g.Enabled}}<td class="bold rate" target="{{.Rate $g}}"></td> {{end}}
<td>{{$c.ClientCount}}</td>
</tr>
{{if $g.Enabled}}
Expand Down Expand Up @@ -110,6 +112,7 @@ <h4>Channel Message Queue</h4>
<td>{{$c.RequeueCount | commafy}}</td>
<td>{{$c.TimeoutCount | commafy}}</td>
<td>{{$c.MessageCount | commafy}}</td>
{{if $g.Enabled}}<td class="bold rate" target="{{$c.Rate $g}}"></td> {{end}}
<td>{{$c.ClientCount}}</td>
</tr>
{{if $g.Enabled}}
Expand Down
3 changes: 3 additions & 0 deletions nsqadmin/templates/header.html
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
.red:hover {
color: #f30;
}
.bold {
font-weight: bold;
}
</style>
</head>
<body>
Expand Down
6 changes: 4 additions & 2 deletions nsqadmin/templates/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,17 @@ <h2>Topics</h2>
{{if .Topics}}
<table class="table table-condensed table-bordered">
<tr>
<th>Topic</th>
{{if $g.Enabled}}<th width="120">Depth</th>{{end}}
{{if $g.Enabled}}<th width="120">Messages</th>{{end}}
<th>Topic</th>
{{if $g.Enabled}}<th width="120">Rate</th>{{end}}
</tr>
{{range .Topics}}
<tr>
<td><a href="/topic/{{.TopicName}}">{{.TopicName}}</a></td>
{{if $g.Enabled}}<td><a href="/topic/{{.TopicName}}"><img width="120" height="20" src="{{.Sparkline $g "depth"}}"></a></td>{{end}}
{{if $g.Enabled}}<td><a href="/topic/{{.TopicName}}"><img width="120" height="20" src="{{.Sparkline $g "message_count"}}"></a></td>{{end}}
<td><a href="/topic/{{.TopicName}}">{{.TopicName}}</a></td>
{{if $g.Enabled}}<td class="bold rate" target="{{.Rate $g}}"></td> {{end}}
</tr>
{{end}}
</table>
Expand Down
13 changes: 13 additions & 0 deletions nsqadmin/templates/js.html
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,17 @@
});
return false;
});

$('.rate').each(function() {
var self = this;
$.getJSON("/graphite_data", {
metric: "rate",
target: $(this).attr("target")
},
function(rate) {
$(self).html(rate.datapoint);
});
});


</script>
7 changes: 6 additions & 1 deletion nsqadmin/templates/topic.html
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ <h4>Topic Message Queue</h4>
<th>Depth</th>
<th>Memory + Disk</th>
<th>Messages</th>
{{if $g.Enabled}}<th>Rate</th>{{end}}
<th>Channels</th>
</tr>
{{range .TopicHostStats}}
Expand All @@ -58,10 +59,11 @@ <h4>Topic Message Queue</h4>
{{if $g.Enabled}}<a href="{{.LargeGraph $g "message_count"}}"><img width="120" src="{{.Sparkline $g "message_count"}}"></a>{{end}}
{{.MessageCount | commafy}}
</td>
{{if $g.Enabled}}<td class="bold rate" target="{{.Rate $g}}"></td> {{end}}
<td>{{.ChannelCount}}</td>
</tr>
{{end}}
{{with .GlobalTopicStats}}
{{with .GlobalTopicStats}}
<tr class="info">
<td>Total:</td>
<td>
Expand All @@ -73,6 +75,7 @@ <h4>Topic Message Queue</h4>
{{if $g.Enabled}}<a href="{{.LargeGraph $g "message_count"}}"><img width="120" height="20" src="{{.Sparkline $g "message_count"}}"></a>{{end}}
{{.MessageCount | commafy}}
</td>
{{if $g.Enabled}}<td class="bold rate" target="{{.Rate $g}}"></td> {{end}}
<td>{{.ChannelCount}}</td>
</tr>
{{end}}
Expand Down Expand Up @@ -101,6 +104,7 @@ <h4>Channel Message Queues</h4>
<th>Requeued</th>
<th>Timed Out</th>
<th>Messages</th>
{{if $g.Enabled}}<th>Rate</th>{{end}}
<th>Connections</th>
</tr>

Expand All @@ -118,6 +122,7 @@ <h4>Channel Message Queues</h4>
<td>{{$c.RequeueCount | commafy}}</td>
<td>{{$c.TimeoutCount | commafy}}</td>
<td>{{$c.MessageCount | commafy}}</td>
{{if $g.Enabled}}<td class="bold rate" target="{{.Rate $g}}"></td> {{end}}
<td>
{{if $g.Enabled}}<a href="{{$c.LargeGraph $g "clients"}}"><img width="120" height="20" src="{{$c.Sparkline $g "clients"}}"></a>{{end}}
{{$c.ClientCount}}</td>
Expand Down