-
Notifications
You must be signed in to change notification settings - Fork 104
Conversation
- make cluster.ClusterManager an interface - rename ClusterManager struct to MemberlistClusterManager - add a SingleNodeManager that implements the clusterManager interface - when initializing the cluster, if Mode=multi use MemberlistClusterManager otherwise use SingleNodeManager
- use the kafkaMdm input plugin to consume from kafka. This consumes from explicit partitions, rather then using the high level sarama.cluster consumer groups - add metrics to track how things are running
4ee73db
to
4d9604a
Compare
cluster/manager.go
Outdated
node.Updated = time.Now() | ||
m.node = node | ||
nodeReady.Set(state == NodeReady) | ||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does that return
have any effect?
cluster/manager.go
Outdated
// lower values == higher priority | ||
func (m *SingleNodeManager) SetPriority(prio int) { | ||
m.Lock() | ||
defer m.RUnlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is first acquiring a write lock and then releasing a read lock. I don't know what happens in this case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nothing good. Will fix it up
cluster/manager.go
Outdated
node := m.node | ||
node.Priority = prio | ||
node.Updated = time.Now() | ||
m.node = node |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not important, but assuming the write lock is held you might as well directly
m.node.Priority = prio
m.Updated = time.Now()
why do we want this? is this because we're seeing a bug with saramacluster? |
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
that's going to end up being a complicated rebase for https://github.com/raintank/metrictank/pull/555/files, if you merge now i'll directly do that rebase now |
@@ -46,26 +47,62 @@ var ( | |||
unmarshalErrMergeRemoteState = stats.NewCounter32("cluster.decode_err.merge_remote_state") | |||
) | |||
|
|||
type ClusterManager struct { | |||
type ClusterManager interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this goes against go's "no stuttering" naming convention. can just call this Manager since it's in package cluster
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is a pacakge var called "Manager" already.
cluster/manager.go
Outdated
Start() | ||
} | ||
|
||
type MemberlistClusterManager struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
similar, no stuttering. so MemberlistManager
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this i can change
cluster/cluster.go
Outdated
mgr.cfg.Delegate = mgr | ||
h := sha256.New() | ||
h.Write([]byte(ClusterName)) | ||
mgr.cfg.SecretKey = h.Sum(nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doesn't this stuff better go into a MemberlistManager constructor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that would be better.
cluster/cluster_test.go
Outdated
@@ -7,6 +7,7 @@ import ( | |||
) | |||
|
|||
func TestPeersForQuery(t *testing.T) { | |||
Mode = ModeMulti |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a few lines below this it says Convey("when cluster in single mode", t, func() {
whereas here we're testing multi mode with 1 node.
maybe we should make a separate test case for an actual single mode
use kafkamdm Input plugin to consume from kafka instead of using the saramaCluster consumergroups