diff --git a/nsqlookupd/registration_db.go b/nsqlookupd/registration_db.go index 4326748e9..7b6bd32e9 100644 --- a/nsqlookupd/registration_db.go +++ b/nsqlookupd/registration_db.go @@ -9,7 +9,7 @@ import ( type RegistrationDB struct { sync.RWMutex - registrationMap map[Registration]Producers + registrationMap map[Registration]ProducerMap } type Registration struct { @@ -37,6 +37,7 @@ type Producer struct { } type Producers []*Producer +type ProducerMap map[string]*Producer func (p *Producer) String() string { return fmt.Sprintf("%s [%d, %d]", p.peerInfo.BroadcastAddress, p.peerInfo.TCPPort, p.peerInfo.HTTPPort) @@ -53,7 +54,7 @@ func (p *Producer) IsTombstoned(lifetime time.Duration) bool { func NewRegistrationDB() *RegistrationDB { return &RegistrationDB{ - registrationMap: make(map[Registration]Producers), + registrationMap: make(map[Registration]ProducerMap), } } @@ -63,7 +64,7 @@ func (r *RegistrationDB) AddRegistration(k Registration) { defer r.Unlock() _, ok := r.registrationMap[k] if !ok { - r.registrationMap[k] = Producers{} + r.registrationMap[k] = make(map[string]*Producer) } } @@ -71,16 +72,14 @@ func (r *RegistrationDB) AddRegistration(k Registration) { func (r *RegistrationDB) AddProducer(k Registration, p *Producer) bool { r.Lock() defer r.Unlock() - producers := r.registrationMap[k] - found := false - for _, producer := range producers { - if producer.peerInfo.id == p.peerInfo.id { - found = true - break - } + _, ok := r.registrationMap[k] + if !ok { + r.registrationMap[k] = make(map[string]*Producer) } + producers := r.registrationMap[k] + _, found := producers[p.peerInfo.id] if found == false { - r.registrationMap[k] = append(producers, p) + producers[p.peerInfo.id] = p } return !found } @@ -94,17 +93,13 @@ func (r *RegistrationDB) RemoveProducer(k Registration, id string) (bool, int) { return false, 0 } removed := false - cleaned := Producers{} - for _, producer := range producers { - if producer.peerInfo.id != id { - cleaned = append(cleaned, producer) - } else { - removed = true - } + if _, exists := producers[id]; exists { + removed = true } + // Note: this leaves keys in the DB even if they have empty lists - r.registrationMap[k] = cleaned - return removed, len(cleaned) + delete(producers, id) + return removed, len(producers) } // remove a Registration and all it's producers @@ -143,27 +138,22 @@ func (r *RegistrationDB) FindProducers(category string, key string, subkey strin defer r.RUnlock() if !r.needFilter(key, subkey) { k := Registration{category, key, subkey} - return r.registrationMap[k] + return ProducerMap2Slice(r.registrationMap[k]) } - results := Producers{} + results := make(map[string]*Producer) for k, producers := range r.registrationMap { if !k.IsMatch(category, key, subkey) { continue } for _, producer := range producers { - found := false - for _, p := range results { - if producer.peerInfo.id == p.peerInfo.id { - found = true - } - } + _, found := results[producer.peerInfo.id] if found == false { - results = append(results, producer) + results[producer.peerInfo.id] = producer } } } - return results + return ProducerMap2Slice(results) } func (r *RegistrationDB) LookupRegistrations(id string) Registrations { @@ -171,11 +161,8 @@ func (r *RegistrationDB) LookupRegistrations(id string) Registrations { defer r.RUnlock() results := Registrations{} for k, producers := range r.registrationMap { - for _, p := range producers { - if p.peerInfo.id == id { - results = append(results, k) - break - } + if _, exists := producers[id]; exists { + results = append(results, k) } } return results @@ -240,3 +227,12 @@ func (pp Producers) PeerInfo() []*PeerInfo { } return results } + +func ProducerMap2Slice(pm ProducerMap) Producers { + var producers Producers + for _, producer := range pm { + producers = append(producers, producer) + } + + return producers +} diff --git a/nsqlookupd/registration_db_test.go b/nsqlookupd/registration_db_test.go index b963d1624..6a861aedf 100644 --- a/nsqlookupd/registration_db_test.go +++ b/nsqlookupd/registration_db_test.go @@ -1,6 +1,8 @@ package nsqlookupd import ( + "math/rand" + "strconv" "testing" "time" @@ -96,3 +98,122 @@ func TestRegistrationDB(t *testing.T) { k = db.FindRegistrations("c", "*", "*").Keys() test.Equal(t, 0, len(k)) } + + +func fillRegDB(registrations int, producers int) *RegistrationDB { + regDB := NewRegistrationDB() + for i := 0; i < registrations; i++ { + regT := Registration{"topic", "t" + strconv.Itoa(i), ""} + regCa := Registration{"channel", "t" + strconv.Itoa(i), "ca" + strconv.Itoa(i)} + regCb := Registration{"channel", "t" + strconv.Itoa(i), "cb" + strconv.Itoa(i)} + for j := 0; j < producers; j++ { + p := Producer{ + peerInfo: &PeerInfo{ + id: "p" + strconv.Itoa(j), + }, + } + regDB.AddProducer(regT, &p) + regDB.AddProducer(regCa, &p) + regDB.AddProducer(regCb, &p) + } + } + return regDB +} + +func benchmarkLookupRegistrations(b *testing.B, registrations int, producers int) { + regDB := fillRegDB(registrations, producers) + b.ResetTimer() + for i := 0; i < b.N; i++ { + j := strconv.Itoa(rand.Intn(producers)) + _ = regDB.LookupRegistrations("p" + j) + } +} + +func benchmarkRegister(b *testing.B, registrations int, producers int) { + for i := 0; i < b.N; i++ { + _ = fillRegDB(registrations, producers) + } +} + +func benchmarkDoLookup(b *testing.B, registrations int, producers int) { + regDB := fillRegDB(registrations, producers) + b.ResetTimer() + for i := 0; i < b.N; i++ { + topic := "t" + strconv.Itoa(rand.Intn(registrations)) + _ = regDB.FindRegistrations("topic", topic, "") + _ = regDB.FindRegistrations("channel", topic, "*").SubKeys() + _ = regDB.FindProducers("topic", topic, "") + } +} + +func BenchmarkLookupRegistrations8x8(b *testing.B) { + benchmarkLookupRegistrations(b, 8, 8) +} + +func BenchmarkLookupRegistrations8x64(b *testing.B) { + benchmarkLookupRegistrations(b, 8, 64) +} + +func BenchmarkLookupRegistrations64x64(b *testing.B) { + benchmarkLookupRegistrations(b, 64, 64) +} + +func BenchmarkLookupRegistrations64x512(b *testing.B) { + benchmarkLookupRegistrations(b, 64, 512) +} + +func BenchmarkLookupRegistrations512x512(b *testing.B) { + benchmarkLookupRegistrations(b, 512, 512) +} + +func BenchmarkLookupRegistrations512x2048(b *testing.B) { + benchmarkLookupRegistrations(b, 512, 2048) +} + +func BenchmarkRegister8x8(b *testing.B) { + benchmarkRegister(b, 8, 8) +} + +func BenchmarkRegister8x64(b *testing.B) { + benchmarkRegister(b, 8, 64) +} + +func BenchmarkRegister64x64(b *testing.B) { + benchmarkRegister(b, 64, 64) +} + +func BenchmarkRegister64x512(b *testing.B) { + benchmarkRegister(b, 64, 512) +} + +func BenchmarkRegister512x512(b *testing.B) { + benchmarkRegister(b, 512, 512) +} + +func BenchmarkRegister512x2048(b *testing.B) { + benchmarkRegister(b, 512, 2048) +} + +func BenchmarkDoLookup8x8(b *testing.B) { + benchmarkDoLookup(b, 8, 8) +} + +func BenchmarkDoLookup8x64(b *testing.B) { + benchmarkDoLookup(b, 8, 64) +} + +func BenchmarkDoLookup64x64(b *testing.B) { + benchmarkDoLookup(b, 64, 64) +} + +func BenchmarkDoLookup64x512(b *testing.B) { + benchmarkDoLookup(b, 64, 512) +} + +func BenchmarkDoLookup512x512(b *testing.B) { + benchmarkDoLookup(b, 512, 512) +} + +func BenchmarkDoLookup512x2048(b *testing.B) { + benchmarkDoLookup(b, 512, 2048) +}