This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Clustering #400

Merged
merged 15 commits
Dec 13, 2016

Conversation

@woodsaj woodsaj (Member) commented Nov 17, 2016

  • keep track of nodes in the cluster
  • assign "partitions" to each node
  • include partition info in the metric index for each metric
  • update MetricIndex plugins to be partition aware
    • this means the data stored in the backend stores will change so we will need to provide some migration tools.
  • make carbon and kafkamdm input plugins partition aware
  • when calling graphite's /render api fan out the request to
    one node for each partition
  • when calling graphite's /metrics/find api fan out the requests
    to one node for each partition
  • when calling graphite's /metrics/index.json api fan out the requests
    to one node for each partition
  • add internal "cluster" api's to support the clustered graphite api
    requests

@Dieterbe Dieterbe (Contributor) left a comment

I have a lot more reviewing to do, I only understand a part of the code so far.
At first glance this looks like a solid piece of code.
Here's some feedback to get started.

Also, any thoughts on adding an idx table in cassandra per partition? That way we don't have to specify the partition for every single metricdef, and we don't need an index either.

"path": "github.com/tinylib/msgp/msgp",
"revision": "0cea1fa86e8403be1284013014f87ab942056de8",
"revisionTime": "2015-10-23T22:38:53Z"
},
Contributor

why? (don't just tell me, put it in the commit message)
it seems like we still import github.com/tinylib/msgp/msgp

Timeout: time.Second,
}

func NodeStateFromString(s string) NodeState {
Contributor

replaced by stringer output?

Contributor

my bad this is string to state

primaryNode = flag.Bool("primary-node", false, "the primary node writes data to cassandra. There should only be 1 primary node per cluster of nodes.")
peersStr = flag.String("peers", "", "http/s addresses of other nodes, comma separated. use this if you shard your data and want to query other instances")
probeIntervalStr = flag.String("probe-interval", "2s", "Interval to probe peer nodes")
Contributor

i was gonna say https://golang.org/pkg/flag/#Duration but then i realized we lose access to a validation error, which is so weird. so flag.String it is..
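For reference, a minimal self-contained sketch of how the string flag still yields a usable validation error by parsing it ourselves (illustrative, not the PR's code):

```go
package main

import (
	"flag"
	"log"
	"time"
)

var probeIntervalStr = flag.String("probe-interval", "2s", "Interval to probe peer nodes")

func main() {
	flag.Parse()
	// parsing the string ourselves keeps the validation error in our hands,
	// instead of the flag package deciding how to report it
	probeInterval, err := time.ParseDuration(*probeIntervalStr)
	if err != nil {
		log.Fatalf("invalid probe-interval %q: %s", *probeIntervalStr, err)
	}
	log.Printf("probing peers every %s", probeInterval)
}
```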

primaryNode = flag.Bool("primary-node", false, "the primary node writes data to cassandra. There should only be 1 primary node per cluster of nodes.")
peersStr = flag.String("peers", "", "http/s addresses of other nodes, comma separated. use this if you shard your data and want to query other instances")
probeIntervalStr = flag.String("probe-interval", "2s", "Interval to probe peer nodes")
mode = flag.String("mode", "single", "Operating mode of cluster. (single|multi)")
Contributor

note to self: can this be derived from len(peers) ?

Member Author

The "mode" flag is only used when deciding on whether other nodes need to be used to handle graphite queries. I wasnt sure if there was a use case where you would want to know about other nodes in the cluster but not co-ordinate queries with them. The example i had in mind is if we later support automatic primary failover for a cluster of nodes that all handle all partitions.

Contributor

fair

if !cluster.ValidMode(cluster.ModeType(*mode)) {
log.Fatal(4, "invalid cluster operating mode")
}

Contributor

wouldn't it make more sense to have the cluster package declare the flags, validate them, parse the peer list etc ?
I like the approach we're taking more and more where main hooks into package callbacks to take care of that stuff.

Member Author

yeah, that sounds reasonable.


// return the list of peers to broadcast requests to
// Only 1 peer per partition is returned
func PeersForQuery() []*Node {
@Dieterbe Dieterbe (Contributor) Nov 18, 2016

worth pointing out that in half of the code (peers, AddPeer) we consider peers as "the other nodes" (not ThisNode).
but anything that uses PeersForQuery will get a peers list that includes ThisNode. not a bad thing, i can see how it makes sense, just worth documenting as it was a surprise to me

Member Author

I originally had PeersForQuery exclude ThisNode, but I then added it back in to allow future features that provide better scheduling.

For example, in future we may want to extend the "state" of a node to differentiate between ready to handle queries and ready to serve data (i.e. due to just having started up and not having data in memory). We could then prefer other nodes that have been online longer over a node that has just started up.

Member Author

also, by returning all nodes that should be used for handling the query we can apply the "mode" enforcement easily. If "mode" is set to single, we just return ThisNode.

Contributor

yeah it makes sense, i would just document it for clarity

case <-ticker.C:
mu.Lock()
for _, peer := range peers {
go peer.Probe()
Contributor

if the peer is slow, the Probe calls will just pile up, each waiting to acquire the lock and probe the node right after it just finished probing.
this can also happen if the ticker ticks right after AddPeer() which also calls go peer.Probe()

I think we need something like https://github.com/golang/groupcache/blob/master/singleflight/singleflight.go

Member Author

The HTTP call is made without the lock being held. So the easiest solution here is to just track if a probe is currently underway and if it is, immediately return.
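A self-contained sketch of that guard (the real change lives in cluster/node.go; the stand-in type here only has what the sketch needs):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Node is a trimmed-down stand-in for cluster.Node; only the fields needed to
// illustrate the probe guard are shown.
type Node struct {
	sync.Mutex
	probing bool
}

// Probe checks the peer's status. If a probe is already underway we return
// immediately, so a slow peer can't cause probe goroutines to pile up
// (every ticker tick launches `go peer.Probe()`).
func (n *Node) Probe() {
	n.Lock()
	if n.probing {
		n.Unlock()
		return
	}
	n.probing = true
	n.Unlock()
	defer func() {
		n.Lock()
		n.probing = false // reset so the next tick can probe again
		n.Unlock()
	}()

	// the actual HTTP status request happens here, without holding the lock
	time.Sleep(50 * time.Millisecond)
}

func main() {
	n := &Node{}
	for i := 0; i < 3; i++ {
		go n.Probe() // concurrent calls: only one probe actually runs
	}
	time.Sleep(100 * time.Millisecond)
	fmt.Println("done")
}
```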

Member Author

Contributor

hm this link seems off. also not seeing any change wrt Probe() in the new version.

Member Author

ohh, i clobbered the change when i reverted a commit.

https://github.com/raintank/metrictank/blob/clustering/cluster/node.go#L133-L139

Contributor

nice solution. might want to set n.probing = false when done ;)

Member Author

thanks, added it.

// we only need to query 1 node for each partition.
// we are just using the first node for each partition, which will be
// ThisNode if ThisNode owns the partition. Later we can add more
// complex scheduling logic to distribute queries across the cluster
Contributor

this seems a tad too basic, because:

  1. we consistently select the first one, always
  2. a peer may be selected multiple times because it hosts multiple partitions

this seems like we can get very easily into situations where traffic is severely imbalanced, with some nodes receiving a multiple of what is normal and others receiving 0 requests. or am i missing something?

Member Author

yes this is simple, but it gets the job done. It is by no means the best solution, but I think it is good enough for now.

Assume we have 4 partitions and 4 nodes

nodeA  nodeB  nodeC  nodeD
0,1    0,1    2,3    2,3

if a query is sent to nodeA, it will select nodeA for partitions 0,1 and either nodeC or nodeD for partitions 2,3. So every query will only map to 2 nodes: the local node and 1 other.

The only problem that I see is that both nodeA and nodeB could consistently send queries to nodeC and never nodeD.
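For illustration, a self-contained sketch of the per-partition selection (stand-in types, not the PR's code). The simple scheme always takes the first candidate; picking randomly among the candidates, as done here, is one way to spread the remote queries, which is what the balancing discussion below is about:

```go
package main

import (
	"fmt"
	"math/rand"
)

// node stands in for cluster.Node; only a name is needed for this sketch.
type node struct{ name string }

// pick exactly one node per partition, preferring the local node for the
// partitions it owns.
func peersForQuery(local *node, byPartition map[int32][]*node) []*node {
	selected := make(map[*node]struct{})
	for _, candidates := range byPartition {
		if len(candidates) == 0 {
			continue
		}
		pick := candidates[rand.Intn(len(candidates))]
		for _, c := range candidates {
			if c == local {
				pick = c // always handle our own partitions locally
				break
			}
		}
		selected[pick] = struct{}{}
	}
	out := make([]*node, 0, len(selected))
	for n := range selected {
		out = append(out, n)
	}
	return out
}

func main() {
	a, b, c, d := &node{"A"}, &node{"B"}, &node{"C"}, &node{"D"}
	byPartition := map[int32][]*node{
		0: {a, b}, 1: {a, b},
		2: {c, d}, 3: {c, d},
	}
	for _, n := range peersForQuery(a, byPartition) {
		fmt.Println(n.name) // A, plus one of C or D
	}
}
```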

Member Author

OK, I have refactored this to ensure balanced distribution between nodes. Also included a unit test to make sure.

Contributor

Isn't it better to shuffle the partitions across nodes though?
To follow your example with parts=4 and nodes=4, I would rather set it up like so:

nodeA  nodeB  nodeC  nodeD
0,1    0,2    1,3    2,3

this way, when a node crashes, the extra workload gets split up over 2 nodes, as opposed to moving everything over to 1 node which would double its workload and seems more likely to crash.
the more partitions we have, the more we can evenly distribute it too. if we have 6 partitions and 4 nodes we can make it so that on a node failure, the excess load gets split 33% each to the other nodes.

BTW I like the new approach!

Contributor

I still have to review the changes in the api package, but one important concern that comes up with my partition assignment approach (which is not an issue with yours) is that when we query multiple nodes and there is some overlap in between, they might do duplicate work, unless we specify in the request which partitions we want to use the node for.

for example: let's say ThisNode is A, and we query data for all 4 partitions.
we'll pick A for 0 and 1. let's say we select B for 2 and D for 3. if we send the request without further specification, then D will respond with data for both part 2 and 3 because it has data for it, even though we selected B to do the work for partition 2.

@Dieterbe Dieterbe (Contributor) Nov 22, 2016

another problem with my approach is assigning primaries becomes hard.
with my proposal above, we could make A and D primaries, so all partitions are getting saved, and only one time.
but if A crashes and we want to establish a new primary for part 0 and 1, we would have to select B and C, but those would also save data that D is already saving. we could de-promote D and promote B and C. which is a little weird but should work though.

note if then another node fails and we only have 2/4 nodes we can't establish a proper primary setup where we save all shards once. with your approach if we have 2/4 nodes, i think we would be either more lucky (e.g. B and D crash in your example) or more unlucky (A and B crash)

I have to do the exercises, but I think with more shards, my approach would probably lend itself worse to maintaining proper primary assignment, unless we implement primary status on a per-partition level instead of per-node. so maybe for now we should stick to your way. and do the advanced stuff later as needed

Member Author

I think we just need to have the documentation explain that partitions should be split into "groups" and each node handles 1 group. Though your layout provides better request distribution when a node fails, it adds significant complexity.

I think we should try and move towards supporting your layout, but that can be done as a future improvement.

def := schema.MetricDefinitionFromMetricData(data)
def.Partition = partition
c.MemoryIdx.AddDef(def)
c.writeQueue <- writeReq{recvTime: time.Now(), def: def}
@Dieterbe Dieterbe (Contributor) Nov 18, 2016

what's the goal of this piece?
a metric was reassigned to a different partition, probably because the number of partitions changed? is there another reason? can you comment the intention here.

BTW this will properly overwrite the old entry with the previous partition, in cassandra right?

Member Author

Yes, this will handle changes in the partition of a metric. But primarily this is just to persist the change to the "lastUpdate" time.

Previously, we just sent "existing" to the write queue. But as a result, if the data in cassandra becomes corrupt for any reason it can never be fixed, due to metrictank loading the data from cassandra and only ever writing that same data back to cassandra with the updated lastUpdate field.
Sending a fresh metricDef built from the metricData is a much more resilient approach and has a negligible performance penalty.

Contributor

ok. and why do we now skip updates if the partition doesn't change? this means that if a metric never changes partition, we will never update it in cassandra. that seems wrong (same for EsIdx)

Member Author

yeah, that should be "if existing.Partition == partition" so that if the partition hasn't changed we update only every ~updateInterval. But if the partition has changed we treat it like a new metric, which will update it immediately.
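A small self-contained sketch of the intended rule (all names here are illustrative, not the index code itself):

```go
package main

import (
	"fmt"
	"time"
)

// shouldPersist captures the rule above: persist immediately when a metric is
// new or its partition changed; otherwise only re-persist once the update
// interval has elapsed.
func shouldPersist(exists bool, existingPartition, partition int32,
	lastUpdate time.Time, updateInterval time.Duration) bool {
	if !exists || existingPartition != partition {
		return true // new metric or partition reassignment: write right away
	}
	return time.Since(lastUpdate) >= updateInterval
}

func main() {
	fmt.Println(shouldPersist(true, 3, 3, time.Now(), time.Hour))                   // false: recent, unchanged
	fmt.Println(shouldPersist(true, 3, 5, time.Now(), time.Hour))                   // true: partition changed
	fmt.Println(shouldPersist(true, 3, 3, time.Now().Add(-2*time.Hour), time.Hour)) // true: interval elapsed
}
```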

Contributor

this fix still has to be applied to ES index

} else {
missing := diffPartitions(partitions, availParts)
if len(missing) > 0 {
log.Fatal(4, "kafka-mdm: configured partitions not in list of available partitions.")
Contributor

can't we be more specific and just print which are missing?
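A self-contained sketch of the more specific message (missingPartitions plays the role of diffPartitions; names are illustrative):

```go
package main

import "log"

// missingPartitions returns configured partitions that are not available.
func missingPartitions(configured, available []int32) []int32 {
	avail := make(map[int32]struct{}, len(available))
	for _, p := range available {
		avail[p] = struct{}{}
	}
	var missing []int32
	for _, p := range configured {
		if _, ok := avail[p]; !ok {
			missing = append(missing, p)
		}
	}
	return missing
}

func main() {
	configured := []int32{0, 1, 2, 7}
	available := []int32{0, 1, 2, 3}
	if missing := missingPartitions(configured, available); len(missing) > 0 {
		// naming the offending partitions makes the failure actionable
		log.Fatalf("kafka-mdm: configured partitions %v not in list of available partitions %v", missing, available)
	}
}
```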

@woodsaj woodsaj force-pushed the clustering branch 3 times, most recently from 3dc473a to 47d5022 Compare November 20, 2016 00:34
@@ -103,7 +103,8 @@ func (u *Usage) Report() {

m := metrics.GetOrCreate(met.Id)
m.Add(uint32(met.Time), met.Value)
metricIndex.Add(met)
//TODO: how to set the partition of the metric? We probably just need to publish the metric to our Input Plugin
Contributor

the bigger problem here is that the usage stats from different metrictank instances will conflict with each other. maybe the better approach now is to have a separate service for usage tracking, a service that consumes from all partitions.
we could also have usage metrics per partition but that's still weird due to multiple instances consuming the same partitions

Member Author

yeah, I don't have a good answer for what we should do here yet.

@@ -176,9 +184,19 @@ func main() {

conf.ParseAll()

// if the user just wants the version, give it and exit
@Dieterbe Dieterbe (Contributor) Nov 21, 2016

i would even move this before config parsing.
if the only thing you want is the version, it's annoying that it aborts on invalid configs.
and no sane person would (or should) set show-version = true in the config file. (e.g. it's reasonable to ignore a show-version = true in the config)

mdata.InitPersistNotifier(stats, handlers...)

/***********************************
Initialize our MemoryIdx
Contributor

this should just be "initialize our index". this could be too easily confused for the memory idx implementation

if err != nil {
log.Fatal(4, "kafka-mdm failed to create client. %s", err)
}

Contributor

i think we should put a defer client.Close() here, since we only use this client in this function. in New() we create a new one, which btw should probably also be closed in KafkaMdm.Close()

@@ -75,18 +117,150 @@ func (n *Node) SetState(s NodeState) {
n.Unlock()
}

// provide thread safe json Marshaling
func (n *Node) MarshalJSON() ([]byte, error) {
func (n *Node) SetPartitions(part []int32) {
Contributor

this should never be called once the initial configuration has been established by input plugins, right?
might want to put that in a comment

Member Author

This could be called at any time, but it would only make sense for the input plugin to call it, as that is what controls the partitions that are being ingested. For example, if the kafkamdm input plugin is configured to consume from "*" and we update the kafka topic to use more partitions, it should call SetPartitions()

@Dieterbe Dieterbe (Contributor) Nov 22, 2016

oh wow I hadn't thought about on-the-fly adjustments like that. sounds good.
just so we're clear though:

  • carbon input partition is set by the operator in the config, and is just something that represents how they manually shard their data, based on their relaying setup and how they route carbon traffic. until we have config reloading or an api for this, it can only be changed by restarting MT
  • the mdm input does not seem to have a provision either to dynamically become aware of more partitions and call SetPartitions accordingly. so we also only call it at startup. is this something that still has to be implemented? (perhaps as a future improvement)

Member Author

Yes, currently SetPartitions() is only ever called once. But the point is that it is only called once due to limitations in the input plugins. There is no reason to enforce that the partitions can only be set once.


"time"
)

var ThisNode *Node
type ModeType string
Contributor

why not just type Mode string?

Member Author

"Mode" is the current variable that stores the ModeType

var (
	ThisNode *Node
	Mode     ModeType

Contributor

k

peers = make([]*Node, 0)
)

func Init(name, version string, primary bool, started time.Time, interval time.Duration, m ModeType) {
Contributor

whatInterval ? maybe probeInterval

response.Write(ctx, response.NewJson(200, status, ""))
}

var NotFoundErr = errors.New("not found")
Contributor

declarations for global variables should be after the imports (imho or best practice not sure)

r.Get("/", s.getClusterStatus)
r.Combo("/getdata", bind(models.GetData{})).Get(s.getData).Post(s.getData)
r.Combo("/index/find", bind(models.IndexFind{})).Get(s.indexFind).Post(s.indexFind)
r.Combo("/index/get", bind(models.IndexGet{})).Get(s.indexGet).Post(s.indexGet)
Contributor

i don't think we call /index/get anywhere. can be removed?

Member Author

I think we should still keep it as it might be useful for fetching metricDefs by Id

Contributor

do you mean manually, e.g. when troubleshooting?

or as part of cluster operation? because currently the implementation is in api/cluster.go and the http api endpoint is under /cluster, which is normally only for internal cluster operations

(note that the implementation only looks in the local index)

Member Author

yes, manually for troubleshooting.

Contributor

in that case i don't think this line should be with all the internal clustering stuff, and the implementation code should probably also be moved away from the clustering handlers

Member Author

where do you suggest it get moved to?

@Dieterbe Dieterbe (Contributor) Dec 12, 2016

maybe under /debug/index/get and in api/debug.go
I don't care so much but it does seem out of place with all the cluster stuff, since it's not used by any clustering.

We could also consider stripping the /cluster/ prefix from all http paths used by the clustering.
and just putting them at /index/list and so on. (and this one could simply be /index/get) The notion of who we expect to call a certain endpoint should perhaps not dictate the http path hierarchy

Member Author

Yeah, i think removing the "cluster" prefix makes the most sense.

@@ -15,6 +16,7 @@ type Req struct {
MaxPoints uint32 `json:"maxPoints"`
RawInterval uint32 `json:"rawInterval"` // the interval of the raw metric before any consolidation
Consolidator consolidation.Consolidator `json:"consolidator"`
Node *cluster.Node `json:"node"`
@Dieterbe Dieterbe (Contributor) Nov 22, 2016

we don't really need to encode the Node into json. the only place we decode incoming Req json is /cluster/getdata which uses getTargetsLocal which doesn't care about this attribute.

I guess it could be useful maybe to see the node name when sniffing traffic for troubleshooting (though you could also look at the dest ip), but then again all Node attributes are private so it would just be {} in json.

we should probably use json:"-" here

Member Author

yes this should be json:"-"
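A tiny self-contained sketch of the effect of the `json:"-"` tag (stand-in types, not the real models.Req):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// node stands in for cluster.Node (all its fields are unexported, so it would
// marshal as "{}" anyway).
type node struct{ name string }

// req is a trimmed stand-in for models.Req, showing the json:"-" tag that
// keeps the Node out of the wire format.
type req struct {
	MaxPoints uint32 `json:"maxPoints"`
	Node      *node  `json:"-"`
}

func main() {
	b, _ := json.Marshal(req{MaxPoints: 800, Node: &node{"peer-1"}})
	fmt.Println(string(b)) // {"maxPoints":800}
}
```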

localReqs := make([]models.Req, 0)
remoteReqs := make(map[*cluster.Node][]models.Req)
for _, req := range reqs {
if req.Node == nil || req.Node == cluster.ThisNode {
Contributor

if req.Node were nil, that would be a nasty bug in our code (correct me if i'm wrong).
i would rather crash MT with a nil pointer so we can fix the code, rather than silently ignoring it and postponing when we become aware of the bug (which would be harder to track down because it'll be less obvious)

note also that in the only place we call this function (api/graphite.go) a few lines before we call getTargets(), we call req.Node.GetName()

@Dieterbe Dieterbe (Contributor) Nov 22, 2016

EDIT: never mind, not true
also I think we can simplify this function: instead of first splitting up the requests into two categories and then doing very similar logic for each category, we can just iterate over all requests, do the wg.Add for each, and launch the goroutine where we check the err and series results, etc., calling getTargetsLocal vs getTargetsRemote based on whether it's a local or remote node
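A simplified, self-contained sketch of that flattened fan-out (stand-in types; getLocal/getRemote play the roles of getTargetsLocal/getTargetsRemote):

```go
package main

import (
	"fmt"
	"sync"
)

// req and series stand in for models.Req and models.Series in this sketch.
type req struct {
	target string
	local  bool // true when req.Node == cluster.ThisNode
}
type series struct{ name string }

func getLocal(r req) ([]series, error)  { return []series{{r.target + " (local)"}}, nil }
func getRemote(r req) ([]series, error) { return []series{{r.target + " (remote)"}}, nil }

// fan out one goroutine per request, dispatching to the local or remote path
// inside the goroutine instead of pre-splitting the requests into two groups.
func getTargets(reqs []req) ([]series, []error) {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		out  []series
		errs []error
	)
	for _, r := range reqs {
		wg.Add(1)
		go func(r req) {
			defer wg.Done()
			get := getRemote
			if r.local {
				get = getLocal
			}
			s, err := get(r)
			mu.Lock()
			defer mu.Unlock()
			if err != nil {
				errs = append(errs, err)
				return
			}
			out = append(out, s...)
		}(r)
	}
	wg.Wait()
	return out, errs
}

func main() {
	res, errs := getTargets([]req{{"a.b.c", true}, {"d.e.f", false}})
	fmt.Println(res, errs)
}
```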


}

rSeries, err := s.findSeries(ctx.OrgId, patterns, int64(fromUnix))
Contributor

what does rSeries mean? why not series ?

@@ -208,10 +315,25 @@ func (s *Server) metricsFind(ctx *middleware.Context, request models.GraphiteFin
if request.From != 0 {
request.From -= 86400
}
nodes, err := s.MetricIndex.Find(ctx.OrgId, request.Query, request.From)
nodes := make([]idx.Node, 0)
rSeries, err := s.findSeries(ctx.OrgId, []string{request.Query}, request.From)
Contributor

what does rSeries mean? why not series ?

msg := fmt.Sprintf("%d errors: ", len(errors))
for _, e := range errors {
msg += e.Error() + ", "
}
Contributor

instead of this loop you can just print errors as "%q"; that will show it as a list, and in case an error message contains a "," it will be less confusing. see https://play.golang.org/p/bjBWpMQ_R5
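For illustration (same idea as the playground link; the slice is named errs here to avoid shadowing the errors package):

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	errs := []error{
		errors.New("dial tcp: i/o timeout"),
		errors.New("node B returned 500, backend error"),
	}
	// %q renders the slice as a quoted list, so commas inside a message
	// can't be confused with the separator between errors
	msg := fmt.Sprintf("%d errors: %q", len(errs), errs)
	fmt.Println(msg)
	// 2 errors: ["dial tcp: i/o timeout" "node B returned 500, backend error"]
}
```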

parsedTargets := make(map[string]string)

// consolidatorForPattern[<pattern>]<consolidateBy>
consolidatorForPattern := make(map[string]string)
Contributor

note that in graphite it's legal to do a query such as:

render?target=consolidateBy(foo.qps, "min")
&target=consolidateBy(foo.qps, "max")
&target=consolidateBy(foo.qps, "avg")

I think this code wouldn't work with that.

Member Author

that is true, but this code works the same as what was here before (just some of the variable names have been changed)

I guess we need to make this map[string][]string?

Member Author

I am not sure we should fix this in this PR. It is going to require a lot of refactoring of how we parse targets. I think it is best if we address this after this PR has been merged.

Contributor

yeah that's fine by me

@@ -177,7 +284,7 @@ func (s *Server) renderMetrics(ctx *middleware.Context, request models.GraphiteR

Contributor

there is a new implication here: aggsettings must be consistent across all peers. seems obvious, but we should document it in the config files.

Member Author

added to docs.

}
seenPaths := make(map[string]struct{})
// different nodes may have overlapping data in their index.
// maybe because they loaded the entire index from a persistent store,
Contributor

this line is no longer applicable

msg := fmt.Sprintf("%d errors: ", len(errors))
for _, e := range errors {
msg += e.Error() + ", "
}
Contributor

also here can just use '%q'

}

// different nodes may have overlapping data in their index.
// maybe because they loaded the entire index from a persistent store,
Contributor

this line no longer applies.

also this comment corresponds to code that is a bit higher up


rSeries, err := s.findSeries(ctx.OrgId, patterns, int64(fromUnix))
if err != nil {
response.Write(ctx, response.NewError(http.StatusInternalServerError, err))
Contributor

note that err here can contain low-level errors revealing details about our infrastructure (could be a dial timeout to 10.240.x.x, network unreachable, etc).
I think those should be abstracted away from the user, and we should just tell the user we had an internal problem with the cluster, or something
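A minimal sketch of that idea: keep the detailed error in the logs and hand the user a generic message (names are illustrative, not the PR's response helpers):

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// userFacingError logs the detailed, infrastructure-revealing error and
// returns only a generic message plus a 500 for the HTTP response.
func userFacingError(internal error) (code int, msg string) {
	log.Printf("cluster query failed: %s", internal) // full detail stays server-side
	return 500, "internal cluster error"
}

func main() {
	code, msg := userFacingError(errors.New("dial tcp 10.240.0.12:6060: network is unreachable"))
	fmt.Println(code, msg)
}
```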

if err != nil {
response.Write(ctx, response.NewError(http.StatusBadRequest, err))
return
response.Write(ctx, response.NewError(http.StatusInternalServerError, err))
Contributor

same note here about not revealing low-level details


if err != nil {
log.Error(3, "HTTP IndexJson() %s", err.Error())
response.Write(ctx, response.NewError(http.StatusInternalServerError, err))
@Dieterbe Dieterbe (Contributor) Nov 22, 2016

same note here about not revealing low level details to user

@Dieterbe Dieterbe (Contributor) commented Nov 22, 2016

let's say we have a hypothetical org that does not use the new clustering (e.g. single node, or multiple nodes that all get all data), and then we decide to split up data into shards, do you foresee any issues?
I think:

  • the MT's will update the partition info in their memory+cass/es idx, that part will work
  • some data will not load during the transition, because the nodes that now own the metricdefs will only have very recent points in RAM and what is in cassandra is only the last saved chunk (this is why our current MT has a warmup feature, but I'm not sure how a warmup would look with the clustering). one way to alleviate this is to postpone partition reassignments until right after a chunk save interval
  • there would be some data loss: let's say chunks save every hour (raw + aggregated), and at 10:02 we change our ingestion so that partition reassignments for metricdefs take effect. i'm not quite clear yet on how primary assignment will work, but either the data from 10:00-10:02 will be lost, or the data from 10:02-11:00; it depends on who gets to write the last chunk and if there's any overwriting.

@Dieterbe Dieterbe (Contributor) commented Nov 22, 2016

to extend a bit on my earlier question: the approach used in this PR uses:

  • a partition field in the metricdef. so memory, cass and ES will all store the partition for every single metricdef.
  • a partition field in cassandra and ES, to track partition for each def
  • cassandra has an index on the partition field, and ES an analyzer on it.

this seems pretty redundant & high-overhead:

  • cassandra and ES store the partition twice (in the metricdef, and as a separate field)
  • the index/analyzer has a cost.

since partition is low cardinality (with >10k or more metricdefs for every partition), it seems to me we can simply create separate tables in cassandra, or separate "indices" in elasticsearch for every partition. this way we don't have to include the partition in every metricdef (when stored), we don't need an extra field for every def, and we don't need the cost of the index/analyzer.

in memory, we may want to do something similar: basically just shard the in-memory structures by partition, instead of storing the partition for every single def.

(note: there is a catch here, a partition reassignment will also involve a delete from the old shard/table/index)

@Dieterbe Dieterbe (Contributor)

todo: (can be done after everything else stabilized) update {sample,docker,package} config, docs/config.md, README.md (limitations), docs/clustering.md, docs/http-api.md, docs/roadmap.md

@woodsaj woodsaj (Member Author) commented Nov 22, 2016

We need to store the partition in ES and Cassandra in an indexed field so we can query using it.
For ES, there is no further optimization we can do, but we can optimize a little better for cassandra. I do not like the idea of separate indexes in ES or tables in Cassandra for each partition. This will have a significant performance penalty. In Cassandra for example, writes are buffered in the memtables before being periodically flushed to SStables. If you have 64 partitions in 1 table you will likely see 1 write every flush interval. If you have 64 tables you will need 64 writes.

We need to know the partition every time we try to re-index a metric, as we need to know if the partition has changed.
The way the ESIndex works (specifically how it handles retries), we need the partition in the metricDef. It would be a significant amount of refactoring to change this behavior.

Keeping the partition in the metricDef makes handling things much easier. Given that the partition is an int32, storing it only costs us 4MB of RAM for every 1 million metricDefs. So it is my opinion that with the additional complexity needed to avoid storing the partition you are going to be worse off. And even if you are not worse off, I still don't think it is worth the effort.

For the cassandra index I have already considered moving from the whole metricDef in a msgp-encoded blob to columns for each field of the metricDef. This might be another reason to do that.

@woodsaj woodsaj (Member Author) commented Nov 22, 2016

> let's say we have a hypothetical org that does not use the new clustering (e.g. single node, or multiple nodes that all get all data), and then we decide to split up data into shards, do you foresee any issues?
I think the MT's will update the partition info in their memory+cass/es idx, and everything should just work.

If you are using kafkamdm and already using multiple partitions, then everything will just work nicely as the metricDefs will already include the correct partition.

If you change the number of partitions or change the hashing used, then the cluster might have a few issues, as we don't have great logic for dealing with multiple nodes believing they have a metric. But this would pass once the node that no longer sees the metric marks it as stale and deletes it from its memoryIdx.


func ConfigSetup() {
clusterCfg := flag.NewFlagSet("cluster", flag.ExitOnError)
clusterCfg.BoolVar(&primary, "primary-node", false, "the primary node writes data to cassandra. There should only be 1 primary node per cluster of nodes.")
Contributor

only one "per shardgroup"

@Dieterbe Dieterbe (Contributor) commented Nov 23, 2016

I can confirm again I got the following error:

2016/11/23 09:27:34 [metrictank.go:334 main()] [E] failed to initialize cassandra. java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.cassandra.exceptions.ConfigurationException: Column family ID mismatch (found 103be8f0-b15f-11e6-92df-7964cb8cd63c; expected 103902c0-b15f-11e6-92df-7964cb8cd63c)

and I can also confirm that making other MT instances wait for 1 of them (who does the initialisation) to start their api server is a good solution, better than 283c85f and 28c3f93 from #320

so this should be documented in our cluster documentation

@Dieterbe Dieterbe (Contributor) commented Dec 9, 2016

docs/cassandra.md also needs updating

@woodsaj woodsaj force-pushed the clustering branch 2 times, most recently from 42f4597 to b64f47f Compare December 12, 2016 03:12
func (s *Server) indexDelete(ctx *middleware.Context, req models.IndexDelete) {
defs, err := s.MetricIndex.Delete(req.OrgId, req.Query)
if err != nil {
// errors can only be caused by bad request.
Contributor

another case of callee implementation leaking into caller, where I think more specific, detailed errors should flow from the code that actually generates them.

Member Author

I agree with this sentiment. But setting the response status code based on what we know (or think) the returned errors can be is used a lot throughout the code base, e.g. this example from before the api refactoring: https://github.com/raintank/metrictank/blob/0.5.8/http.go#L294

so fixing it is outside the scope of this PR.

woodsaj and others added 7 commits December 12, 2016 20:35
- keep track of nodes in the cluster
- assign "partitions" to each node
- include partition info in the metric index for each metric
- update MetricIndex plugins to be partition aware
  - this means the data stored in the backend stores will change
    we will need to provide some migration tools.
- make carbon and kafkamdm input plugins partition aware
- when calling graphite's /render api fan out the request to
  one node for each partition
- when calling graphite's /metrics/find api fan out the requests
  to one node for each partition
- when calling graphite's /metrics/index.json api fan out the requests
  to one node for each partition
- add internal "cluster" api's to support the clustered graphite api
  requests
default primary to true so out of the box experience is nicer
that's how it is in docker and package configs also btw.
instead of encoding the metricDef to msgpack then storing as a
blob in Cassandra, lets just store each field as a column in cassandra.
This allows data to be encoded in cassandra's native format and also
makes troubleshooting easier as the metricDefs can be viewed/searched
directly in Cassandra.

This new branch shows a decrease in performance for writes primarily due to
cassandra overheads handling the secondary index on the partition id.
Reads also show a slight drop in throughput, which is again due to cassandra.
Metrictank actually sees a drop in allocations and overall bytes per op.

go test -run=None -v -bench=. -benchtime=10s
MASTER:
BenchmarkIndexing-4	  200000	     84400 ns/op	    2738 B/op	      56 allocs/op
BenchmarkLoad-4    	 1000000	     15864 ns/op	    2098 B/op	      22 allocs/op
------------------------
THIS BRANCH
BenchmarkIndexing-4	  200000	    106320 ns/op	    2973 B/op	      58 allocs/op
BenchmarkLoad-4    	  500000	     37836 ns/op	    1784 B/op	      15 allocs/op
@Dieterbe Dieterbe (Contributor) commented Dec 12, 2016

Seems like there are 6 things left to do:

Anthony Woods added 3 commits December 12, 2016 23:12
if errors implement the response.Error interface then we use the e.Code()
in the API response, else we assume an internal server error and set the
code to 500.
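A self-contained sketch of that pattern (the interface and type names here are illustrative, not the actual response package):

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// coder mirrors the idea in the commit message above: errors that know their
// own HTTP status code.
type coder interface {
	error
	Code() int
}

type notFoundErr struct{ msg string }

func (e notFoundErr) Error() string { return e.msg }
func (e notFoundErr) Code() int     { return http.StatusNotFound }

// statusFor uses e.Code() when the error carries one, else falls back to 500.
func statusFor(err error) int {
	if c, ok := err.(coder); ok {
		return c.Code()
	}
	return http.StatusInternalServerError
}

func main() {
	fmt.Println(statusFor(notFoundErr{"metric not found"})) // 404
	fmt.Println(statusFor(errors.New("cassandra timeout"))) // 500
}
```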
Anthony Woods added 2 commits December 13, 2016 00:41
@woodsaj woodsaj (Member Author) commented Dec 12, 2016

@Dieterbe the only outstanding item is to move the /cluster/index/get endpoint somewhere

@woodsaj woodsaj (Member Author) commented Dec 13, 2016

@Dieterbe LGTM
