This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Peer discovery #459

Merged
merged 10 commits into master from peerDiscovery
Jan 17, 2017

Conversation

Member

@woodsaj woodsaj commented Jan 10, 2017

closes #436

Uses hashicorp/memberlist, which is a library that manages cluster membership and member failure detection using a gossip-based protocol.

To enable use of memberlist, a "ClusterManager" struct has been created. This struct handles coordination of changes to the state of every node.

Nodes are now passed throughout the codebase by value (copies) which eliminates
the need for complex locking. To prevent races when reading/updating the localNode
properties, all changes are co-ordinated through the ClusterManager which provides
the required locking.
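A minimal sketch of this design (field and method names here are illustrative, not necessarily the ones in this PR): every read hands out a copy of a Node, and all mutations of the local node go through the manager, which owns the lock.

```go
package cluster

import "sync"

// Node is passed around by value; a copy can never race with the manager's state.
type Node struct {
	Name    string
	Primary bool
}

// ClusterManager coordinates all reads and writes of cluster state.
type ClusterManager struct {
	sync.RWMutex
	peers map[string]Node // all known members, including the local node
	self  string          // name of the local node
}

// ThisNode returns a copy of the local node, so callers never share mutable state.
func (c *ClusterManager) ThisNode() Node {
	c.RLock()
	n := c.peers[c.self]
	c.RUnlock()
	return n
}

// SetPrimary changes the local node's primary flag; the lock lives here, not in callers.
func (c *ClusterManager) SetPrimary(p bool) {
	c.Lock()
	n := c.peers[c.self]
	n.Primary = p
	c.peers[c.self] = n
	c.Unlock()
}
```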

@woodsaj woodsaj force-pushed the peerDiscovery branch 2 times, most recently from a69c90c to e55859b Compare January 10, 2017 16:56
Contributor

@Dieterbe Dieterbe left a comment

still working on going through this, but wanted to ask: do you have any tips on how to play with this and verify things? should we use docker/docker-cluster? and then just start more metrictanks (using which commands?) and then just follow the info-level logs?

BTW in metrictank.go :

	// stop polling peer nodes
	cluster.Stop()

this will have to be updated to "announce intent to leave cluster" or something.

t.Errorf("testcase %d, request %d:\nexpected: %v\n got: %v", i, r, exp.DebugString(), out[r].DebugString())
}
}
}
}
}

func compareReqEqual(a, b models.Req) bool {
Contributor

@Dieterbe Dieterbe Jan 11, 2017

can't we make this Req.Equal(b models.Req), similar to e.g. https://golang.org/pkg/time/#Time.Equal?
btw, is the goal to compare all attributes except for Node? if so, that should be explained in the function doc.

Member Author

I am hesitant to add an Equal method to Req as it is only needed for this one test.
The goal is just to compare alignCase.reqs with alignCase.outReqs. This was done with ==, but now that Req.Node is no longer a pointer you can't do that. If these tests used github.com/smartystreets/goconvey/convey we would just use ShouldResemble to do a deep comparison.

Convey("when cluster in single mode", t, func() {
selected := PeersForQuery()
So(selected, ShouldHaveLength, 1)
So(selected[0], ShouldEqual, ThisNode)
So(selected[0], ShouldResemble, Manager.ThisNode())
Contributor

proposal: any time we use ShouldResemble anywhere, we should comment on why ShouldResemble and not ShouldEqual, to make it clear to the reader. because any time I encounter this in any code base, I always wonder that same question.

Member Author

ShouldResemble is just a deep equal. You can't do meaningful equality checks with == on structs that contain pointers (or any at all on structs that contain maps or slices): for pointer fields, == only compares the memory addresses, not the values they point to.
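A small standalone illustration of that point, using hypothetical Req/Node types rather than the actual models.Req:

```go
package main

import (
	"fmt"
	"reflect"
)

type Node struct{ Name string }

type Req struct {
	Target string
	Node   *Node // pointer field: == compares addresses, not contents
}

func main() {
	a := Req{Target: "foo", Node: &Node{Name: "n1"}}
	b := Req{Target: "foo", Node: &Node{Name: "n1"}}

	// == compiles (no map/slice fields) but reports false: the two *Node
	// values are distinct allocations, even though their contents match.
	fmt.Println(a == b) // false

	// a deep comparison (roughly what ShouldResemble does) follows the
	// pointers and compares the pointed-to values.
	fmt.Println(reflect.DeepEqual(a, b)) // true
}
```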

}
addr, err := net.ResolveTCPAddr("tcp", clusterBindAddr)
if err != nil {
log.Fatal(4, "cluster-bind-addres is not a valid TCP address.")
Contributor

let's return the actual error. "not valid" can be misleading and ambiguous.
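e.g. something along these lines (a sketch; the log call just follows the pattern already used in the quoted snippet):

```go
addr, err := net.ResolveTCPAddr("tcp", clusterBindAddr)
if err != nil {
	log.Fatal(4, "cluster-bind-addr %q is not a valid TCP address: %s", clusterBindAddr, err)
}
```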

primary: false,
primaryChange: time.Now(),
stateChange: time.Now(),
func Init(name, version string, started time.Time, scheme string, port int) {
Contributor

@Dieterbe Dieterbe Jan 11, 2017

terms like port, scheme, ListenPort and ListenScheme are getting ambiguous.

it looks like MT will listen on the network to satisfy two subsystems: its api (http), and also cluster/gossip (tcp and udp).
I propose we use terms like apiPort, apiScheme (instead of listen*) in the code next to clusterPort and clusterHost, so that when you look at any scheme/port/addr variable, it's more clear which one it is.

list *memberlist.Memberlist
}

func (c *ClusterManager) SetList(list *memberlist.Memberlist) {
Contributor

only called from within cluster package -> lowercase.

c.RLock()
list := make([]Node, len(c.Peers))
i := 0
for _, n := range c.Peers {
Contributor

can use copy() instead of own loop.

Contributor

my bad. c.Peers is a map..

peer := Node{}
err := json.Unmarshal(node.Meta, &peer)
if err != nil {
panic(err)
Contributor

should we panic? seems like 1 MT instance returning faulty data can take down an entire cluster.
not sure how to properly handle it though; ideally I think we simply wouldn't add it to our peers list, but then our peers list is out of sync with our gossip state. so perhaps we should add it, but mark it as faulty?
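A sketch of the non-panicking variant being discussed; the log call and the unmarshalErrors counter are illustrative assumptions, not code from this PR:

```go
func (c *ClusterManager) NotifyJoin(node *memberlist.Node) {
	peer := Node{}
	if err := json.Unmarshal(node.Meta, &peer); err != nil {
		// don't take the whole process down because one peer broadcast bad metadata
		log.Error(3, "CLU: unable to decode meta data from node %s: %s", node.Name, err)
		unmarshalErrors.Inc() // hypothetical counter, segregated by notify type
		return                // alternatively: store the node but mark it as faulty
	}
	c.Lock()
	c.Peers[peer.Name] = peer
	c.Unlock()
}
```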

// check if we are already going to use one of the
// available nodes and re-use it
reusePeer := false
for _, n := range nodes {
if _, ok := selectedPeers[n]; ok {
if _, ok := selectedPeers[n.Name]; ok {
selected = n
reusePeer = true
Contributor

Couldn't this bit be shortened a little?
If you give the outer for a label and just continue the outer for here, then you don't need the reusePeer flag and the two ifs on lines 134 and 139.
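A self-contained sketch of that labeled-continue shape (names like pickPeers/byPartition are illustrative, not the actual code under review):

```go
package main

import (
	"fmt"
	"math/rand"
)

type Node struct{ Name string }

// pickPeers selects one node per partition, preferring nodes already selected
// for another partition. The labeled continue removes the need for a reusePeer
// flag and the extra ifs.
func pickPeers(byPartition map[int][]Node) map[string]Node {
	selected := make(map[string]Node)
PARTITIONS:
	for _, candidates := range byPartition {
		for _, n := range candidates {
			if _, ok := selected[n.Name]; ok {
				continue PARTITIONS // reuse: nothing more to do for this partition
			}
		}
		if len(candidates) == 0 {
			continue
		}
		n := candidates[rand.Intn(len(candidates))]
		selected[n.Name] = n
	}
	return selected
}

func main() {
	fmt.Println(pickPeers(map[int][]Node{
		1: {{Name: "a"}, {Name: "b"}},
		2: {{Name: "b"}, {Name: "c"}},
	}))
}
```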

peer := Node{}
err := json.Unmarshal(node.Meta, &peer)
if err != nil {
panic(err)
Contributor

should we panic? seems like 1 MT instance sending faulty data can take down an entire cluster.
not sure how to properly handle it though; ideally I think we simply wouldn't update our info, but we should probably also mark it as faulty.

}

// SetPrimary sets the primary status.
// Note: since we set the primary metric here, this should only be called on ThisNode !
Contributor

c.node is always ThisNode, right? So this comment can be removed, as it's now impossible to call SetPrimary on a node that is not thisNode.

list[i] = n
i++
}
c.RUnlock()
Contributor

in general i think it's a good idea to use defer even if in this case it makes effectively no difference. It might make a difference if another person comes in to edit this code and adds a return somewhere in the middle.

Member Author

We avoid using defer due to the performance penalty. golang/go#14939

Contributor

oh wow, i did not know that, that looks like some serious penalty according to that post.
at the end he says that even in 1.8 that's only improved but not completely solved yet :(
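For reference, the two shapes being discussed, as a sketch on a hypothetical struct embedding sync.RWMutex:

```go
// explicit unlock: no defer overhead on hot paths, but every early return
// added later must remember to unlock
func (c *ClusterManager) peersExplicit() []Node {
	c.RLock()
	out := make([]Node, 0, len(c.Peers))
	for _, n := range c.Peers {
		out = append(out, n)
	}
	c.RUnlock()
	return out
}

// deferred unlock: slightly slower (see golang/go#14939 above), but safe
// against returns added anywhere in the function body
func (c *ClusterManager) peersDeferred() []Node {
	c.RLock()
	defer c.RUnlock()
	out := make([]Node, 0, len(c.Peers))
	for _, n := range c.Peers {
		out = append(out, n)
	}
	return out
}
```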

# http/s addresses of other nodes, comma separated. use this if you shard your data and want to query other instances
# the TCP/UDP address to listen on for the gossip protocol.
bind-addr = 0.0.0.0:7946
# TCP addresses of other nodes, comma separated. use this if you shard your data and want to query other instances
Contributor

seems like you can specify host:port peers, but also host peers, which will be polled on the same port as the one we're binding on. let's clarify that.
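A sketch of how that defaulting could work; this is illustrative, not necessarily what the PR does:

```go
package cluster

import (
	"net"
	"strconv"
	"strings"
)

// normalizePeers accepts both "host" and "host:port" entries; bare hosts are
// assumed to gossip on the same port we bind on.
func normalizePeers(peers []string, bindPort int) []string {
	out := make([]string, 0, len(peers))
	for _, p := range peers {
		p = strings.TrimSpace(p)
		if p == "" {
			continue
		}
		if _, _, err := net.SplitHostPort(p); err != nil {
			// no port specified: use the gossip bind port
			p = net.JoinHostPort(p, strconv.Itoa(bindPort))
		}
		out = append(out, p)
	}
	return out
}
```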

// when broadcasting an alive message. It's length is limited to
// the given byte size. This metadata is available in the Node structure.
func (c *ClusterManager) NodeMeta(limit int) []byte {
c.Lock()
Contributor

would a read lock be sufficient here?
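For comparison, a sketch of NodeMeta using only a read lock; it just serializes the local node's metadata, and the error handling here is illustrative:

```go
// NodeMeta is called by memberlist to retrieve metadata about the local node
// when broadcasting an alive message; it reads but never mutates our state.
func (c *ClusterManager) NodeMeta(limit int) []byte {
	c.RLock()
	meta, err := json.Marshal(c.node)
	c.RUnlock()
	if err != nil {
		log.Error(3, "CLU: unable to encode node metadata: %s", err)
		return nil
	}
	if len(meta) > limit {
		log.Error(3, "CLU: node metadata exceeds the %d byte limit", limit)
		return nil
	}
	return meta
}
```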

c.Peers[name] = meta
}
}
c.Unlock()
Contributor

again, I think it would be more appropriate to put defer c.Unlock() right after c.Lock(), because it's very easy for someone to add a return inside one of the enclosed conditions and miss the unlock.

Member Author

We avoid using defer due to the performance penalty. golang/go#14939

Member Author

woodsaj commented Jan 11, 2017

To test you can just use docker/docker-cluster.
This will bring up metrictank0 first (which is already configured with no peers), then the other 3 MT instances. No need to add additional MT instances, just restart/stop metrictank1-3 (don't restart metrictank0, as it won't get re-added to the cluster due to it not having any peers configured).

@woodsaj woodsaj force-pushed the peerDiscovery branch 3 times, most recently from 04b54c0 to 70fe357 Compare January 11, 2017 18:40
@Dieterbe
Contributor

> don't restart metrictank0, as it won't get re-added to the cluster due to it not having any peers configured

shouldn't the other instances be pinging it and rediscovering it, so that mt0 also discovers the other ones?

one thing I'd also like to try is to set MT_CLUSTER_PEERS to just one other instance, which should be enough to get it into the cluster.

For now, I'm running into some issues though, seeing messages like [WARN] memberlist: Got ping for unexpected node 'metrictank3' from=127.0.0.1:7946, [ERR] memberlist: Failed TCP fallback ping: EOF, [DEBUG] memberlist: Failed UDP ping: metrictank2 (timeout reached) and a bunch like Node metrictank1 has left the cluster
does this look familiar?
I just run ./docker/launch.sh docker-cluster with my patch above and also docker-compose logs -f &> logs.txt in the docker-cluster dir and then egrep 'metrictank.*_1.*CLU|memberlist' logs.txt to see all cluster and memberlist messages: https://gist.github.com/Dieterbe/4fe3c0cbad5d9937828a4af28f0e7e07

Member Author

woodsaj commented Jan 12, 2017

The issue you are seeing is due to: hashicorp/memberlist#102

we can work around this by explicitly setting the bind-addr. in the docker-compose yaml we need to add
MT_CLUSTER_BIND_ADDR=<instanceName>:7946
eg, for the metrictank0 instance:

MT_CLUSTER_BIND_ADDR=metrictank0:7946

I'll push a commit for this now.

Member Author

woodsaj commented Jan 12, 2017

> shouldn't the other instances be pinging it and rediscovering it, so that mt0 also discovers the other ones?

once a node has left the cluster it is not retried. It is expected that when the node is online again it will tell 1 or more peers of its presence.
But if you configure mt0 with a list of peers, then it will assume that the cluster is online and try to connect to the peers. If it can't connect to any peers, then it won't start up.
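A sketch of that startup behaviour against the hashicorp/memberlist API; the startCluster wrapper and the error handling are illustrative:

```go
import (
	"fmt"

	"github.com/hashicorp/memberlist"
)

func startCluster(instanceName string, peers []string) (*memberlist.Memberlist, error) {
	cfg := memberlist.DefaultLANConfig()
	cfg.Name = instanceName
	list, err := memberlist.Create(cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create memberlist: %s", err)
	}
	if len(peers) > 0 {
		// Join returns how many of the configured peers we reached; reaching a
		// single live peer is enough, gossip propagates the rest of the membership.
		if n, err := list.Join(peers); err != nil || n == 0 {
			return nil, fmt.Errorf("unable to join cluster via any configured peer: %v", err)
		}
	}
	return list, nil
}
```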

Contributor

Dieterbe commented Jan 12, 2017

I've been playing with it; it seems to work pretty well. nodes can also join the cluster by peering with only 1 instance and having it gossip through, which is neat.
I wonder how we'll end up troubleshooting when peers lose each other, the cluster is incomplete/unhealthy, stuff like that.

the long-awaited log refactor, which will bring properly formatted log messages that can easily be collected and correlated, will surely help, but perhaps some stats would be nice too. I'm thinking:

  • gauge32 of number of nodes in the peer list, perhaps segregated by version and ready state
  • gauge32 of number of partitions handled by this node
  • gauge32 of number of unique partitions seen across all other peers
  • counter32 of events: join, leave, update
  • bool for ready state (similar to primary state)
  • counter32 for json unmarshal errors (segregated by notify type)

in particular, probably not worth tracking (?):

  • how many peers do we know that are primaries
  • actual remote peer instance names (in metrics keys), that would make the keyspace explode a bit too much

I noticed that we also don't have stats yet for errors upon node.Post and node.Get failing or timing out (but that would be for another PR).
However, this does make one think: if node.Post/node.Get is seeing lots of errors or timeouts for a peer...

  1. this is independent from the gossip cluster state, which may have said node as healthy
  2. does not affect the node readystate or PeersForQuery output

Solving 2 would make 1 a non-issue, I think. I also consider 2 out of scope for our first cluster implementation.
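A rough sketch of how such metrics might be registered; the NewGauge32/NewCounter32/NewBool helpers and the metric keys below are assumptions for illustration (the list that was actually added lives in cluster/manager.go, linked further down):

```go
var (
	// number of nodes we currently have in our member list (assumed helper names)
	clusterMembers = stats.NewGauge32("cluster.total.members")
	// number of partitions handled by this node
	clusterSelfPartitions = stats.NewGauge32("cluster.self.partitions")
	// whether this node considers itself ready to handle queries
	clusterSelfReady = stats.NewBool("cluster.self.state.ready")
	// membership events seen via gossip
	clusterEventsJoin   = stats.NewCounter32("cluster.events.join")
	clusterEventsLeave  = stats.NewCounter32("cluster.events.leave")
	clusterEventsUpdate = stats.NewCounter32("cluster.events.update")
	// json decode failures, segregated by notify type
	clusterDecodeErrJoin   = stats.NewCounter32("cluster.decode_err.join")
	clusterDecodeErrUpdate = stats.NewCounter32("cluster.decode_err.update")
)
```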

woodsaj added 3 commits January 13, 2017 19:58
memberlist is a library that manages cluster membership and
member failure detection using a gossip based protocol.

To enable use of memberlist a "ClusterManager" struct has been created.
This struct handles co-ordination of changes to the state of every node.

Nodes are now passed throughout the codebase by value (copies) which eliminates
the need for complex locking.  To prevent races when reading/updating the localNode
properties, all changes are co-ordinated through the ClusterManager which provides
the required locking.
woodsaj and others added 3 commits January 13, 2017 19:58
- CLU <submodule> prefix
- don't use error messages as strings to be interpreted as format
  strings. that can mess up things.
- no need for \n at the end
Member Author

woodsaj commented Jan 16, 2017

I added the stats, see https://github.com/raintank/metrictank/blob/b0d033da11ae4053c54fa25672eef3e60d11287c/cluster/manager.go#L14-L35 for the list.

I agree that we should have nodes take themselves out of service (or be taken out by peers) if they can't handle queries. But that is for a later PR.

@Dieterbe Can we please get this merged now?

@woodsaj woodsaj dismissed Dieterbe’s stale review January 16, 2017 13:46

requested changes implemented.

"github.com/raintank/worldping-api/pkg/log"
)

var (
// total number of nodes this instance thinks are in the cluster.
Contributor

if you use the proper format then you can use metrics2docs to generate docs/metrics.md
see https://github.com/Dieterbe/metrics2docs#doc-format
(note the tool may not work perfectly and sometimes tries to make other changes to the file as well. but you can just use git checkout -p docs/metrics.md to undo that, or git add -p docs/metrics.md to only commit the appropriate changes)

* categorize things in a hierarchy, it was too messy
* instead of independent counts of primary nodes and ready nodes, put every node in
  a distinct category.  this makes it so you can identify states unambiguously.
  it also means we don't need to track total number of nodes separately.
  just sum them
* fix metrics2docs format
* make variable names tab-completable (e.g. also a hierarchy in the naming)
* add json decode metrics
@Dieterbe
Contributor

I think in general it's a good approach to load/parse/validate all configuration first (and fail the process if appropriate) before trying to talk to other external systems, otherwise unhealthy systems can pollute the network and other instances' logs, which is messy.
in the past, we would only start the cluster after validating configs.
now we start the cluster much earlier, which means the node will try to join the cluster and potentially fail immediately after, which I think is messy.

also you moved the starting of the API server to before the loading of the index. this means that for a short period of time, the MT instance can provide incorrect and confusing responses to http requests. can't we keep the api server where it was?

type ClusterManager struct {
sync.RWMutex
Peers map[string]Node
node Node
Contributor

attribute name node is too vague. which node? suggestion: self and rename ThisNode() to Self()


type ClusterManager struct {
sync.RWMutex
Peers map[string]Node
Contributor

can you add a comment explaining whether node/self/this is also part of Peers. also lowercase

Member Author

thisNode/self is part of Peers, so I will rename this to "members"

return
}
peer.RemoteAddr = node.Addr.String()
if peer.Name == c.node.Name {
Contributor

if you accidentally start an instance with the same name, it'll be marked as local despite having a remote remoteaddr. is that a good idea? (same for notifyUpdate)

Member Author

Assigning unique names to each node is essential for running a cluster. I'll make sure that is clear in the config/docs.


func (c *ClusterManager) BroadcastUpdate() {
if c.list != nil {
go c.list.UpdateNode(time.Second)
Contributor

if this fails or times out, will the other nodes ever be aware of updates to our instance? how? (worth commenting there)

Member Author

This will only fail if connectivity to all nodes is lost. It is unlikely that will happen without this node being dropped from the cluster. When it re-joins it will do a full re-sync.

Additionally, full state re-syncs are performed periodically (every 30 seconds by default). Any changes not propagated via a NotifyUpdate message will be picked up via the complete state syncs, handled by the MergeRemoteState method.
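A rough sketch of those push/pull hooks; LocalState and MergeRemoteState are part of memberlist's Delegate interface, while the JSON payload of the peer map and the Updated timestamp used for conflict resolution are assumptions for illustration:

```go
// LocalState is sent to a remote node during a periodic full state sync.
func (c *ClusterManager) LocalState(join bool) []byte {
	c.RLock()
	buf, err := json.Marshal(c.Peers)
	c.RUnlock()
	if err != nil {
		log.Error(3, "CLU: unable to encode local state: %s", err)
		return nil
	}
	return buf
}

// MergeRemoteState folds a remote node's full view into ours, picking up any
// changes that were missed by NotifyUpdate broadcasts.
func (c *ClusterManager) MergeRemoteState(buf []byte, join bool) {
	remote := make(map[string]Node)
	if err := json.Unmarshal(buf, &remote); err != nil {
		log.Error(3, "CLU: unable to decode remote state: %s", err)
		return
	}
	c.Lock()
	for name, n := range remote {
		// keep whichever view of a node is newer (the Updated field is an assumption)
		if cur, ok := c.Peers[name]; !ok || n.Updated.After(cur.Updated) {
			c.Peers[name] = n
		}
	}
	c.Unlock()
}
```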

@Dieterbe
Contributor

have not been able to review as thoroughly as i would like, and probably won't get to it tomorrow.
my comments above aside, it looks pretty good. if you're happy with it you can merge it.

Member Author

woodsaj commented Jan 17, 2017

> I think in general it's a good approach to load/parse/validate all configuration first (and fail the process if appropriate) before trying to talk to other external systems, otherwise unhealthy systems can pollute the network and other instances' logs, which is messy.
> in the past, we would only start the cluster after validating configs.
> now we start the cluster much earlier, which means the node will try to join the cluster and potentially fail immediately after, which I think is messy.

The current ordering is necessary as other components (input plugins) need to tell the ClusterManager information about the node (the partitions being handled), which is done in the ConfigSetup phase. I'll see if I can refactor this to delay starting the clustering, but if not it will have to stay like this until we get around to cleaning up how we handle configuration and initialization.

> also you moved the starting of the API server to before the loading of the index. this means that for a short period of time, the MT instance can provide incorrect and confusing responses to http requests. can't we keep the api server where it was?

This was deliberate so that we can monitor the aliveness of the node while it starts up. Loading the index, and in future replaying metricPersist messages, can take a while. For k8s specifically we need to query the API to see if the node is alive. Though the node will immediately report that it is alive, it will also report that it is not ready for handling queries yet (GET / will still return a 503 error).
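A sketch of that split between alive and ready (the handler path and the IsReady helper are assumptions): the HTTP server is up as soon as the process starts, but the root endpoint keeps answering 503 until the node has finished loading.

```go
// appStatus serves GET /: 200 once the node is ready for queries, 503 before that.
func appStatus(w http.ResponseWriter, r *http.Request) {
	if cluster.Manager.IsReady() { // hypothetical readiness accessor
		w.Write([]byte("ok"))
		return
	}
	http.Error(w, "node not ready", http.StatusServiceUnavailable)
}
```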

- the list of nodes the ClusterManager tracks includes thisNode,
so using the term "members" is more appropriate.
@woodsaj woodsaj merged commit 6cbbc36 into master Jan 17, 2017
@woodsaj woodsaj deleted the peerDiscovery branch January 17, 2017 08:49
Development

Successfully merging this pull request may close these issues.

Support dynamically adding peers
3 participants