diff --git a/common/set/bench_test.go b/common/set/bench_test.go new file mode 100644 index 00000000..e15aa034 --- /dev/null +++ b/common/set/bench_test.go @@ -0,0 +1,83 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package set + +import ( + "fmt" + "math/rand" + "testing" + + "github.com/google/uuid" +) + +func BenchmarkSetInsert(b *testing.B) { + + for n := 1; n < 1048576; n *= 4 { + + var keys []string + for i := 0; i < n; i++ { + keys = append(keys, uuid.New().String()) + } + + for _, set := range []string{"SliceSet", "SortedSet", "MapSet"} { + + s := newTestSet(set, n) + + b.Run(fmt.Sprintf("%s/%d", set, n), func(b *testing.B) { + + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + s.Insert(keys[rand.Intn(n)]) + } + }) + } + } +} + +func BenchmarkSetContains(b *testing.B) { + + for n := 1; n < 65536; n *= 2 { + + var keys []string + for i := 0; i < n; i++ { + keys = append(keys, uuid.New().String()) + } + + for _, set := range []string{"SliceSet", "SortedSet", "MapSet"} { + + s := newTestSet(set, n) + + for i := 0; i < n/2; i++ { + s.Insert(keys[rand.Intn(n)]) + } + + b.Run(fmt.Sprintf("%s/%d", set, n), func(b *testing.B) { + + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + s.Contains(keys[rand.Intn(n)]) + } + }) + } + } +} diff --git a/common/set/mapset.go b/common/set/mapset.go new file mode 100644 index 00000000..a834bcca --- /dev/null +++ b/common/set/mapset.go @@ -0,0 +1,124 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package set + +type mapset struct { + m map[string]struct{} +} + +// NewMapSet initializes a map-based set +func NewMapSet(cap int) Set { + + return &mapset{ + m: make(map[string]struct{}), + } +} + +// Clear clears the set +func (t *mapset) Clear() { + t.m = make(map[string]struct{}) +} + +// Empty checks if the set is empty +func (t *mapset) Empty() bool { + return len(t.m) == 0 +} + +// Count returns the size of the set +func (t *mapset) Count() int { + return len(t.m) +} + +// Insert adds a key to the set +func (t *mapset) Insert(key string) { + t.m[key] = struct{}{} +} + +// Contains checks if the key exists in the set +func (t *mapset) Contains(key string) bool { + _, ok := t.m[key] + return ok +} + +// Remove removes the key from the set +func (t *mapset) Remove(key string) { + delete(t.m, key) +} + +// Keys returns the set of keys +func (t *mapset) Keys() []string { + + m := make([]string, 0, len(t.m)) + + for k := range t.m { + m = append(m, k) + } + + return m +} + +// Equals compares the set against another set +func (t *mapset) Equals(s Set) bool { + + if s.Count() != t.Count() { + return false + } + + for _, k := range s.Keys() { + if !t.Contains(k) { + return false + } + } + + return true +} + +// Subset checks if this set is a subset of another +func (t *mapset) Subset(s Set) bool { + + if s.Count() < t.Count() { + return false + } + + for _, k := range t.Keys() { + if !s.Contains(k) { + return false + } + } + + return true +} + +// Superset checks if this set is a superset of another +func (t *mapset) Superset(s Set) bool { + + if s.Count() > t.Count() { + return false + } + + for _, k := range s.Keys() { + if !t.Contains(k) { + return false + } + } + + return true +} diff --git a/common/set/set.go b/common/set/set.go new file mode 100644 index 00000000..76f2e7e7 --- /dev/null +++ b/common/set/set.go @@ -0,0 +1,66 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package set + +// Set defines the interface for a set +type Set interface { + // Clear clears the set + Clear() + // Empty checks if the set is empty + Empty() bool + // Count returns the size of the set + Count() int + // Insert adds a key to the set + Insert(key string) + // Contains checks if the key exists in the set + Contains(key string) bool + // Remove removes the key from the set + Remove(key string) + // Keys returns the set of keys + Keys() []string + // Equals compares the set against another set + Equals(s Set) bool + // Subset checks if this set is a subset of another + Subset(s Set) bool + // Superset checks if this set is a superset of another + Superset(s Set) bool +} + +// use a sliceset if 'cap' is greater than given constant +const sliceSetMax = 16 + +// New initializes a new set, picking the appropriate underlying +// implementation based on the specified expected capacity +func New(cap int) Set { + + // pick the underlying set implementation based on the specified expected + // capacity; the thresholds are determined based on benchmarks. + switch { + case cap == 0: + fallthrough + + case cap > sliceSetMax: + return NewMapSet(cap) + + default: + return NewSliceSet(cap) + } +} diff --git a/common/set/set_test.go b/common/set/set_test.go new file mode 100644 index 00000000..14102bfd --- /dev/null +++ b/common/set/set_test.go @@ -0,0 +1,109 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package set + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func newTestSet(setType string, cap int) (s Set) { + + switch setType { + case "SliceSet": + s = NewSliceSet(0) + case "SortedSet": + s = NewSortedSet(0) + case "MapSet": + s = NewMapSet(0) + } + + return s +} + +func TestSet(t *testing.T) { + + testCases := []struct { + set0, set1 string // set type + }{ + {"SliceSet", "SliceSet"}, + {"SliceSet", "SortedSet"}, + {"SliceSet", "MapSet"}, + {"SortedSet", "SliceSet"}, + {"SortedSet", "SortedSet"}, + {"SortedSet", "MapSet"}, + {"MapSet", "SliceSet"}, + {"MapSet", "SortedSet"}, + {"MapSet", "MapSet"}, + } + + for _, tc := range testCases { + t.Run(fmt.Sprintf("{%v,%v}", tc.set0, tc.set1), func(t *testing.T) { + + s0 := newTestSet(tc.set0, 0) + s1 := newTestSet(tc.set1, 0) + + assert.False(t, s0.Contains("foo")) + assert.Equal(t, 0, s0.Count()) + + s0.Insert("foo") + assert.Equal(t, 1, s0.Count()) + assert.True(t, s0.Contains("foo")) + + assert.True(t, s1.Subset(s0)) + assert.False(t, s0.Subset(s1)) + assert.False(t, s1.Superset(s0)) + assert.True(t, s0.Superset(s1)) + assert.False(t, s1.Equals(s0)) + assert.False(t, s0.Equals(s1)) + + s1.Insert("foo") + assert.True(t, s1.Subset(s0)) + assert.True(t, s0.Subset(s1)) + assert.True(t, s1.Superset(s0)) + assert.True(t, s0.Superset(s1)) + assert.True(t, s1.Equals(s0)) + assert.True(t, s0.Equals(s1)) + + s1.Insert("bar") + assert.Equal(t, 2, s1.Count()) + assert.False(t, s1.Subset(s0)) + assert.True(t, s0.Subset(s1)) + assert.True(t, s1.Superset(s0)) + assert.False(t, s0.Superset(s1)) + assert.False(t, s1.Equals(s0)) + assert.False(t, s0.Equals(s1)) + + s0.Insert("bar") + assert.True(t, s1.Subset(s0)) + assert.True(t, s0.Subset(s1)) + assert.True(t, s1.Superset(s0)) + assert.True(t, s0.Superset(s1)) + assert.True(t, s1.Equals(s0)) + assert.True(t, s0.Equals(s1)) + + s0.Clear() + assert.Equal(t, 0, s0.Count()) + }) + } +} diff --git a/common/set/sliceset.go b/common/set/sliceset.go new file mode 100644 index 00000000..2ff1b0a5 --- /dev/null +++ b/common/set/sliceset.go @@ -0,0 +1,153 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package set + +type sliceset struct { + s []string +} + +// NewSliceSet initializes a slice-based set +func NewSliceSet(cap int) Set { + + return &sliceset{ + s: make([]string, 0), + } +} + +// Clear clears the set +func (t *sliceset) Clear() { + t.s = make([]string, 0) +} + +// Empty checks if the set is empty +func (t *sliceset) Empty() bool { + return len(t.s) == 0 +} + +// Count returns the size of the set +func (t *sliceset) Count() int { + return len(t.s) +} + +// Insert adds a key to the set +func (t *sliceset) Insert(key string) { + + for _, k := range t.s { + if k == key { + return + } + } + + t.s = append(t.s, key) +} + +// Contains checks if the key exists in the set +func (t *sliceset) Contains(key string) bool { + + for _, k := range t.s { + if k == key { + return true + } + } + + return false +} + +// Remove removes the key from the set +func (t *sliceset) Remove(key string) { + + for i, k := range t.s { + + if k == key { + + last := len(t.s) - 1 + + if last >= 0 { + t.s[i] = t.s[last] + } + + t.s = t.s[:last] + return + } + } + + return +} + +// Keys returns the set of keys +func (t *sliceset) Keys() []string { + + m := make([]string, 0, len(t.s)) + + for _, k := range t.s { + m = append(m, k) + } + + return m +} + +// Equals compares the set against another set +func (t *sliceset) Equals(s Set) bool { + + if s.Count() != t.Count() { + return false + } + + for _, k := range s.Keys() { + if !t.Contains(k) { + return false + } + } + + return true +} + +// Subset checks if this set is a subset of another +func (t *sliceset) Subset(s Set) bool { + + if s.Count() < t.Count() { + return false + } + + for _, k := range t.Keys() { + if !s.Contains(k) { + return false + } + } + + return true +} + +// Superset checks if this set is a superset of another +func (t *sliceset) Superset(s Set) bool { + + if s.Count() > t.Count() { + return false + } + + for _, k := range s.Keys() { + if !t.Contains(k) { + return false + } + } + + return true +} diff --git a/common/set/sortedset.go b/common/set/sortedset.go new file mode 100644 index 00000000..02d1f238 --- /dev/null +++ b/common/set/sortedset.go @@ -0,0 +1,147 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package set + +import ( + "sort" +) + +type sortedset struct { + s []string +} + +// NewSortedSet initializes a sorted-slice based set +func NewSortedSet(cap int) Set { + + return &sortedset{ + s: make([]string, 0), + } +} + +// Clear clears the set +func (t *sortedset) Clear() { + t.s = make([]string, 0) +} + +// Empty checks if the set is empty +func (t *sortedset) Empty() bool { + return len(t.s) == 0 +} + +// Count returns the size of the set +func (t *sortedset) Count() int { + return len(t.s) +} + +// Insert adds a key to the set +func (t *sortedset) Insert(key string) { + + i := sort.SearchStrings(t.s, key) + + if i < len(t.s) && t.s[i] == key { + return // already contains key + } + + // make space and insert at index 'i' + t.s = append(t.s, "") + copy(t.s[i+1:], t.s[i:]) + t.s[i] = key +} + +// Contains checks if the key exists in the set +func (t *sortedset) Contains(key string) bool { + + i := sort.SearchStrings(t.s, key) + return i < len(t.s) && t.s[i] == key +} + +// Remove removes the key from the set +func (t *sortedset) Remove(key string) { + + i := sort.SearchStrings(t.s, key) + + if i == len(t.s) && t.s[i] != key { + return // does not contain key + } + + copy(t.s[i:], t.s[i+1:]) + t.s = t.s[:len(t.s)-1] +} + +// Keys returns the set of keys +func (t *sortedset) Keys() []string { + + m := make([]string, 0, len(t.s)) + + for _, k := range t.s { + m = append(m, k) + } + + return m +} + +// Equals compares the set against another set +func (t *sortedset) Equals(s Set) bool { + + if s.Count() != t.Count() { + return false + } + + for _, k := range s.Keys() { + if !t.Contains(k) { + return false + } + } + + return true +} + +// Subset checks if this set is a subset of another +func (t *sortedset) Subset(s Set) bool { + + if s.Count() < t.Count() { + return false + } + + for _, k := range t.Keys() { + if !s.Contains(k) { + return false + } + } + + return true +} + +// Superset checks if this set is a superset of another +func (t *sortedset) Superset(s Set) bool { + + if s.Count() > t.Count() { + return false + } + + for _, k := range s.Keys() { + if !t.Contains(k) { + return false + } + } + + return true +} diff --git a/glide.lock b/glide.lock index 6a431ca3..90e9abee 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 95b094ac76b3329e6ad38bb6314377db13aa5d0a7f29f4a74b54f00502e17ad4 -updated: 2017-04-17T10:59:50.30170527-07:00 +hash: 870ba36290434c4a2426ce5a9925be2be2ea38616ae8ed8f20979e936ae2344f +updated: 2017-05-05T14:07:34.743467256-07:00 imports: - name: github.com/apache/thrift version: 2d65c2365f19f637bc732222e71d78727bf0b709 @@ -76,8 +76,10 @@ imports: - internal/streams - name: github.com/golang/snappy version: 553a641470496b2327abcac10b36396bd98e45c9 +- name: github.com/google/uuid + version: 064e2069ce9c359c118179501254f67d7d37ba24 - name: github.com/gorilla/websocket - version: a91eba7f97777409bc2c443f5534d41dd20c5720 + version: 3ab3a8b8831546bd18fd182c20687ca853b2bb13 - name: github.com/hailocab/go-hostpool version: e80d13ce29ede4452c43dea11e79b9bc8a15b478 - name: github.com/jmespath/go-jmespath @@ -168,7 +170,7 @@ imports: - trand - typed - name: golang.org/x/net - version: 5602c733f70afc6dcec6766be0d5034d4c4f14de + version: d212a1ef2de2f5d441c327b8f26cf3ea3ea9f265 subpackages: - context - name: golang.org/x/sys diff --git a/glide.yaml b/glide.yaml index 009f5df9..70d1e6cd 100644 --- a/glide.yaml +++ b/glide.yaml @@ -23,7 +23,6 @@ import: - mock - require - suite - - package: github.com/tecbot/gorocksdb version: 17991d3138a879b166adebf86f7c84da3c1517a7 - package: github.com/cockroachdb/c-jemalloc @@ -72,3 +71,5 @@ import: - package: github.com/Shopify/sarama version: ^1.11.0 repo: http://github.com/Shopify/sarama +- package: github.com/google/uuid + version: ^0.2.0 diff --git a/services/outputhost/ackmanager.go b/services/outputhost/ackmanager.go index d4c3e27a..f0aef637 100644 --- a/services/outputhost/ackmanager.go +++ b/services/outputhost/ackmanager.go @@ -123,7 +123,14 @@ func (ackMgr *ackManager) getNextAckID(address int64, sequence common.SequenceNu skippedMessages := sequence - expectedReadLevel if skippedMessages < 0 { - ackMgr.logger.Error(`negative discontinuity detected (rollback)`) + ackMgr.logger.WithFields(bark.Fields{ + `address`: address, + `sequence`: sequence, + `readLevel`: ackMgr.readLevel, + `levelOffset`: ackMgr.levelOffset, + `expectedReadLevel`: expectedReadLevel, + `skippedMessages`: skippedMessages, + }).Error(`negative discontinuity detected (rollback)`) // Don't update gauge, since negative numbers aren't supported for M3 gauges } else { // update gauge here to say we skipped messages (potentially due to retention?) @@ -185,20 +192,22 @@ func (ackMgr *ackManager) start() { go ackMgr.manageAckLevel() } -func (ackMgr *ackManager) getCurrentAckLevelOffset() (addr int64) { +// getCurrentReadLevel returns the current read-level address and seqnum. this is called +// by extcache when it connects to a new replica, when one stream is disconnected. +func (ackMgr *ackManager) getCurrentReadLevel() (addr int64, seqNo common.SequenceNumber) { + ackMgr.lk.RLock() - if addrs, ok := ackMgr.addrs[ackMgr.ackLevel]; ok { - addr = int64(addrs.addr) + defer ackMgr.lk.RUnlock() + + // the 'readLevel' may not exist in the 'addrs' map if this instance + // of ackMgr has not seen a message yet (ie, getNextAckID has not been + // called), in which case we would return '0' addr. + if msg, ok := ackMgr.addrs[ackMgr.readLevel]; ok { + addr = int64(msg.addr) } - ackMgr.lk.RUnlock() - return -} + seqNo = ackMgr.levelOffset + ackMgr.readLevel -func (ackMgr *ackManager) getCurrentAckLevelSeqNo() (seqNo common.SequenceNumber) { - ackMgr.lk.RLock() - seqNo = ackMgr.levelOffset + ackMgr.ackLevel - ackMgr.lk.RUnlock() return } diff --git a/services/outputhost/extcache.go b/services/outputhost/extcache.go index d4624968..90040ba1 100644 --- a/services/outputhost/extcache.go +++ b/services/outputhost/extcache.go @@ -429,15 +429,10 @@ func (extCache *extentCache) manageExtentCache() { err = nil } else { // this means a replica stream was closed. try another replica - extCache.logger.Info(`trying another replica`) - // first make sure the ackMgr updates its current ack level - extCache.ackMgr.updateAckLevel() - // TODO: Fix small race between the offset and seqNo calls + startAddr, startSequence := extCache.ackMgr.getCurrentReadLevel() + extCache.logger.WithFields(bark.Fields{`addr`: startAddr, `seqnum`: startSequence}).Info(`extcache: trying another replica`) extCache.connection, extCache.pickedIndex, err = - extCache.loadReplicaStream( - extCache.ackMgr.getCurrentAckLevelOffset(), - extCache.ackMgr.getCurrentAckLevelSeqNo(), - (extCache.pickedIndex+1)%len(extCache.storeUUIDs)) + extCache.loadReplicaStream(startAddr, startSequence, (extCache.pickedIndex+1)%len(extCache.storeUUIDs)) } extCache.cacheMutex.Unlock() if err != nil { diff --git a/services/outputhost/outputhost_test.go b/services/outputhost/outputhost_test.go index ea6934e5..30b2f413 100644 --- a/services/outputhost/outputhost_test.go +++ b/services/outputhost/outputhost_test.go @@ -34,6 +34,7 @@ import ( "github.com/uber/cherami-server/common" "github.com/uber/cherami-server/common/configure" dconfig "github.com/uber/cherami-server/common/dconfigclient" + "github.com/uber/cherami-server/common/set" mockcommon "github.com/uber/cherami-server/test/mocks/common" mockcontroller "github.com/uber/cherami-server/test/mocks/controllerhost" mockmeta "github.com/uber/cherami-server/test/mocks/metadata" @@ -1001,3 +1002,142 @@ func (s *OutputHostSuite) TestOutputAckMgrReset() { // 10. make sure the readlevels are not the same assert.NotEqual(s.T(), readLevel, newReadLevel, "read levels should not be the same") } + +func (s *OutputHostSuite) TestOutputHostReplicaRollover() { + + count := 50 + + outputHost, _ := NewOutputHost("outputhost-test", s.mockService, s.mockMeta, nil, nil, nil) + httpRequest := utilGetHTTPRequestWithPath("foo") + + destUUID := uuid.New() + destDesc := shared.NewDestinationDescription() + destDesc.Path = common.StringPtr("/foo/bar") + destDesc.DestinationUUID = common.StringPtr(destUUID) + destDesc.Status = common.InternalDestinationStatusPtr(shared.DestinationStatus_ENABLED) + s.mockMeta.On("ReadDestination", mock.Anything, mock.Anything).Return(destDesc, nil).Once() + s.mockMeta.On("ReadExtentStats", mock.Anything, mock.Anything).Return(nil, fmt.Errorf(`foo`)) + + cgDesc := shared.NewConsumerGroupDescription() + cgDesc.ConsumerGroupUUID = common.StringPtr(uuid.New()) + cgDesc.DestinationUUID = common.StringPtr(destUUID) + s.mockMeta.On("ReadConsumerGroup", mock.Anything, mock.Anything).Return(cgDesc, nil).Twice() + + cgExt := shared.NewConsumerGroupExtent() + cgExt.ExtentUUID = common.StringPtr(uuid.New()) + cgExt.StoreUUIDs = []string{"mock"} + + cgRes := &shared.ReadConsumerGroupExtentsResult_{} + cgRes.Extents = append(cgRes.Extents, cgExt) + s.mockMeta.On("ReadConsumerGroupExtents", mock.Anything, mock.Anything).Return(cgRes, nil).Once() + s.mockRead.On("Write", mock.Anything).Return(nil) + + writeDoneCh := make(chan struct{}) + var msgsRecv = set.New(count) + + s.mockCons.On("Write", mock.Anything).Return(nil).Run(func(args mock.Arguments) { + + ohc := args.Get(0).(*cherami.OutputHostCommand) + + if ohc.GetType() == cherami.OutputHostCommandType_MESSAGE { + + msg := ohc.GetMessage() + id := msg.GetPayload().GetID() + + // ensure we don't see duplicates + s.False(msgsRecv.Contains(id)) + + if msgsRecv.Insert(id); msgsRecv.Count() == count { + close(writeDoneCh) + } + } + + }).Times(count) + + cFlow := cherami.NewControlFlow() + cFlow.Credits = common.Int32Ptr(int32(count)) + + connOpenedCh := make(chan struct{}) + s.mockCons.On("Read").Return(cFlow, nil).Once().Run(func(args mock.Arguments) { close(connOpenedCh) }) + + rmc := store.NewReadMessageContent() + rmc.Type = store.ReadMessageContentTypePtr(store.ReadMessageContentType_MESSAGE) + + var seqnum int64 + + s.mockRead.On("Read").Return(rmc, nil).Run(func(args mock.Arguments) { + seqnum++ + aMsg := store.NewAppendMessage() + aMsg.SequenceNumber = common.Int64Ptr(seqnum) + pMsg := cherami.NewPutMessage() + pMsg.ID = common.StringPtr(strconv.Itoa(int(seqnum))) + pMsg.Data = []byte(fmt.Sprintf("seqnum=%d", seqnum)) + aMsg.Payload = pMsg + rMsg := store.NewReadMessage() + rMsg.Message = aMsg + rMsg.Address = common.Int64Ptr(1234000000 + seqnum) + rmc.Message = rMsg + }).Times(20) + + // simulate error from a replica, that should cause outputhost to resume + // by re-connecting to the replica + s.mockRead.On("Read").Return(0, errors.New("store error")).Once() + + s.mockRead.On("Read").Return(rmc, nil).Run(func(args mock.Arguments) { + seqnum++ + aMsg := store.NewAppendMessage() + aMsg.SequenceNumber = common.Int64Ptr(seqnum) + pMsg := cherami.NewPutMessage() + pMsg.ID = common.StringPtr(strconv.Itoa(int(seqnum))) + pMsg.Data = []byte(fmt.Sprintf("seqnum=%d", seqnum)) + aMsg.Payload = pMsg + rMsg := store.NewReadMessage() + rMsg.Message = aMsg + rMsg.Address = common.Int64Ptr(1234000000 + seqnum) + rmc.Message = rMsg + }).Times(count - 20) + + rmcSeal := store.NewReadMessageContent() + rmcSeal.Type = store.ReadMessageContentTypePtr(store.ReadMessageContentType_SEALED) + rmcSeal.Sealed = store.NewExtentSealedError() + s.mockRead.On("Read").Return(rmcSeal, nil).Once() + + streamDoneCh := make(chan struct{}) + go func() { + outputHost.OpenConsumerStreamHandler(s.mockHTTPResponse, httpRequest) + close(streamDoneCh) + }() + + // close the read stream + creditUnblockCh := make(chan struct{}) + s.mockRead.On("Read").Return(nil, io.EOF) + s.mockCons.On("Read").Return(nil, io.EOF).Run(func(args mock.Arguments) { <-writeDoneCh; <-creditUnblockCh }) + + <-connOpenedCh // wait for the consConnection to open + + // look up cgcache and the underlying client connnection + outputHost.cgMutex.RLock() + cgCache, ok := outputHost.cgCache[cgDesc.GetConsumerGroupUUID()] + outputHost.cgMutex.RUnlock() + s.True(ok, "cannot find cgcache entry") + + var nConns = 0 + var conn *consConnection + cgCache.extMutex.RLock() + for _, conn = range cgCache.connections { + break + } + nConns = len(cgCache.connections) + cgCache.extMutex.RUnlock() + s.Equal(1, nConns, "wrong number of consumer connections") + s.NotNil(conn, "failed to find consConnection within cgcache") + + creditUnblockCh <- struct{}{} // now unblock the readCreditsPump on the consconnection + <-streamDoneCh + + s.mockHTTPResponse.AssertNotCalled(s.T(), "WriteHeader", mock.Anything) + s.Equal(int64(count), conn.sentMsgs, "wrong sentMsgs count") + s.Equal(int64(0), conn.reSentMsgs, "wrong reSentMsgs count") + s.Equal(conn.sentMsgs, conn.sentToMsgCache, "sentMsgs != sentToMsgCache") + outputHost.Shutdown() +}