
Merge branch 'master' into chunkSaveRefactor
Anthony Woods authored Jan 17, 2017
2 parents c596193 + 5c77d2c commit c3519f8
Showing 153 changed files with 32,723 additions and 575 deletions.
4 changes: 2 additions & 2 deletions api/api.go
@@ -76,8 +76,8 @@ func NewServer() (*Server, error) {
})

return &Server{
Addr: addr,
SSL: useSSL,
Addr: Addr,
SSL: UseSSL,
certFile: certFile,
keyFile: keyFile,
shutdown: make(chan struct{}),
20 changes: 15 additions & 5 deletions api/cluster.go
@@ -2,7 +2,9 @@ package api

import (
"errors"
"fmt"
"net/http"
"strconv"

"github.com/raintank/metrictank/api/middleware"
"github.com/raintank/metrictank/api/models"
@@ -15,16 +17,24 @@ import (
var NotFoundErr = errors.New("not found")

func (s *Server) getNodeStatus(ctx *middleware.Context) {
response.Write(ctx, response.NewJson(200, cluster.ThisNode, ""))
response.Write(ctx, response.NewJson(200, cluster.Manager.ThisNode(), ""))
}

func (s *Server) setNodeStatus(ctx *middleware.Context, status models.NodeStatus) {
cluster.ThisNode.SetPrimary(status.Primary)
primary, err := strconv.ParseBool(status.Primary)
if err != nil {
response.Write(ctx, response.NewError(http.StatusBadRequest, fmt.Sprintf(
"could not parse status to bool. %s",
err.Error())),
)
return
}
cluster.Manager.SetPrimary(primary)
ctx.PlainText(200, []byte("OK"))
}

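Since the primary flag now arrives as a string and is run through strconv.ParseBool, only the literal forms ParseBool understands are accepted; anything else takes the 400 path above. A quick standalone illustration (not part of this commit):

package main

import (
    "fmt"
    "strconv"
)

func main() {
    // strconv.ParseBool accepts 1, t, T, TRUE, true, True, 0, f, F, FALSE, false, False.
    // Anything else (e.g. "yes", "on") returns an error, which the handler
    // above turns into an http.StatusBadRequest response.
    for _, s := range []string{"true", "1", "T", "false", "0", "yes"} {
        v, err := strconv.ParseBool(s)
        fmt.Printf("%q -> %v (err: %v)\n", s, v, err)
    }
}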
func (s *Server) appStatus(ctx *middleware.Context) {
if cluster.ThisNode.IsReady() {
if cluster.Manager.IsReady() {
ctx.PlainText(200, []byte("OK"))
return
}
@@ -34,8 +44,8 @@ func (s *Server) getClusterStatus(ctx *middleware.Context) {

func (s *Server) getClusterStatus(ctx *middleware.Context) {
status := models.ClusterStatus{
Node: cluster.ThisNode,
Peers: cluster.GetPeers(),
NodeName: cluster.Manager.ThisNode().Name,
Members: cluster.Manager.MemberList(),
}
response.Write(ctx, response.NewJson(200, status, ""))
}
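
The handlers above replace the old package-level helpers (cluster.ThisNode, cluster.GetPeers) with a cluster.Manager singleton. The following is a hypothetical sketch of the surface these handlers rely on, inferred purely from the calls visible in this diff; the real definitions in the cluster package will differ in detail:

// Hypothetical sketch, not the actual cluster package code.
package cluster

import "time"

// Node is assumed to expose at least a Name plus the methods used in this diff.
type Node struct {
    Name string
    // ... other fields (primary flag, state, remote address, ...)
}

// IsLocal reports whether this node is the local process (assumption).
func (n Node) IsLocal() bool { return true }

// Post sends a request body to a peer and returns the raw response (assumption).
func (n Node) Post(path string, body interface{}) ([]byte, error) { return nil, nil }

// Manager is the singleton the API handlers call into.
var Manager interface {
    ThisNode() Node
    IsReady() bool
    SetPrimary(primary bool)
    MemberList() []Node
}

// Init wires up Manager; the extra scheme/port arguments match the updated
// test calls: cluster.Init("default", "test", time.Now(), "http", 6060).
func Init(name, instance string, started time.Time, scheme string, port int) {}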
10 changes: 5 additions & 5 deletions api/config.go
@@ -15,8 +15,8 @@ var (
logMinDurStr string
logMinDur uint32

addr string
useSSL bool
Addr string
UseSSL bool
certFile string
keyFile string
)
@@ -27,8 +27,8 @@ func ConfigSetup() {
apiCfg.IntVar(&maxPointsPerReq, "max-points-per-req", 1000000, "max points could be requested in one request. 1M allows 500 series at a MaxDataPoints of 2000. (0 disables limit)")
apiCfg.StringVar(&logMinDurStr, "log-min-dur", "5min", "only log incoming requests if their timerange is at least this duration. Use 0 to disable")

apiCfg.StringVar(&addr, "listen", ":6060", "http listener address.")
apiCfg.BoolVar(&useSSL, "ssl", false, "use HTTPS")
apiCfg.StringVar(&Addr, "listen", ":6060", "http listener address.")
apiCfg.BoolVar(&UseSSL, "ssl", false, "use HTTPS")
apiCfg.StringVar(&certFile, "cert-file", "", "SSL certificate file")
apiCfg.StringVar(&keyFile, "key-file", "", "SSL key file")
globalconf.Register("http", apiCfg)
@@ -38,7 +38,7 @@ func ConfigProcess() {
logMinDur = dur.MustParseUsec("log-min-dur", logMinDurStr)

//validate the addr
_, err := net.ResolveTCPAddr("tcp", addr)
_, err := net.ResolveTCPAddr("tcp", Addr)
if err != nil {
log.Fatal(4, "API listen address is not a valid TCP address.")
}
22 changes: 11 additions & 11 deletions api/dataprocessor.go
@@ -9,7 +9,6 @@ import (
"time"

"github.com/raintank/metrictank/api/models"
"github.com/raintank/metrictank/cluster"
"github.com/raintank/metrictank/consolidation"
"github.com/raintank/metrictank/mdata/chunk"
"github.com/raintank/metrictank/util"
@@ -170,12 +169,12 @@ func aggEvery(numPoints, maxPoints uint32) uint32 {
func (s *Server) getTargets(reqs []models.Req) ([]models.Series, error) {
// split reqs into local and remote.
localReqs := make([]models.Req, 0)
remoteReqs := make(map[*cluster.Node][]models.Req)
remoteReqs := make(map[string][]models.Req)
for _, req := range reqs {
if req.Node == cluster.ThisNode {
if req.Node.IsLocal() {
localReqs = append(localReqs, req)
} else {
remoteReqs[req.Node] = append(remoteReqs[req.Node], req)
remoteReqs[req.Node.Name] = append(remoteReqs[req.Node.Name], req)
}
}

@@ -227,15 +226,16 @@ func (s *Server) getTargets(reqs []models.Req) ([]models.Series, error) {
return out, err
}

func (s *Server) getTargetsRemote(remoteReqs map[*cluster.Node][]models.Req) ([]models.Series, error) {
func (s *Server) getTargetsRemote(remoteReqs map[string][]models.Req) ([]models.Series, error) {
seriesChan := make(chan []models.Series, len(remoteReqs))
errorsChan := make(chan error, len(remoteReqs))
wg := sync.WaitGroup{}
wg.Add(len(remoteReqs))
for node, nodeReqs := range remoteReqs {
log.Debug("DP getTargetsRemote: handling %d reqs from %s", len(nodeReqs), node.GetName())
go func(reqs []models.Req, node *cluster.Node) {
for _, nodeReqs := range remoteReqs {
log.Debug("DP getTargetsRemote: handling %d reqs from %s", len(nodeReqs), nodeReqs[0].Node.Name)
go func(reqs []models.Req) {
defer wg.Done()
node := reqs[0].Node
buf, err := node.Post("/getdata", models.GetData{Requests: reqs})
if err != nil {
errorsChan <- err
@@ -244,13 +244,13 @@ func (s *Server) getTargetsRemote(remoteReqs map[*cluster.Node][]models.Req) ([]
var resp models.GetDataResp
buf, err = resp.UnmarshalMsg(buf)
if err != nil {
log.Error(3, "DP getTargetsRemote: error unmarshaling body from %s/getdata: %q", node.GetName(), err)
log.Error(3, "DP getTargetsRemote: error unmarshaling body from %s/getdata: %q", node.Name, err)
errorsChan <- err
return
}
log.Debug("DP getTargetsRemote: %s returned %d series", node.GetName(), len(resp.Series))
log.Debug("DP getTargetsRemote: %s returned %d series", node.Name, len(resp.Series))
seriesChan <- resp.Series
}(nodeReqs, node)
}(nodeReqs)
}
go func() {
wg.Wait()
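The rewritten getTargetsRemote keeps the familiar fan-out/fan-in shape: one goroutine per peer, result and error channels buffered to the number of peers, and a closer goroutine gated on the WaitGroup. Below is a minimal, generic sketch of that pattern; the names and types are illustrative, not metrictank code:

package main

import (
    "fmt"
    "sync"
)

// fetchAll fans a batch of per-peer requests out to worker goroutines and
// collects the results, mirroring the shape of getTargetsRemote above.
func fetchAll(reqsByPeer map[string][]string, fetch func(peer string, reqs []string) ([]string, error)) ([]string, error) {
    results := make(chan []string, len(reqsByPeer)) // buffered so workers never block
    errs := make(chan error, len(reqsByPeer))
    var wg sync.WaitGroup
    wg.Add(len(reqsByPeer))

    for peer, reqs := range reqsByPeer {
        go func(peer string, reqs []string) {
            defer wg.Done()
            out, err := fetch(peer, reqs)
            if err != nil {
                errs <- err
                return
            }
            results <- out
        }(peer, reqs)
    }

    // close both channels once every worker is done so the loops below terminate
    go func() {
        wg.Wait()
        close(results)
        close(errs)
    }()

    var out []string
    for series := range results {
        out = append(out, series...)
    }
    if err := <-errs; err != nil { // nil if the channel is closed and empty
        return nil, err
    }
    return out, nil
}

func main() {
    data := map[string][]string{"node-a": {"req1", "req2"}, "node-b": {"req3"}}
    out, err := fetchAll(data, func(peer string, reqs []string) ([]string, error) {
        return []string{fmt.Sprintf("%s handled %d reqs", peer, len(reqs))}, nil
    })
    fmt.Println(out, err)
}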
126 changes: 111 additions & 15 deletions api/dataprocessor_test.go
@@ -25,7 +25,7 @@ type testCase struct {
}

func init() {
cluster.Init("default", "test", time.Now())
cluster.Init("default", "test", time.Now(), "http", 6060)
}

func validate(cases []testCase, t *testing.T) {
@@ -543,10 +543,10 @@ func TestPrevBoundary(t *testing.T) {

// TestGetSeriesFixed assures that series data is returned in proper form.
func TestGetSeriesFixed(t *testing.T) {
cluster.Init("default", "test", time.Now())
cluster.Init("default", "test", time.Now(), "http", 6060)
store := mdata.NewDevnullStore()
metrics := mdata.NewAggMetrics(store, 600, 10, 0, 0, 0, 0, []mdata.AggSetting{})
addr = "localhost:6060"
Addr = "localhost:6060"
srv, _ := NewServer()
srv.BindBackendStore(store)
srv.BindMemoryStore(metrics)
@@ -573,7 +573,7 @@ func TestGetSeriesFixed(t *testing.T) {
metric.Add(20+offset, 30) // this point will always be quantized to 30, so it should be selected
metric.Add(30+offset, 40) // this point will always be quantized to 40
metric.Add(40+offset, 50) // this point will always be quantized to 50
req := models.NewReq(name, name, from, to, 1000, 10, consolidation.Avg, cluster.ThisNode)
req := models.NewReq(name, name, from, to, 1000, 10, consolidation.Avg, cluster.Manager.ThisNode())
req.ArchInterval = 10
points := srv.getSeriesFixed(req, consolidation.None)
if !reflect.DeepEqual(expected, points) {
@@ -592,11 +592,11 @@ type alignCase struct {
}

func reqRaw(key string, from, to, maxPoints, rawInterval uint32, consolidator consolidation.Consolidator) models.Req {
req := models.NewReq(key, key, from, to, maxPoints, rawInterval, consolidator, cluster.ThisNode)
req := models.NewReq(key, key, from, to, maxPoints, rawInterval, consolidator, cluster.Manager.ThisNode())
return req
}
func reqOut(key string, from, to, maxPoints, rawInterval uint32, consolidator consolidation.Consolidator, archive int, archInterval, outInterval, aggNum uint32) models.Req {
req := models.NewReq(key, key, from, to, maxPoints, rawInterval, consolidator, cluster.ThisNode)
req := models.NewReq(key, key, from, to, maxPoints, rawInterval, consolidator, cluster.Manager.ThisNode())
req.Archive = archive
req.ArchInterval = archInterval
req.OutInterval = outInterval
@@ -1041,14 +1041,57 @@ func TestAlignRequests(t *testing.T) {
t.Errorf("different amount of requests for testcase %d expected: %v, got: %v", i, len(ac.outReqs), len(out))
} else {
for r, exp := range ac.outReqs {
if exp != out[r] {
if !compareReqEqual(exp, out[r]) {
t.Errorf("testcase %d, request %d:\nexpected: %v\n got: %v", i, r, exp.DebugString(), out[r].DebugString())
}
}
}
}
}

// return true if a and b are equal. Equal means that all of the struct
// fields are equal. For the req.Node, we just compare the node.Name rather
// than doing a deep comparison.
func compareReqEqual(a, b models.Req) bool {
if a.Key != b.Key {
return false
}
if a.Target != b.Target {
return false
}
if a.From != b.From {
return false
}
if a.To != b.To {
return false
}
if a.MaxPoints != b.MaxPoints {
return false
}
if a.RawInterval != b.RawInterval {
return false
}
if a.Consolidator != b.Consolidator {
return false
}
if a.Node.Name != b.Node.Name {
return false
}
if a.Archive != b.Archive {
return false
}
if a.ArchInterval != b.ArchInterval {
return false
}
if a.OutInterval != b.OutInterval {
return false
}
if a.AggNum != b.AggNum {
return false
}
return true
}

func TestMergeSeries(t *testing.T) {
out := make([]models.Series, 0)
for i := 0; i < 5; i++ {
@@ -1288,13 +1331,66 @@ func TestGetSeriesCachedStore(t *testing.T) {
t.Fatalf("Pattern %s From %d To %d; Expected results to have len 0 but got %d", pattern, from, to, len(tsSlice))
}

// if current query is the maximum range we check the cache stats to
// verify it got all the hits it should have
if from == start && to == lastTs {
hits := accnt.CacheChunkHit.Peek()
if tc.Hits != hits {
t.Fatalf("Pattern %s From %d To %d; Expected %d hits but got %d", pattern, from, to, tc.Hits, hits)
expectedHits := uint32(0)
complete := false
// because ranges are exclusive at the end we'll test for to - 1
exclTo := to - 1

// if from == to we always expect 0 hits
if from != to {

// seek hits from the beginning of the searched range within the given pattern
for i := 0; i < len(pattern); i++ {

// if pattern index is lower than from's chunk we continue
if from-(from%span) > start+uint32(i)*span {
continue
}

// current pattern index is a cache hit, so we expect one more
if pattern[i] == 'c' || pattern[i] == 'b' {
expectedHits++
} else {
break
}

// if we've already sought beyond to's pattern we break and mark the seek as complete
if exclTo-(exclTo%span) == start+uint32(i)*span {
complete = true
break
}
}

// only if the previous seek was not complete do we launch one from the other end
if !complete {

// now the same from the other end (just like the cache searching does)
for i := len(pattern) - 1; i >= 0; i-- {

// if pattern index is above to's chunk we continue
if exclTo-(exclTo%span)+span <= start+uint32(i)*span {
continue
}

// current pattern index is a cache hit, so we expect one more
if pattern[i] == 'c' || pattern[i] == 'b' {
expectedHits++
} else {
break
}

// if we've already sought beyond from's pattern we break
if from-(from%span) == start+uint32(i)*span {
break
}
}
}
}

// verify we got all cache hits we should have
hits := accnt.CacheChunkHit.Peek()
if expectedHits != hits {
t.Fatalf("Pattern %s From %d To %d; Expected %d hits but got %d", pattern, from, to, expectedHits, hits)
}
accnt.CacheChunkHit.SetUint32(0)

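To make the bookkeeping above easier to follow, here is a self-contained sketch of the same expected-hits calculation. The pattern letters and numbers in main are made up for illustration and are not taken from the test table:

package main

import "fmt"

// expectedCacheHits mirrors the logic added to TestGetSeriesCachedStore above:
// chunks are laid out every span seconds starting at start, and a pattern byte
// of 'c' or 'b' means that chunk is present in the chunk cache. The cache is
// searched from both ends of the requested range and stops at the first gap,
// so only the unbroken runs touching either end count as hits.
func expectedCacheHits(pattern string, start, span, from, to uint32) uint32 {
    if from == to {
        return 0
    }
    hits := uint32(0)
    exclTo := to - 1 // ranges are exclusive at the end
    complete := false

    // forward pass: count hits from the first chunk of the requested range
    for i := 0; i < len(pattern); i++ {
        chunkStart := start + uint32(i)*span
        if from-(from%span) > chunkStart {
            continue // chunk lies before the requested range
        }
        if pattern[i] == 'c' || pattern[i] == 'b' {
            hits++
        } else {
            break // first gap ends the forward run
        }
        if exclTo-(exclTo%span) == chunkStart {
            complete = true // reached the last chunk of the range
            break
        }
    }

    // backward pass from the other end, unless the forward pass covered everything
    if !complete {
        for i := len(pattern) - 1; i >= 0; i-- {
            chunkStart := start + uint32(i)*span
            if exclTo-(exclTo%span)+span <= chunkStart {
                continue // chunk lies after the requested range
            }
            if pattern[i] == 'c' || pattern[i] == 'b' {
                hits++
            } else {
                break
            }
            if from-(from%span) == chunkStart {
                break
            }
        }
    }
    return hits
}

func main() {
    // 6 chunks of 600s each: cached, cached, store-only, store-only, cached, cached.
    // Querying the full range finds an unbroken cached run of 2 at each end -> 4 hits.
    fmt.Println(expectedCacheHits("ccsscc", 0, 600, 0, 3600)) // prints 4
}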
@@ -1307,12 +1403,12 @@ func TestGetSeriesAggMetrics(t *testing.T) {
}

func TestGetSeriesAggMetrics(t *testing.T) {
cluster.Init("default", "test", time.Now())
cluster.Init("default", "test", time.Now(), "http", 6060)
store := mdata.NewMockStore()
chunkSpan := uint32(600)
numChunks := uint32(10)
metrics := mdata.NewAggMetrics(store, chunkSpan, numChunks, 0, 0, 0, 0, []mdata.AggSetting{})
addr = "localhost:6060"
Addr = "localhost:6060"
srv, _ := NewServer()
srv.BindBackendStore(store)
srv.BindMemoryStore(metrics)