Skip to content

Commit

Permalink
backport: nsqio#1099
Browse files Browse the repository at this point in the history
  • Loading branch information
andyxning committed Nov 12, 2018
1 parent cd964d9 commit 875829f
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
8 changes: 7 additions & 1 deletion nsqlookupd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,17 +324,23 @@ func (s *httpServer) doNodes(w http.ResponseWriter, req *http.Request, ps httpro
producers := s.ctx.nsqlookupd.DB.FindProducers("client", "", "").FilterByActive(
s.ctx.nsqlookupd.opts.InactiveProducerTimeout, 0)
nodes := make([]*node, len(producers))
topicProducersMap := make(map[string]Producers)
for i, p := range producers {
topics := s.ctx.nsqlookupd.DB.LookupRegistrations(p.peerInfo.id).Filter("topic", "*", "").Keys()

// for each topic find the producer that matches this peer
// to add tombstone information
tombstones := make([]bool, len(topics))
for j, t := range topics {
topicProducers := s.ctx.nsqlookupd.DB.FindProducers("topic", t, "")
if _, exists := topicProducersMap[t]; !exists {
topicProducersMap[t] = s.ctx.nsqlookupd.DB.FindProducers("topic", t, "")
}

topicProducers := topicProducersMap[t]
for _, tp := range topicProducers {
if tp.peerInfo == p.peerInfo {
tombstones[j] = tp.IsTombstoned(s.ctx.nsqlookupd.opts.TombstoneLifetime)
break
}
}
}
Expand Down
8 changes: 5 additions & 3 deletions nsqlookupd/registration_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,19 +186,21 @@ func (r *RegistrationDB) FindProducers(category string, key string, subkey strin
return ProducerMap2Slice(r.registrationMap[k])
}

results := make(map[string]*Producer)
results := make(map[string]struct{})
var retProducers Producers
for k, producers := range r.registrationMap {
if !k.IsMatch(category, key, subkey) {
continue
}
for _, producer := range producers {
_, found := results[producer.peerInfo.id]
if found == false {
results[producer.peerInfo.id] = producer
results[producer.peerInfo.id] = struct{}{}
retProducers = append(retProducers, producer)
}
}
}
return ProducerMap2Slice(results)
return retProducers
}

func (r *RegistrationDB) LookupRegistrations(id string) Registrations {
Expand Down

0 comments on commit 875829f

Please sign in to comment.