diff --git a/util/lookupd/lookupd.go b/util/lookupd/lookupd.go index cf35c77b1..e0f09df8e 100644 --- a/util/lookupd/lookupd.go +++ b/util/lookupd/lookupd.go @@ -36,7 +36,7 @@ func GetLookupdTopics(lookupdHTTPAddrs []string) ([]string, error) { } success = true // {"data":{"topics":["test"]}} - topics, _ := data.Get("topics").Array() + topics, _ := data.Get("topics").StringArray() allTopics = util.StringUnion(allTopics, topics) }(endpoint) } @@ -70,7 +70,7 @@ func GetLookupdTopicChannels(topic string, lookupdHTTPAddrs []string) ([]string, } success = true // {"data":{"channels":["test"]}} - channels, _ := data.Get("channels").Array() + channels, _ := data.Get("channels").StringArray() allChannels = util.StringUnion(allChannels, channels) }(endpoint) } @@ -257,10 +257,9 @@ func GetNSQDTopics(nsqdHTTPAddrs []string) ([]string, error) { } success = true topicList, _ := data.Get("topics").Array() - for _, topicInfo := range topicList { - topicInfo := topicInfo.(map[string]interface{}) - topicName := topicInfo["topic_name"].(string) - topics = util.StringAdd(topics, topicName) + for i := range topicList { + topicInfo := data.Get("topics").GetIndex(i) + topics = util.StringAdd(topics, topicInfo.Get("topic_name").MustString()) } }(endpoint) } @@ -295,10 +294,9 @@ func GetNSQDTopicProducers(topic string, nsqdHTTPAddrs []string) ([]string, erro } success = true topicList, _ := data.Get("topics").Array() - for _, topicInfo := range topicList { - topicInfo := topicInfo.(map[string]interface{}) - topicName := topicInfo["topic_name"].(string) - if topicName == topic { + for i := range topicList { + topicInfo := data.Get("topics").GetIndex(i) + if topicInfo.Get("topic_name").MustString() == topic { addresses = append(addresses, addr) return } @@ -342,15 +340,16 @@ func GetNSQDStats(nsqdHTTPAddrs []string, selectedTopic string) ([]*TopicStats, success = true topics, _ := data.Get("topics").Array() - for _, t := range topics { - t := t.(map[string]interface{}) + for i := range topics { + t := data.Get("topics").GetIndex(i) - topicName := t["topic_name"].(string) + topicName := t.Get("topic_name").MustString() if selectedTopic != "" && topicName != selectedTopic { continue } - depth := int64(t["depth"].(float64)) - backendDepth := int64(t["backend_depth"].(float64)) + depth := t.Get("depth").MustInt64() + backendDepth := t.Get("backend_depth").MustInt64() + channels := t.Get("channels").MustArray() topicStats := &TopicStats{ HostAddress: addr, @@ -358,16 +357,15 @@ func GetNSQDStats(nsqdHTTPAddrs []string, selectedTopic string) ([]*TopicStats, Depth: depth, BackendDepth: backendDepth, MemoryDepth: depth - backendDepth, - MessageCount: int64(t["message_count"].(float64)), - ChannelCount: len(t["channels"].([]interface{})), + MessageCount: t.Get("message_count").MustInt64(), + ChannelCount: len(channels), } topicStatsList = append(topicStatsList, topicStats) - channels := t["channels"].([]interface{}) - for _, c := range channels { - c := c.(map[string]interface{}) + for j := range channels { + c := t.Get("channels").GetIndex(j) - channelName := c["channel_name"].(string) + channelName := c.Get("channel_name").MustString() key := channelName if selectedTopic == "" { key = fmt.Sprintf("%s:%s", topicName, channelName) @@ -383,14 +381,9 @@ func GetNSQDStats(nsqdHTTPAddrs []string, selectedTopic string) ([]*TopicStats, channelStatsMap[key] = channelStats } - depth := int64(c["depth"].(float64)) - backendDepth := int64(c["backend_depth"].(float64)) - var paused bool - pausedInterface, ok := c["paused"] - if ok { - paused = pausedInterface.(bool) - } - clients := c["clients"].([]interface{}) + depth := c.Get("depth").MustInt64() + backendDepth := c.Get("backend_depth").MustInt64() + clients := c.Get("clients").MustArray() hostChannelStats := &ChannelStats{ HostAddress: addr, @@ -399,32 +392,35 @@ func GetNSQDStats(nsqdHTTPAddrs []string, selectedTopic string) ([]*TopicStats, Depth: depth, BackendDepth: backendDepth, MemoryDepth: depth - backendDepth, - Paused: paused, - InFlightCount: int64(c["in_flight_count"].(float64)), - DeferredCount: int64(c["deferred_count"].(float64)), - MessageCount: int64(c["message_count"].(float64)), - RequeueCount: int64(c["requeue_count"].(float64)), - TimeoutCount: int64(c["timeout_count"].(float64)), + Paused: c.Get("paused").MustBool(), + InFlightCount: c.Get("in_flight_count").MustInt64(), + DeferredCount: c.Get("deferred_count").MustInt64(), + MessageCount: c.Get("message_count").MustInt64(), + RequeueCount: c.Get("requeue_count").MustInt64(), + TimeoutCount: c.Get("timeout_count").MustInt64(), // TODO: this is sort of wrong; clients should be de-duped // client A that connects to NSQD-a and NSQD-b should only be counted once. right? ClientCount: len(clients), } channelStats.Add(hostChannelStats) - for _, client := range clients { - client := client.(map[string]interface{}) - connected := time.Unix(int64(client["connect_ts"].(float64)), 0) + for k := range clients { + client := c.Get("clients").GetIndex(k) + + connected := time.Unix(client.Get("connect_ts").MustInt64(), 0) connectedDuration := time.Now().Sub(connected).Seconds() + clientInfo := &ClientInfo{ - HostAddress: addr, - ClientVersion: client["version"].(string), - ClientIdentifier: fmt.Sprintf("%s:%s", client["name"].(string), strings.Split(client["remote_address"].(string), ":")[1]), + HostAddress: addr, + ClientVersion: client.Get("version").MustString(), + ClientIdentifier: fmt.Sprintf("%s:%s", client.Get("name").MustString(), + strings.Split(client.Get("remote_address").MustString(), ":")[1]), ConnectedDuration: time.Duration(int64(connectedDuration)) * time.Second, // truncate to second - InFlightCount: int(client["in_flight_count"].(float64)), - ReadyCount: int(client["ready_count"].(float64)), - FinishCount: int64(client["finish_count"].(float64)), - RequeueCount: int64(client["requeue_count"].(float64)), - MessageCount: int64(client["message_count"].(float64)), + InFlightCount: client.Get("in_flight_count").MustInt(), + ReadyCount: client.Get("ready_count").MustInt(), + FinishCount: client.Get("finish_count").MustInt64(), + RequeueCount: client.Get("requeue_count").MustInt64(), + MessageCount: client.Get("message_count").MustInt64(), } hostChannelStats.Clients = append(hostChannelStats.Clients, clientInfo) channelStats.Clients = append(channelStats.Clients, clientInfo) diff --git a/util/strings.go b/util/strings.go index 7874dab14..6be5d8555 100644 --- a/util/strings.go +++ b/util/strings.go @@ -1,34 +1,27 @@ package util func StringAdd(s []string, a string) []string { - o := s - found := false for _, existing := range s { if a == existing { - found = true return s } } - if found == false { - o = append(o, a) - } - return o + return append(s, a) } -func StringUnion(s []string, a []interface{}) []string { - o := s +func StringUnion(s []string, a []string) []string { for _, entry := range a { found := false for _, existing := range s { - if entry.(string) == existing { + if entry == existing { found = true break } } - if found == false { - o = append(o, entry.(string)) + if !found { + s = append(s, entry) } } - return o + return s }