diff --git a/CHANGELOG.md b/CHANGELOG.md index 71f2864ba226..355679dc9240 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +## 0.7 (Unreleased) + +IMPROVEMENTS: + * telemetry: Add support for tagged metrics for Nomad clients [GH-3147] + ## 0.6.3 (Unreleased) BUG FIXES: diff --git a/client/client.go b/client/client.go index c5ed19097cff..d657d9224b8c 100644 --- a/client/client.go +++ b/client/client.go @@ -13,7 +13,7 @@ import ( "sync" "time" - "github.com/armon/go-metrics" + metrics "github.com/armon/go-metrics" "github.com/boltdb/bolt" consulapi "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/lib" @@ -153,6 +153,10 @@ type Client struct { // clientACLResolver holds the ACL resolution state clientACLResolver + + // baseLabels are used when emitting tagged metrics. All client metrics will + // have these tags, and optionally more. + baseLabels []metrics.Label } var ( @@ -1843,6 +1847,10 @@ DISCOLOOP: // emitStats collects host resource usage stats periodically func (c *Client) emitStats() { + // Assign labels directly before emitting stats so the information expected + // is ready + c.baseLabels = []metrics.Label{{Name: "node_id", Value: c.Node().ID}, {Name: "datacenter", Value: c.Node().Datacenter}} + // Start collecting host stats right away and then keep collecting every // collection interval next := time.NewTimer(0) @@ -1859,7 +1867,7 @@ func (c *Client) emitStats() { // Publish Node metrics if operator has opted in if c.config.PublishNodeMetrics { - c.emitHostStats(c.hostStatsCollector.Stats()) + c.emitHostStats() } c.emitClientMetrics() @@ -1869,33 +1877,69 @@ func (c *Client) emitStats() { } } -// emitHostStats pushes host resource usage stats to remote metrics collection sinks -func (c *Client) emitHostStats(hStats *stats.HostStats) { - nodeID := c.Node().ID - metrics.SetGauge([]string{"client", "host", "memory", nodeID, "total"}, float32(hStats.Memory.Total)) - metrics.SetGauge([]string{"client", "host", "memory", nodeID, "available"}, float32(hStats.Memory.Available)) - metrics.SetGauge([]string{"client", "host", "memory", nodeID, "used"}, float32(hStats.Memory.Used)) - metrics.SetGauge([]string{"client", "host", "memory", nodeID, "free"}, float32(hStats.Memory.Free)) +// setGaugeForMemoryStats proxies metrics for memory specific statistics +func (c *Client) setGaugeForMemoryStats(nodeID string, hStats *stats.HostStats) { + if !c.config.DisableTaggedMetrics { + metrics.SetGaugeWithLabels([]string{"client", "host", "memory", "total"}, float32(hStats.Memory.Total), c.baseLabels) + metrics.SetGaugeWithLabels([]string{"client", "host", "memory", "available"}, float32(hStats.Memory.Available), c.baseLabels) + metrics.SetGaugeWithLabels([]string{"client", "host", "memory", "used"}, float32(hStats.Memory.Used), c.baseLabels) + metrics.SetGaugeWithLabels([]string{"client", "host", "memory", "free"}, float32(hStats.Memory.Free), c.baseLabels) + } - metrics.SetGauge([]string{"uptime"}, float32(hStats.Uptime)) + if c.config.BackwardsCompatibleMetrics { + metrics.SetGauge([]string{"client", "host", "memory", nodeID, "total"}, float32(hStats.Memory.Total)) + metrics.SetGauge([]string{"client", "host", "memory", nodeID, "available"}, float32(hStats.Memory.Available)) + metrics.SetGauge([]string{"client", "host", "memory", nodeID, "used"}, float32(hStats.Memory.Used)) + metrics.SetGauge([]string{"client", "host", "memory", nodeID, "free"}, float32(hStats.Memory.Free)) + } +} +// setGaugeForCPUStats proxies metrics for CPU specific statistics +func (c *Client) setGaugeForCPUStats(nodeID string, hStats *stats.HostStats) { for _, cpu := range hStats.CPU { - metrics.SetGauge([]string{"client", "host", "cpu", nodeID, cpu.CPU, "total"}, float32(cpu.Total)) - metrics.SetGauge([]string{"client", "host", "cpu", nodeID, cpu.CPU, "user"}, float32(cpu.User)) - metrics.SetGauge([]string{"client", "host", "cpu", nodeID, cpu.CPU, "idle"}, float32(cpu.Idle)) - metrics.SetGauge([]string{"client", "host", "cpu", nodeID, cpu.CPU, "system"}, float32(cpu.System)) + if !c.config.DisableTaggedMetrics { + labels := append(c.baseLabels, metrics.Label{"cpu", cpu.CPU}) + + metrics.SetGaugeWithLabels([]string{"client", "host", "cpu", "total"}, float32(cpu.Total), labels) + metrics.SetGaugeWithLabels([]string{"client", "host", "cpu", "user"}, float32(cpu.User), labels) + metrics.SetGaugeWithLabels([]string{"client", "host", "cpu", "idle"}, float32(cpu.Idle), labels) + metrics.SetGaugeWithLabels([]string{"client", "host", "cpu", "system"}, float32(cpu.System), labels) + } + + if c.config.BackwardsCompatibleMetrics { + metrics.SetGauge([]string{"client", "host", "cpu", nodeID, cpu.CPU, "total"}, float32(cpu.Total)) + metrics.SetGauge([]string{"client", "host", "cpu", nodeID, cpu.CPU, "user"}, float32(cpu.User)) + metrics.SetGauge([]string{"client", "host", "cpu", nodeID, cpu.CPU, "idle"}, float32(cpu.Idle)) + metrics.SetGauge([]string{"client", "host", "cpu", nodeID, cpu.CPU, "system"}, float32(cpu.System)) + } } +} +// setGaugeForDiskStats proxies metrics for disk specific statistics +func (c *Client) setGaugeForDiskStats(nodeID string, hStats *stats.HostStats) { for _, disk := range hStats.DiskStats { - metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "size"}, float32(disk.Size)) - metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "used"}, float32(disk.Used)) - metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "available"}, float32(disk.Available)) - metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "used_percent"}, float32(disk.UsedPercent)) - metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "inodes_percent"}, float32(disk.InodesUsedPercent)) + if !c.config.DisableTaggedMetrics { + labels := append(c.baseLabels, metrics.Label{"disk", disk.Device}) + + metrics.SetGaugeWithLabels([]string{"client", "host", "disk", "size"}, float32(disk.Size), labels) + metrics.SetGaugeWithLabels([]string{"client", "host", "disk", "used"}, float32(disk.Used), labels) + metrics.SetGaugeWithLabels([]string{"client", "host", "disk", "available"}, float32(disk.Available), labels) + metrics.SetGaugeWithLabels([]string{"client", "host", "disk", "used_percent"}, float32(disk.UsedPercent), labels) + metrics.SetGaugeWithLabels([]string{"client", "host", "disk", "inodes_percent"}, float32(disk.InodesUsedPercent), labels) + } + + if c.config.BackwardsCompatibleMetrics { + metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "size"}, float32(disk.Size)) + metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "used"}, float32(disk.Used)) + metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "available"}, float32(disk.Available)) + metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "used_percent"}, float32(disk.UsedPercent)) + metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "inodes_percent"}, float32(disk.InodesUsedPercent)) + } } +} - // Get all the resources for the node - c.configLock.RLock() +// setGaugeForAllocationStats proxies metrics for allocation specific statistics +func (c *Client) setGaugeForAllocationStats(nodeID string) { node := c.configCopy.Node c.configLock.RUnlock() total := node.Resources @@ -1903,13 +1947,29 @@ func (c *Client) emitHostStats(hStats *stats.HostStats) { allocated := c.getAllocatedResources(node) // Emit allocated - metrics.SetGauge([]string{"client", "allocated", "memory", nodeID}, float32(allocated.MemoryMB)) - metrics.SetGauge([]string{"client", "allocated", "disk", nodeID}, float32(allocated.DiskMB)) - metrics.SetGauge([]string{"client", "allocated", "cpu", nodeID}, float32(allocated.CPU)) - metrics.SetGauge([]string{"client", "allocated", "iops", nodeID}, float32(allocated.IOPS)) + if !c.config.DisableTaggedMetrics { + metrics.SetGaugeWithLabels([]string{"client", "allocated", "memory"}, float32(allocated.MemoryMB), c.baseLabels) + metrics.SetGaugeWithLabels([]string{"client", "allocated", "disk"}, float32(allocated.DiskMB), c.baseLabels) + metrics.SetGaugeWithLabels([]string{"client", "allocated", "cpu"}, float32(allocated.CPU), c.baseLabels) + metrics.SetGaugeWithLabels([]string{"client", "allocated", "iops"}, float32(allocated.IOPS), c.baseLabels) + } + + if c.config.BackwardsCompatibleMetrics { + metrics.SetGauge([]string{"client", "allocated", "memory", nodeID}, float32(allocated.MemoryMB)) + metrics.SetGauge([]string{"client", "allocated", "disk", nodeID}, float32(allocated.DiskMB)) + metrics.SetGauge([]string{"client", "allocated", "cpu", nodeID}, float32(allocated.CPU)) + metrics.SetGauge([]string{"client", "allocated", "iops", nodeID}, float32(allocated.IOPS)) + } for _, n := range allocated.Networks { - metrics.SetGauge([]string{"client", "allocated", "network", n.Device, nodeID}, float32(n.MBits)) + if !c.config.DisableTaggedMetrics { + labels := append(c.baseLabels, metrics.Label{"device", n.Device}) + metrics.SetGaugeWithLabels([]string{"client", "allocated", "network"}, float32(n.MBits), labels) + } + + if c.config.BackwardsCompatibleMetrics { + metrics.SetGauge([]string{"client", "allocated", "network", n.Device, nodeID}, float32(n.MBits)) + } } // Emit unallocated @@ -1917,10 +1977,20 @@ func (c *Client) emitHostStats(hStats *stats.HostStats) { unallocatedDisk := total.DiskMB - res.DiskMB - allocated.DiskMB unallocatedCpu := total.CPU - res.CPU - allocated.CPU unallocatedIops := total.IOPS - res.IOPS - allocated.IOPS - metrics.SetGauge([]string{"client", "unallocated", "memory", nodeID}, float32(unallocatedMem)) - metrics.SetGauge([]string{"client", "unallocated", "disk", nodeID}, float32(unallocatedDisk)) - metrics.SetGauge([]string{"client", "unallocated", "cpu", nodeID}, float32(unallocatedCpu)) - metrics.SetGauge([]string{"client", "unallocated", "iops", nodeID}, float32(unallocatedIops)) + + if !c.config.DisableTaggedMetrics { + metrics.SetGaugeWithLabels([]string{"client", "unallocated", "memory"}, float32(unallocatedMem), c.baseLabels) + metrics.SetGaugeWithLabels([]string{"client", "unallocated", "disk"}, float32(unallocatedDisk), c.baseLabels) + metrics.SetGaugeWithLabels([]string{"client", "unallocated", "cpu"}, float32(unallocatedCpu), c.baseLabels) + metrics.SetGaugeWithLabels([]string{"client", "unallocated", "iops"}, float32(unallocatedIops), c.baseLabels) + } + + if c.config.BackwardsCompatibleMetrics { + metrics.SetGauge([]string{"client", "unallocated", "memory", nodeID}, float32(unallocatedMem)) + metrics.SetGauge([]string{"client", "unallocated", "disk", nodeID}, float32(unallocatedDisk)) + metrics.SetGauge([]string{"client", "unallocated", "cpu", nodeID}, float32(unallocatedCpu)) + metrics.SetGauge([]string{"client", "unallocated", "iops", nodeID}, float32(unallocatedIops)) + } for _, n := range allocated.Networks { totalMbits := 0 @@ -1932,10 +2002,42 @@ func (c *Client) emitHostStats(hStats *stats.HostStats) { } unallocatedMbits := totalMbits - n.MBits - metrics.SetGauge([]string{"client", "unallocated", "network", n.Device, nodeID}, float32(unallocatedMbits)) + + if !c.config.DisableTaggedMetrics { + labels := append(c.baseLabels, metrics.Label{"device", n.Device}) + metrics.SetGaugeWithLabels([]string{"client", "unallocated", "network"}, float32(unallocatedMbits), labels) + } + + if c.config.BackwardsCompatibleMetrics { + metrics.SetGauge([]string{"client", "unallocated", "network", n.Device, nodeID}, float32(unallocatedMbits)) + } + } +} + +// No lables are required so we emit with only a key/value syntax +func (c *Client) setGaugeForUptime(hStats *stats.HostStats) { + if !c.config.DisableTaggedMetrics { + metrics.SetGaugeWithLabels([]string{"uptime"}, float32(hStats.Uptime), c.baseLabels) + } + if c.config.BackwardsCompatibleMetrics { + metrics.SetGauge([]string{"uptime"}, float32(hStats.Uptime)) } } +// emitHostStats pushes host resource usage stats to remote metrics collection sinks +func (c *Client) emitHostStats() { + nodeID := c.Node().ID + hStats := c.hostStatsCollector.Stats() + + c.setGaugeForMemoryStats(nodeID, hStats) + c.setGaugeForUptime(hStats) + c.setGaugeForCPUStats(nodeID, hStats) + c.setGaugeForDiskStats(nodeID, hStats) + + // TODO: This should be moved to emitClientMetrics + c.setGaugeForAllocationStats(nodeID) +} + // emitClientMetrics emits lower volume client metrics func (c *Client) emitClientMetrics() { nodeID := c.Node().ID @@ -1960,11 +2062,21 @@ func (c *Client) emitClientMetrics() { } } - metrics.SetGauge([]string{"client", "allocations", "migrating", nodeID}, float32(migrating)) - metrics.SetGauge([]string{"client", "allocations", "blocked", nodeID}, float32(blocked)) - metrics.SetGauge([]string{"client", "allocations", "pending", nodeID}, float32(pending)) - metrics.SetGauge([]string{"client", "allocations", "running", nodeID}, float32(running)) - metrics.SetGauge([]string{"client", "allocations", "terminal", nodeID}, float32(terminal)) + if !c.config.DisableTaggedMetrics { + metrics.SetGaugeWithLabels([]string{"client", "allocations", "migrating"}, float32(migrating), c.baseLabels) + metrics.SetGaugeWithLabels([]string{"client", "allocations", "blocked"}, float32(blocked), c.baseLabels) + metrics.SetGaugeWithLabels([]string{"client", "allocations", "pending"}, float32(pending), c.baseLabels) + metrics.SetGaugeWithLabels([]string{"client", "allocations", "running"}, float32(running), c.baseLabels) + metrics.SetGaugeWithLabels([]string{"client", "allocations", "terminal"}, float32(terminal), c.baseLabels) + } + + if c.config.BackwardsCompatibleMetrics { + metrics.SetGauge([]string{"client", "allocations", "migrating", nodeID}, float32(migrating)) + metrics.SetGauge([]string{"client", "allocations", "blocked", nodeID}, float32(blocked)) + metrics.SetGauge([]string{"client", "allocations", "pending", nodeID}, float32(pending)) + metrics.SetGauge([]string{"client", "allocations", "running", nodeID}, float32(running)) + metrics.SetGauge([]string{"client", "allocations", "terminal", nodeID}, float32(terminal)) + } } func (c *Client) getAllocatedResources(selfNode *structs.Node) *structs.Resources { diff --git a/client/client_test.go b/client/client_test.go index 0e77e6cb86c0..fafd338add3e 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -22,6 +22,7 @@ import ( nconfig "github.com/hashicorp/nomad/nomad/structs/config" "github.com/hashicorp/nomad/testutil" "github.com/mitchellh/hashstructure" + "github.com/stretchr/testify/assert" ctestutil "github.com/hashicorp/nomad/client/testutil" ) @@ -136,6 +137,32 @@ func TestClient_StartStop(t *testing.T) { } } +// Certain labels for metrics are dependant on client intial setup. This tests +// that the client has properly initialized before we assign values to labels +func TestClient_BaseLabels(t *testing.T) { + t.Parallel() + assert := assert.New(t) + + client := testClient(t, nil) + if err := client.Shutdown(); err != nil { + t.Fatalf("err: %v", err) + } + + // directly invoke this function, as otherwise this will fail on a CI build + // due to a race condition + client.emitStats() + + baseLabels := client.baseLabels + assert.NotEqual(0, len(baseLabels)) + + nodeID := client.Node().ID + for _, e := range baseLabels { + if e.Name == "node_id" { + assert.Equal(nodeID, e.Value) + } + } +} + func TestClient_RPC(t *testing.T) { t.Parallel() s1, addr := testServer(t, nil) diff --git a/client/config/config.go b/client/config/config.go index 6ce263eb04fc..39d50507c6a9 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -188,6 +188,14 @@ type Config struct { // ACLPolicyTTL is how long we cache policy values for ACLPolicyTTL time.Duration + + // DisableTaggedMetrics determines whether metrics will be displayed via a + // key/value/tag format, or simply a key/value format + DisableTaggedMetrics bool + + // BackwardsCompatibleMetrics determines whether to show methods of + // displaying metrics for older verions, or to only show the new format + BackwardsCompatibleMetrics bool } func (c *Config) Copy() *Config { @@ -205,20 +213,22 @@ func (c *Config) Copy() *Config { // DefaultConfig returns the default configuration func DefaultConfig() *Config { return &Config{ - Version: version.GetVersion(), - VaultConfig: config.DefaultVaultConfig(), - ConsulConfig: config.DefaultConsulConfig(), - LogOutput: os.Stderr, - Region: "global", - StatsCollectionInterval: 1 * time.Second, - TLSConfig: &config.TLSConfig{}, - LogLevel: "DEBUG", - GCInterval: 1 * time.Minute, - GCParallelDestroys: 2, - GCDiskUsageThreshold: 80, - GCInodeUsageThreshold: 70, - GCMaxAllocs: 50, - NoHostUUID: true, + Version: version.GetVersion(), + VaultConfig: config.DefaultVaultConfig(), + ConsulConfig: config.DefaultConsulConfig(), + LogOutput: os.Stderr, + Region: "global", + StatsCollectionInterval: 1 * time.Second, + TLSConfig: &config.TLSConfig{}, + LogLevel: "DEBUG", + GCInterval: 1 * time.Minute, + GCParallelDestroys: 2, + GCDiskUsageThreshold: 80, + GCInodeUsageThreshold: 70, + GCMaxAllocs: 50, + NoHostUUID: true, + DisableTaggedMetrics: false, + BackwardsCompatibleMetrics: false, } } diff --git a/client/task_runner.go b/client/task_runner.go index c953a76c8f8c..cd5afbd91975 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -14,7 +14,7 @@ import ( "sync" "time" - "github.com/armon/go-metrics" + metrics "github.com/armon/go-metrics" "github.com/boltdb/bolt" "github.com/golang/snappy" "github.com/hashicorp/consul-template/signals" @@ -161,6 +161,10 @@ type TaskRunner struct { // persistedHash is the hash of the last persisted snapshot. It is used to // detect if a new snapshot has to be written to disk. persistedHash []byte + + // baseLabels are used when emitting tagged metrics. All task runner metrics + // will have these tags, and optionally more. + baseLabels []metrics.Label } // taskRunnerState is used to snapshot the state of the task runner @@ -247,6 +251,8 @@ func NewTaskRunner(logger *log.Logger, config *config.Config, signalCh: make(chan SignalEvent), } + tc.baseLabels = []metrics.Label{{"job", tc.alloc.Job.Name}, {"task_group", tc.alloc.TaskGroup}, {"alloc_id", tc.alloc.ID}, {"task", tc.task.Name}} + return tc } @@ -1786,10 +1792,25 @@ func (r *TaskRunner) setCreatedResources(cr *driver.CreatedResources) { r.createdResourcesLock.Unlock() } -// emitStats emits resource usage stats of tasks to remote metrics collector -// sinks -func (r *TaskRunner) emitStats(ru *cstructs.TaskResourceUsage) { - if ru.ResourceUsage.MemoryStats != nil && r.config.PublishAllocationMetrics { +func (r *TaskRunner) setGaugeForMemory(ru *cstructs.TaskResourceUsage) { + if !r.config.DisableTaggedMetrics { + metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "rss"}, + float32(ru.ResourceUsage.MemoryStats.RSS), r.baseLabels) + metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "rss"}, + float32(ru.ResourceUsage.MemoryStats.RSS), r.baseLabels) + metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "cache"}, + float32(ru.ResourceUsage.MemoryStats.Cache), r.baseLabels) + metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "swap"}, + float32(ru.ResourceUsage.MemoryStats.Swap), r.baseLabels) + metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "max_usage"}, + float32(ru.ResourceUsage.MemoryStats.MaxUsage), r.baseLabels) + metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "kernel_usage"}, + float32(ru.ResourceUsage.MemoryStats.KernelUsage), r.baseLabels) + metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "kernel_max_usage"}, + float32(ru.ResourceUsage.MemoryStats.KernelMaxUsage), r.baseLabels) + } + + if r.config.BackwardsCompatibleMetrics { metrics.SetGauge([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, r.alloc.ID, r.task.Name, "memory", "rss"}, float32(ru.ResourceUsage.MemoryStats.RSS)) metrics.SetGauge([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, r.alloc.ID, r.task.Name, "memory", "cache"}, float32(ru.ResourceUsage.MemoryStats.Cache)) metrics.SetGauge([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, r.alloc.ID, r.task.Name, "memory", "swap"}, float32(ru.ResourceUsage.MemoryStats.Swap)) @@ -1797,8 +1818,25 @@ func (r *TaskRunner) emitStats(ru *cstructs.TaskResourceUsage) { metrics.SetGauge([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, r.alloc.ID, r.task.Name, "memory", "kernel_usage"}, float32(ru.ResourceUsage.MemoryStats.KernelUsage)) metrics.SetGauge([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, r.alloc.ID, r.task.Name, "memory", "kernel_max_usage"}, float32(ru.ResourceUsage.MemoryStats.KernelMaxUsage)) } +} - if ru.ResourceUsage.CpuStats != nil && r.config.PublishAllocationMetrics { +func (r *TaskRunner) setGaugeForCPU(ru *cstructs.TaskResourceUsage) { + if !r.config.DisableTaggedMetrics { + metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "total_percent"}, + float32(ru.ResourceUsage.CpuStats.Percent), r.baseLabels) + metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "system"}, + float32(ru.ResourceUsage.CpuStats.SystemMode), r.baseLabels) + metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "user"}, + float32(ru.ResourceUsage.CpuStats.UserMode), r.baseLabels) + metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "throttled_time"}, + float32(ru.ResourceUsage.CpuStats.ThrottledTime), r.baseLabels) + metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "throttled_periods"}, + float32(ru.ResourceUsage.CpuStats.ThrottledPeriods), r.baseLabels) + metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "total_ticks"}, + float32(ru.ResourceUsage.CpuStats.TotalTicks), r.baseLabels) + } + + if r.config.BackwardsCompatibleMetrics { metrics.SetGauge([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, r.alloc.ID, r.task.Name, "cpu", "total_percent"}, float32(ru.ResourceUsage.CpuStats.Percent)) metrics.SetGauge([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, r.alloc.ID, r.task.Name, "cpu", "system"}, float32(ru.ResourceUsage.CpuStats.SystemMode)) metrics.SetGauge([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, r.alloc.ID, r.task.Name, "cpu", "user"}, float32(ru.ResourceUsage.CpuStats.UserMode)) @@ -1807,3 +1845,19 @@ func (r *TaskRunner) emitStats(ru *cstructs.TaskResourceUsage) { metrics.SetGauge([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, r.alloc.ID, r.task.Name, "cpu", "total_ticks"}, float32(ru.ResourceUsage.CpuStats.TotalTicks)) } } + +// emitStats emits resource usage stats of tasks to remote metrics collector +// sinks +func (r *TaskRunner) emitStats(ru *cstructs.TaskResourceUsage) { + if !r.config.PublishAllocationMetrics { + return + } + + if ru.ResourceUsage.MemoryStats != nil { + r.setGaugeForMemory(ru) + } + + if ru.ResourceUsage.CpuStats != nil { + r.setGaugeForCPU(ru) + } +} diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 53607a601ba6..bbfc0956d9d3 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -2,7 +2,6 @@ package agent import ( "encoding/json" - "fmt" "io/ioutil" "log" "net" @@ -126,7 +125,6 @@ func TestAgent_ServerConfig(t *testing.T) { t.Fatalf("error normalizing config: %v", err) } out, err = a.serverConfig() - fmt.Println(conf.Addresses.RPC) if err != nil { t.Fatalf("err: %s", err) } diff --git a/command/agent/config-test-fixtures/basic.hcl b/command/agent/config-test-fixtures/basic.hcl index 459474b3c8a6..304da9b8e26e 100644 --- a/command/agent/config-test-fixtures/basic.hcl +++ b/command/agent/config-test-fixtures/basic.hcl @@ -60,6 +60,8 @@ client { gc_inode_usage_threshold = 91 gc_max_allocs = 50 no_host_uuid = false + disable_tagged_metrics = true + backwards_compatible_metrics = true } server { enabled = true diff --git a/command/agent/config.go b/command/agent/config.go index 36052df18a46..d7308094a67c 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -229,6 +229,14 @@ type ClientConfig struct { // NoHostUUID disables using the host's UUID and will force generation of a // random UUID. NoHostUUID *bool `mapstructure:"no_host_uuid"` + + // DisableTaggedMetrics disables a new version of generating metrics which + // uses tags + DisableTaggedMetrics bool `mapstructure:"disable_tagged_metrics"` + + // BackwardsCompatibleMetrics allows for generating metrics in a simple + // key/value structure as done in older versions of Nomad + BackwardsCompatibleMetrics bool `mapstructure:"backwards_compatible_metrics"` } // ACLConfig is configuration specific to the ACL system @@ -1097,6 +1105,14 @@ func (a *ClientConfig) Merge(b *ClientConfig) *ClientConfig { result.NoHostUUID = b.NoHostUUID } + if b.DisableTaggedMetrics { + result.DisableTaggedMetrics = b.DisableTaggedMetrics + } + + if b.BackwardsCompatibleMetrics { + result.BackwardsCompatibleMetrics = b.BackwardsCompatibleMetrics + } + // Add the servers result.Servers = append(result.Servers, b.Servers...) diff --git a/command/agent/config_parse.go b/command/agent/config_parse.go index 2894f79014a6..4928e48b2834 100644 --- a/command/agent/config_parse.go +++ b/command/agent/config_parse.go @@ -357,6 +357,8 @@ func parseClient(result **ClientConfig, list *ast.ObjectList) error { "gc_parallel_destroys", "gc_max_allocs", "no_host_uuid", + "disable_tagged_metrics", + "backwards_compatible_metrics", } if err := checkHCLKeys(listVal, valid); err != nil { return err diff --git a/command/agent/config_parse_test.go b/command/agent/config_parse_test.go index e088926a28b9..e2262ac36840 100644 --- a/command/agent/config_parse_test.go +++ b/command/agent/config_parse_test.go @@ -75,12 +75,14 @@ func TestConfig_Parse(t *testing.T) { ReservedPorts: "1,100,10-12", ParsedReservedPorts: []int{1, 10, 11, 12, 100}, }, - GCInterval: 6 * time.Second, - GCParallelDestroys: 6, - GCDiskUsageThreshold: 82, - GCInodeUsageThreshold: 91, - GCMaxAllocs: 50, - NoHostUUID: helper.BoolToPtr(false), + GCInterval: 6 * time.Second, + GCParallelDestroys: 6, + GCDiskUsageThreshold: 82, + GCInodeUsageThreshold: 91, + GCMaxAllocs: 50, + NoHostUUID: helper.BoolToPtr(false), + DisableTaggedMetrics: true, + BackwardsCompatibleMetrics: true, }, Server: &ServerConfig{ Enabled: true, diff --git a/command/agent/config_test.go b/command/agent/config_test.go index 2e8d207106e5..286d174f34b2 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -89,6 +89,8 @@ func TestConfig_Merge(t *testing.T) { ReservedPorts: "1,10-30,55", ParsedReservedPorts: []int{1, 2, 4}, }, + DisableTaggedMetrics: true, + BackwardsCompatibleMetrics: true, }, Server: &ServerConfig{ Enabled: false, @@ -224,10 +226,12 @@ func TestConfig_Merge(t *testing.T) { ReservedPorts: "2,10-30,55", ParsedReservedPorts: []int{1, 2, 3}, }, - GCInterval: 6 * time.Second, - GCParallelDestroys: 6, - GCDiskUsageThreshold: 71, - GCInodeUsageThreshold: 86, + GCInterval: 6 * time.Second, + GCParallelDestroys: 6, + GCDiskUsageThreshold: 71, + GCInodeUsageThreshold: 86, + DisableTaggedMetrics: true, + BackwardsCompatibleMetrics: true, }, Server: &ServerConfig{ Enabled: true, diff --git a/vendor/github.com/DataDog/datadog-go/statsd/README.md b/vendor/github.com/DataDog/datadog-go/statsd/README.md index 2e897776338d..f68df54be9f6 100644 --- a/vendor/github.com/DataDog/datadog-go/statsd/README.md +++ b/vendor/github.com/DataDog/datadog-go/statsd/README.md @@ -33,6 +33,18 @@ err = c.Count("request.count_total", 2, nil, 1) DogStatsD accepts packets with multiple statsd payloads in them. Using the BufferingClient via `NewBufferingClient` will buffer up commands and send them when the buffer is reached or after 100msec. +## Unix Domain Sockets Client + +DogStatsD version 6 accepts packets through a Unix Socket datagram connection. You can use this protocol by giving a +`unix:///path/to/dsd.socket` addr argument to the `New` or `NewBufferingClient`. + +With this protocol, writes can become blocking if the server's receiving buffer is full. Our default behaviour is to +timeout and drop the packet after 1 ms. You can set a custom timeout duration via the `SetWriteTimeout` method. + +The default mode is to pass write errors from the socket to the caller. This includes write errors the library will +automatically recover from (DogStatsD server not ready yet or is restarting). You can drop these errors and emulate +the UDP behaviour by setting the `SkipErrors` property to `true`. Please note that packets will be dropped in both modes. + ## Development Run the tests with: diff --git a/vendor/github.com/DataDog/datadog-go/statsd/statsd.go b/vendor/github.com/DataDog/datadog-go/statsd/statsd.go index d0fe122bb4ce..9c5773c96d72 100644 --- a/vendor/github.com/DataDog/datadog-go/statsd/statsd.go +++ b/vendor/github.com/DataDog/datadog-go/statsd/statsd.go @@ -27,8 +27,8 @@ import ( "bytes" "errors" "fmt" + "io" "math/rand" - "net" "strconv" "strings" "sync" @@ -54,35 +54,71 @@ any number greater than that will see frames being cut out. */ const MaxUDPPayloadSize = 65467 -// A Client is a handle for sending udp messages to dogstatsd. It is safe to +/* +UnixAddressPrefix holds the prefix to use to enable Unix Domain Socket +traffic instead of UDP. +*/ +const UnixAddressPrefix = "unix://" + +/* +Stat suffixes +*/ +var ( + gaugeSuffix = []byte("|g") + countSuffix = []byte("|c") + histogramSuffix = []byte("|h") + decrSuffix = []byte("-1|c") + incrSuffix = []byte("1|c") + setSuffix = []byte("|s") + timingSuffix = []byte("|ms") +) + +// A statsdWriter offers a standard interface regardless of the underlying +// protocol. For now UDS and UPD writers are available. +type statsdWriter interface { + Write(data []byte) error + SetWriteTimeout(time.Duration) error + Close() error +} + +// A Client is a handle for sending messages to dogstatsd. It is safe to // use one Client from multiple goroutines simultaneously. type Client struct { - conn net.Conn + // Writer handles the underlying networking protocol + writer statsdWriter // Namespace to prepend to all statsd calls Namespace string // Tags are global tags to be added to every statsd call Tags []string + // skipErrors turns off error passing and allows UDS to emulate UDP behaviour + SkipErrors bool // BufferLength is the length of the buffer in commands. bufferLength int flushTime time.Duration commands []string buffer bytes.Buffer - stop bool + stop chan struct{} sync.Mutex } -// New returns a pointer to a new Client given an addr in the format "hostname:port". +// New returns a pointer to a new Client given an addr in the format "hostname:port" or +// "unix:///path/to/socket". func New(addr string) (*Client, error) { - udpAddr, err := net.ResolveUDPAddr("udp", addr) - if err != nil { - return nil, err - } - conn, err := net.DialUDP("udp", nil, udpAddr) - if err != nil { - return nil, err + if strings.HasPrefix(addr, UnixAddressPrefix) { + w, err := newUdsWriter(addr[len(UnixAddressPrefix)-1:]) + if err != nil { + return nil, err + } + client := &Client{writer: w} + return client, nil + } else { + w, err := newUdpWriter(addr) + if err != nil { + return nil, err + } + client := &Client{writer: w, SkipErrors: false} + return client, nil } - client := &Client{conn: conn} - return client, nil } // NewBuffered returns a Client that buffers its output and sends it in chunks. @@ -95,56 +131,73 @@ func NewBuffered(addr string, buflen int) (*Client, error) { client.bufferLength = buflen client.commands = make([]string, 0, buflen) client.flushTime = time.Millisecond * 100 + client.stop = make(chan struct{}, 1) go client.watch() return client, nil } // format a message from its name, value, tags and rate. Also adds global // namespace and tags. -func (c *Client) format(name, value string, tags []string, rate float64) string { +func (c *Client) format(name string, value interface{}, suffix []byte, tags []string, rate float64) string { var buf bytes.Buffer if c.Namespace != "" { buf.WriteString(c.Namespace) } buf.WriteString(name) buf.WriteString(":") - buf.WriteString(value) + + switch val := value.(type) { + case float64: + buf.Write(strconv.AppendFloat([]byte{}, val, 'f', 6, 64)) + + case int64: + buf.Write(strconv.AppendInt([]byte{}, val, 10)) + + case string: + buf.WriteString(val) + + default: + // do nothing + } + buf.Write(suffix) + if rate < 1 { buf.WriteString(`|@`) buf.WriteString(strconv.FormatFloat(rate, 'f', -1, 64)) } - // do not append to c.Tags directly, because it's shared - // across all invocations of this function - tagCopy := make([]string, len(c.Tags), len(c.Tags)+len(tags)) - copy(tagCopy, c.Tags) - tags = append(tagCopy, tags...) - if len(tags) > 0 { - buf.WriteString("|#") - buf.WriteString(tags[0]) - for _, tag := range tags[1:] { - buf.WriteString(",") - buf.WriteString(tag) - } - } + writeTagString(&buf, c.Tags, tags) + return buf.String() } +// SetWriteTimeout allows the user to set a custom UDS write timeout. Not supported for UDP. +func (c *Client) SetWriteTimeout(d time.Duration) error { + return c.writer.SetWriteTimeout(d) +} + func (c *Client) watch() { - for _ = range time.Tick(c.flushTime) { - if c.stop { + ticker := time.NewTicker(c.flushTime) + + for { + select { + case <-ticker.C: + c.Lock() + if len(c.commands) > 0 { + // FIXME: eating error here + c.flush() + } + c.Unlock() + case <-c.stop: + ticker.Stop() return } - c.Lock() - if len(c.commands) > 0 { - // FIXME: eating error here - c.flush() - } - c.Unlock() } } func (c *Client) append(cmd string) error { + c.Lock() + defer c.Unlock() c.commands = append(c.commands, cmd) // if we should flush, lets do it if len(c.commands) == c.bufferLength { @@ -208,7 +261,7 @@ func (c *Client) flush() error { var err error cmdsFlushed := 0 for i, data := range frames { - _, e := c.conn.Write(data) + e := c.writer.Write(data) if e != nil { err = e break @@ -228,64 +281,65 @@ func (c *Client) flush() error { } func (c *Client) sendMsg(msg string) error { + // return an error if message is bigger than MaxUDPPayloadSize + if len(msg) > MaxUDPPayloadSize { + return errors.New("message size exceeds MaxUDPPayloadSize") + } + // if this client is buffered, then we'll just append this - c.Lock() - defer c.Unlock() if c.bufferLength > 0 { - // return an error if message is bigger than OptimalPayloadSize - if len(msg) > MaxUDPPayloadSize { - return errors.New("message size exceeds MaxUDPPayloadSize") - } return c.append(msg) } - _, err := c.conn.Write([]byte(msg)) - return err + + err := c.writer.Write([]byte(msg)) + + if c.SkipErrors { + return nil + } else { + return err + } } // send handles sampling and sends the message over UDP. It also adds global namespace prefixes and tags. -func (c *Client) send(name, value string, tags []string, rate float64) error { +func (c *Client) send(name string, value interface{}, suffix []byte, tags []string, rate float64) error { if c == nil { return nil } if rate < 1 && rand.Float64() > rate { return nil } - data := c.format(name, value, tags, rate) + data := c.format(name, value, suffix, tags, rate) return c.sendMsg(data) } // Gauge measures the value of a metric at a particular time. func (c *Client) Gauge(name string, value float64, tags []string, rate float64) error { - stat := fmt.Sprintf("%f|g", value) - return c.send(name, stat, tags, rate) + return c.send(name, value, gaugeSuffix, tags, rate) } // Count tracks how many times something happened per second. func (c *Client) Count(name string, value int64, tags []string, rate float64) error { - stat := fmt.Sprintf("%d|c", value) - return c.send(name, stat, tags, rate) + return c.send(name, value, countSuffix, tags, rate) } // Histogram tracks the statistical distribution of a set of values. func (c *Client) Histogram(name string, value float64, tags []string, rate float64) error { - stat := fmt.Sprintf("%f|h", value) - return c.send(name, stat, tags, rate) + return c.send(name, value, histogramSuffix, tags, rate) } -// Decr is just Count of 1 +// Decr is just Count of -1 func (c *Client) Decr(name string, tags []string, rate float64) error { - return c.send(name, "-1|c", tags, rate) + return c.send(name, nil, decrSuffix, tags, rate) } // Incr is just Count of 1 func (c *Client) Incr(name string, tags []string, rate float64) error { - return c.send(name, "1|c", tags, rate) + return c.send(name, nil, incrSuffix, tags, rate) } // Set counts the number of unique elements in a group. func (c *Client) Set(name string, value string, tags []string, rate float64) error { - stat := fmt.Sprintf("%s|s", value) - return c.send(name, stat, tags, rate) + return c.send(name, value, setSuffix, tags, rate) } // Timing sends timing information, it is an alias for TimeInMilliseconds @@ -296,12 +350,14 @@ func (c *Client) Timing(name string, value time.Duration, tags []string, rate fl // TimeInMilliseconds sends timing information in milliseconds. // It is flushed by statsd with percentiles, mean and other info (https://github.com/etsy/statsd/blob/master/docs/metric_types.md#timing) func (c *Client) TimeInMilliseconds(name string, value float64, tags []string, rate float64) error { - stat := fmt.Sprintf("%f|ms", value) - return c.send(name, stat, tags, rate) + return c.send(name, value, timingSuffix, tags, rate) } // Event sends the provided Event. func (c *Client) Event(e *Event) error { + if c == nil { + return nil + } stat, err := e.Encode(c.Tags...) if err != nil { return err @@ -325,7 +381,7 @@ func (c *Client) ServiceCheck(sc *ServiceCheck) error { } // SimpleServiceCheck sends an serviceCheck with the provided name and status. -func (c *Client) SimpleServiceCheck(name string, status serviceCheckStatus) error { +func (c *Client) SimpleServiceCheck(name string, status ServiceCheckStatus) error { sc := NewServiceCheck(name, status) return c.ServiceCheck(sc) } @@ -335,8 +391,11 @@ func (c *Client) Close() error { if c == nil { return nil } - c.stop = true - return c.conn.Close() + select { + case c.stop <- struct{}{}: + default: + } + return c.writer.Close() } // Events support @@ -458,34 +517,24 @@ func (e Event) Encode(tags ...string) (string, error) { buffer.WriteString(string(e.AlertType)) } - if len(tags)+len(e.Tags) > 0 { - all := make([]string, 0, len(tags)+len(e.Tags)) - all = append(all, tags...) - all = append(all, e.Tags...) - buffer.WriteString("|#") - buffer.WriteString(all[0]) - for _, tag := range all[1:] { - buffer.WriteString(",") - buffer.WriteString(tag) - } - } + writeTagString(&buffer, tags, e.Tags) return buffer.String(), nil } // ServiceCheck support -type serviceCheckStatus byte +type ServiceCheckStatus byte const ( // Ok is the "ok" ServiceCheck status - Ok serviceCheckStatus = 0 + Ok ServiceCheckStatus = 0 // Warn is the "warning" ServiceCheck status - Warn serviceCheckStatus = 1 + Warn ServiceCheckStatus = 1 // Critical is the "critical" ServiceCheck status - Critical serviceCheckStatus = 2 + Critical ServiceCheckStatus = 2 // Unknown is the "unknown" ServiceCheck status - Unknown serviceCheckStatus = 3 + Unknown ServiceCheckStatus = 3 ) // An ServiceCheck is an object that contains status of DataDog service check. @@ -493,7 +542,7 @@ type ServiceCheck struct { // Name of the service check. Required. Name string // Status of service check. Required. - Status serviceCheckStatus + Status ServiceCheckStatus // Timestamp is a timestamp for the serviceCheck. If not provided, the dogstatsd // server will set this to the current time. Timestamp time.Time @@ -507,7 +556,7 @@ type ServiceCheck struct { // NewServiceCheck creates a new serviceCheck with the given name and status. Error checking // against these values is done at send-time, or upon running sc.Check. -func NewServiceCheck(name string, status serviceCheckStatus) *ServiceCheck { +func NewServiceCheck(name string, status ServiceCheckStatus) *ServiceCheck { return &ServiceCheck{ Name: name, Status: status, @@ -551,17 +600,7 @@ func (sc ServiceCheck) Encode(tags ...string) (string, error) { buffer.WriteString(sc.Hostname) } - if len(tags)+len(sc.Tags) > 0 { - all := make([]string, 0, len(tags)+len(sc.Tags)) - all = append(all, tags...) - all = append(all, sc.Tags...) - buffer.WriteString("|#") - buffer.WriteString(all[0]) - for _, tag := range all[1:] { - buffer.WriteString(",") - buffer.WriteString(tag) - } - } + writeTagString(&buffer, tags, sc.Tags) if len(message) != 0 { buffer.WriteString("|m:") @@ -579,3 +618,27 @@ func (sc ServiceCheck) escapedMessage() string { msg := strings.Replace(sc.Message, "\n", "\\n", -1) return strings.Replace(msg, "m:", `m\:`, -1) } + +func removeNewlines(str string) string { + return strings.Replace(str, "\n", "", -1) +} + +func writeTagString(w io.Writer, tagList1, tagList2 []string) { + // the tag lists may be shared with other callers, so we cannot modify + // them in any way (which means we cannot append to them either) + // therefore we must make an entirely separate copy just for this call + totalLen := len(tagList1) + len(tagList2) + if totalLen == 0 { + return + } + tags := make([]string, 0, totalLen) + tags = append(tags, tagList1...) + tags = append(tags, tagList2...) + + io.WriteString(w, "|#") + io.WriteString(w, removeNewlines(tags[0])) + for _, tag := range tags[1:] { + io.WriteString(w, ",") + io.WriteString(w, removeNewlines(tag)) + } +} diff --git a/vendor/github.com/DataDog/datadog-go/statsd/udp.go b/vendor/github.com/DataDog/datadog-go/statsd/udp.go new file mode 100644 index 000000000000..7cbb5148bf65 --- /dev/null +++ b/vendor/github.com/DataDog/datadog-go/statsd/udp.go @@ -0,0 +1,41 @@ +package statsd + +import ( + "errors" + "net" + "time" +) + +// udpWriter is an internal class wrapping around management of UDP connection +type udpWriter struct { + conn net.Conn +} + +// New returns a pointer to a new udpWriter given an addr in the format "hostname:port". +func newUdpWriter(addr string) (*udpWriter, error) { + udpAddr, err := net.ResolveUDPAddr("udp", addr) + if err != nil { + return nil, err + } + conn, err := net.DialUDP("udp", nil, udpAddr) + if err != nil { + return nil, err + } + writer := &udpWriter{conn: conn} + return writer, nil +} + +// SetWriteTimeout is not needed for UDP, returns error +func (w *udpWriter) SetWriteTimeout(d time.Duration) error { + return errors.New("SetWriteTimeout: not supported for UDP connections") +} + +// Write data to the UDP connection with no error handling +func (w *udpWriter) Write(data []byte) error { + _, e := w.conn.Write(data) + return e +} + +func (w *udpWriter) Close() error { + return w.conn.Close() +} diff --git a/vendor/github.com/DataDog/datadog-go/statsd/uds.go b/vendor/github.com/DataDog/datadog-go/statsd/uds.go new file mode 100644 index 000000000000..fb8641a7cc7e --- /dev/null +++ b/vendor/github.com/DataDog/datadog-go/statsd/uds.go @@ -0,0 +1,66 @@ +package statsd + +import ( + "net" + "strings" + "time" +) + +/* +UDSTimeout holds the default timeout for UDS socket writes, as they can get +blocking when the receiving buffer is full. +*/ +const defaultUDSTimeout = 1 * time.Millisecond + +// udsWriter is an internal class wrapping around management of UDS connection +type udsWriter struct { + // Address to send metrics to, needed to allow reconnection on error + addr net.Addr + // Established connection object, or nil if not connected yet + conn net.Conn + // write timeout + writeTimeout time.Duration +} + +// New returns a pointer to a new udsWriter given a socket file path as addr. +func newUdsWriter(addr string) (*udsWriter, error) { + udsAddr, err := net.ResolveUnixAddr("unixgram", addr) + if err != nil { + return nil, err + } + // Defer connection to first Write + writer := &udsWriter{addr: udsAddr, conn: nil, writeTimeout: defaultUDSTimeout} + return writer, nil +} + +// SetWriteTimeout allows the user to set a custom write timeout +func (w *udsWriter) SetWriteTimeout(d time.Duration) error { + w.writeTimeout = d + return nil +} + +// Write data to the UDS connection with write timeout and minimal error handling: +// create the connection if nil, and destroy it if the statsd server has disconnected +func (w *udsWriter) Write(data []byte) error { + // Try connecting (first packet or connection lost) + if w.conn == nil { + conn, err := net.Dial(w.addr.Network(), w.addr.String()) + if err != nil { + return err + } else { + w.conn = conn + } + } + w.conn.SetWriteDeadline(time.Now().Add(w.writeTimeout)) + _, e := w.conn.Write(data) + if e != nil && strings.Contains(e.Error(), "transport endpoint is not connected") { + // Statsd server disconnected, retry connecting at next packet + w.conn = nil + return e + } + return e +} + +func (w *udsWriter) Close() error { + return w.conn.Close() +} diff --git a/vendor/github.com/armon/go-metrics/.gitignore b/vendor/github.com/armon/go-metrics/.gitignore deleted file mode 100644 index 00268614f045..000000000000 --- a/vendor/github.com/armon/go-metrics/.gitignore +++ /dev/null @@ -1,22 +0,0 @@ -# Compiled Object files, Static and Dynamic libs (Shared Objects) -*.o -*.a -*.so - -# Folders -_obj -_test - -# Architecture specific extensions/prefixes -*.[568vq] -[568vq].out - -*.cgo1.go -*.cgo2.c -_cgo_defun.c -_cgo_gotypes.go -_cgo_export.* - -_testmain.go - -*.exe diff --git a/vendor/github.com/armon/go-metrics/README.md b/vendor/github.com/armon/go-metrics/README.md index 7b6f23e29f83..a7399cddffa4 100644 --- a/vendor/github.com/armon/go-metrics/README.md +++ b/vendor/github.com/armon/go-metrics/README.md @@ -28,39 +28,42 @@ Examples Here is an example of using the package: - func SlowMethod() { - // Profiling the runtime of a method - defer metrics.MeasureSince([]string{"SlowMethod"}, time.Now()) - } - - // Configure a statsite sink as the global metrics sink - sink, _ := metrics.NewStatsiteSink("statsite:8125") - metrics.NewGlobal(metrics.DefaultConfig("service-name"), sink) - - // Emit a Key/Value pair - metrics.EmitKey([]string{"questions", "meaning of life"}, 42) - - -Here is an example of setting up an signal handler: - - // Setup the inmem sink and signal handler - inm := metrics.NewInmemSink(10*time.Second, time.Minute) - sig := metrics.DefaultInmemSignal(inm) - metrics.NewGlobal(metrics.DefaultConfig("service-name"), inm) - - // Run some code - inm.SetGauge([]string{"foo"}, 42) - inm.EmitKey([]string{"bar"}, 30) - - inm.IncrCounter([]string{"baz"}, 42) - inm.IncrCounter([]string{"baz"}, 1) - inm.IncrCounter([]string{"baz"}, 80) - - inm.AddSample([]string{"method", "wow"}, 42) - inm.AddSample([]string{"method", "wow"}, 100) - inm.AddSample([]string{"method", "wow"}, 22) - - .... +```go +func SlowMethod() { + // Profiling the runtime of a method + defer metrics.MeasureSince([]string{"SlowMethod"}, time.Now()) +} + +// Configure a statsite sink as the global metrics sink +sink, _ := metrics.NewStatsiteSink("statsite:8125") +metrics.NewGlobal(metrics.DefaultConfig("service-name"), sink) + +// Emit a Key/Value pair +metrics.EmitKey([]string{"questions", "meaning of life"}, 42) +``` + +Here is an example of setting up a signal handler: + +```go +// Setup the inmem sink and signal handler +inm := metrics.NewInmemSink(10*time.Second, time.Minute) +sig := metrics.DefaultInmemSignal(inm) +metrics.NewGlobal(metrics.DefaultConfig("service-name"), inm) + +// Run some code +inm.SetGauge([]string{"foo"}, 42) +inm.EmitKey([]string{"bar"}, 30) + +inm.IncrCounter([]string{"baz"}, 42) +inm.IncrCounter([]string{"baz"}, 1) +inm.IncrCounter([]string{"baz"}, 80) + +inm.AddSample([]string{"method", "wow"}, 42) +inm.AddSample([]string{"method", "wow"}, 100) +inm.AddSample([]string{"method", "wow"}, 22) + +.... +``` When a signal comes in, output like the following will be dumped to stderr: diff --git a/vendor/github.com/armon/go-metrics/circonus/circonus.go b/vendor/github.com/armon/go-metrics/circonus/circonus.go index c6e3974b5dd4..eb41b9945514 100644 --- a/vendor/github.com/armon/go-metrics/circonus/circonus.go +++ b/vendor/github.com/armon/go-metrics/circonus/circonus.go @@ -5,6 +5,7 @@ package circonus import ( "strings" + "github.com/armon/go-metrics" cgm "github.com/circonus-labs/circonus-gometrics" ) @@ -61,6 +62,12 @@ func (s *CirconusSink) SetGauge(key []string, val float32) { s.metrics.SetGauge(flatKey, int64(val)) } +// SetGaugeWithLabels sets value for a gauge metric with the given labels +func (s *CirconusSink) SetGaugeWithLabels(key []string, val float32, labels []metrics.Label) { + flatKey := s.flattenKeyLabels(key, labels) + s.metrics.SetGauge(flatKey, int64(val)) +} + // EmitKey is not implemented in circonus func (s *CirconusSink) EmitKey(key []string, val float32) { // NOP @@ -72,12 +79,24 @@ func (s *CirconusSink) IncrCounter(key []string, val float32) { s.metrics.IncrementByValue(flatKey, uint64(val)) } +// IncrCounterWithLabels increments a counter metric with the given labels +func (s *CirconusSink) IncrCounterWithLabels(key []string, val float32, labels []metrics.Label) { + flatKey := s.flattenKeyLabels(key, labels) + s.metrics.IncrementByValue(flatKey, uint64(val)) +} + // AddSample adds a sample to a histogram metric func (s *CirconusSink) AddSample(key []string, val float32) { flatKey := s.flattenKey(key) s.metrics.RecordValue(flatKey, float64(val)) } +// AddSampleWithLabels adds a sample to a histogram metric with the given labels +func (s *CirconusSink) AddSampleWithLabels(key []string, val float32, labels []metrics.Label) { + flatKey := s.flattenKeyLabels(key, labels) + s.metrics.RecordValue(flatKey, float64(val)) +} + // Flattens key to Circonus metric name func (s *CirconusSink) flattenKey(parts []string) string { joined := strings.Join(parts, "`") @@ -90,3 +109,11 @@ func (s *CirconusSink) flattenKey(parts []string) string { } }, joined) } + +// Flattens the key along with labels for formatting, removes spaces +func (s *CirconusSink) flattenKeyLabels(parts []string, labels []metrics.Label) string { + for _, label := range labels { + parts = append(parts, label.Value) + } + return s.flattenKey(parts) +} diff --git a/vendor/github.com/armon/go-metrics/datadog/dogstatsd.go b/vendor/github.com/armon/go-metrics/datadog/dogstatsd.go index aaba9fe0e224..fe021d01c0fc 100644 --- a/vendor/github.com/armon/go-metrics/datadog/dogstatsd.go +++ b/vendor/github.com/armon/go-metrics/datadog/dogstatsd.go @@ -5,6 +5,7 @@ import ( "strings" "github.com/DataDog/datadog-go/statsd" + "github.com/armon/go-metrics" ) // DogStatsdSink provides a MetricSink that can be used @@ -45,46 +46,49 @@ func (s *DogStatsdSink) EnableHostNamePropagation() { func (s *DogStatsdSink) flattenKey(parts []string) string { joined := strings.Join(parts, ".") - return strings.Map(func(r rune) rune { - switch r { - case ':': - fallthrough - case ' ': - return '_' - default: - return r - } - }, joined) + return strings.Map(sanitize, joined) +} + +func sanitize(r rune) rune { + switch r { + case ':': + fallthrough + case ' ': + return '_' + default: + return r + } } -func (s *DogStatsdSink) parseKey(key []string) ([]string, []string) { +func (s *DogStatsdSink) parseKey(key []string) ([]string, []metrics.Label) { // Since DogStatsd supports dimensionality via tags on metric keys, this sink's approach is to splice the hostname out of the key in favor of a `host` tag // The `host` tag is either forced here, or set downstream by the DogStatsd server - var tags []string + var labels []metrics.Label hostName := s.hostName - //Splice the hostname out of the key + // Splice the hostname out of the key for i, el := range key { if el == hostName { key = append(key[:i], key[i+1:]...) + break } } if s.propagateHostname { - tags = append(tags, fmt.Sprintf("host:%s", hostName)) + labels = append(labels, metrics.Label{"host", hostName}) } - return key, tags + return key, labels } // Implementation of methods in the MetricSink interface func (s *DogStatsdSink) SetGauge(key []string, val float32) { - s.SetGaugeWithTags(key, val, []string{}) + s.SetGaugeWithLabels(key, val, nil) } func (s *DogStatsdSink) IncrCounter(key []string, val float32) { - s.IncrCounterWithTags(key, val, []string{}) + s.IncrCounterWithLabels(key, val, nil) } // EmitKey is not implemented since DogStatsd does not provide a metric type that holds an @@ -93,33 +97,44 @@ func (s *DogStatsdSink) EmitKey(key []string, val float32) { } func (s *DogStatsdSink) AddSample(key []string, val float32) { - s.AddSampleWithTags(key, val, []string{}) + s.AddSampleWithLabels(key, val, nil) } -// The following ...WithTags methods correspond to Datadog's Tag extension to Statsd. +// The following ...WithLabels methods correspond to Datadog's Tag extension to Statsd. // http://docs.datadoghq.com/guides/dogstatsd/#tags - -func (s *DogStatsdSink) SetGaugeWithTags(key []string, val float32, tags []string) { - flatKey, tags := s.getFlatkeyAndCombinedTags(key, tags) +func (s *DogStatsdSink) SetGaugeWithLabels(key []string, val float32, labels []metrics.Label) { + flatKey, tags := s.getFlatkeyAndCombinedLabels(key, labels) rate := 1.0 s.client.Gauge(flatKey, float64(val), tags, rate) } -func (s *DogStatsdSink) IncrCounterWithTags(key []string, val float32, tags []string) { - flatKey, tags := s.getFlatkeyAndCombinedTags(key, tags) +func (s *DogStatsdSink) IncrCounterWithLabels(key []string, val float32, labels []metrics.Label) { + flatKey, tags := s.getFlatkeyAndCombinedLabels(key, labels) rate := 1.0 s.client.Count(flatKey, int64(val), tags, rate) } -func (s *DogStatsdSink) AddSampleWithTags(key []string, val float32, tags []string) { - flatKey, tags := s.getFlatkeyAndCombinedTags(key, tags) +func (s *DogStatsdSink) AddSampleWithLabels(key []string, val float32, labels []metrics.Label) { + flatKey, tags := s.getFlatkeyAndCombinedLabels(key, labels) rate := 1.0 s.client.TimeInMilliseconds(flatKey, float64(val), tags, rate) } -func (s *DogStatsdSink) getFlatkeyAndCombinedTags(key []string, tags []string) (flattenedKey string, combinedTags []string) { - key, hostTags := s.parseKey(key) +func (s *DogStatsdSink) getFlatkeyAndCombinedLabels(key []string, labels []metrics.Label) (string, []string) { + key, parsedLabels := s.parseKey(key) flatKey := s.flattenKey(key) - tags = append(tags, hostTags...) + labels = append(labels, parsedLabels...) + + var tags []string + for _, label := range labels { + label.Name = strings.Map(sanitize, label.Name) + label.Value = strings.Map(sanitize, label.Value) + if label.Value != "" { + tags = append(tags, fmt.Sprintf("%s:%s", label.Name, label.Value)) + } else { + tags = append(tags, label.Name) + } + } + return flatKey, tags } diff --git a/vendor/github.com/armon/go-metrics/inmem.go b/vendor/github.com/armon/go-metrics/inmem.go index da503296060a..cd1773042136 100644 --- a/vendor/github.com/armon/go-metrics/inmem.go +++ b/vendor/github.com/armon/go-metrics/inmem.go @@ -1,8 +1,10 @@ package metrics import ( + "bytes" "fmt" "math" + "net/url" "strings" "sync" "time" @@ -25,6 +27,8 @@ type InmemSink struct { // intervals is a slice of the retained intervals intervals []*IntervalMetrics intervalLock sync.RWMutex + + rateDenom float64 } // IntervalMetrics stores the aggregated metrics @@ -36,7 +40,7 @@ type IntervalMetrics struct { Interval time.Time // Gauges maps the key to the last set value - Gauges map[string]float32 + Gauges map[string]GaugeValue // Points maps the string to the list of emitted values // from EmitKey @@ -44,21 +48,21 @@ type IntervalMetrics struct { // Counters maps the string key to a sum of the counter // values - Counters map[string]*AggregateSample + Counters map[string]SampledValue // Samples maps the key to an AggregateSample, // which has the rolled up view of a sample - Samples map[string]*AggregateSample + Samples map[string]SampledValue } // NewIntervalMetrics creates a new IntervalMetrics for a given interval func NewIntervalMetrics(intv time.Time) *IntervalMetrics { return &IntervalMetrics{ Interval: intv, - Gauges: make(map[string]float32), + Gauges: make(map[string]GaugeValue), Points: make(map[string][]float32), - Counters: make(map[string]*AggregateSample), - Samples: make(map[string]*AggregateSample), + Counters: make(map[string]SampledValue), + Samples: make(map[string]SampledValue), } } @@ -66,11 +70,12 @@ func NewIntervalMetrics(intv time.Time) *IntervalMetrics { // about a sample type AggregateSample struct { Count int // The count of emitted pairs + Rate float64 `json:"-"` // The count of emitted pairs per time unit (usually 1 second) Sum float64 // The sum of values - SumSq float64 // The sum of squared values + SumSq float64 `json:"-"` // The sum of squared values Min float64 // Minimum value Max float64 // Maximum value - LastUpdated time.Time // When value was last updated + LastUpdated time.Time `json:"-"` // When value was last updated } // Computes a Stddev of the values @@ -92,7 +97,7 @@ func (a *AggregateSample) Mean() float64 { } // Ingest is used to update a sample -func (a *AggregateSample) Ingest(v float64) { +func (a *AggregateSample) Ingest(v float64, rateDenom float64) { a.Count++ a.Sum += v a.SumSq += (v * v) @@ -102,6 +107,7 @@ func (a *AggregateSample) Ingest(v float64) { if v > a.Max || a.Count == 1 { a.Max = v } + a.Rate = float64(a.Count) / rateDenom a.LastUpdated = time.Now() } @@ -116,25 +122,49 @@ func (a *AggregateSample) String() string { } } +// NewInmemSinkFromURL creates an InmemSink from a URL. It is used +// (and tested) from NewMetricSinkFromURL. +func NewInmemSinkFromURL(u *url.URL) (MetricSink, error) { + params := u.Query() + + interval, err := time.ParseDuration(params.Get("interval")) + if err != nil { + return nil, fmt.Errorf("Bad 'interval' param: %s", err) + } + + retain, err := time.ParseDuration(params.Get("retain")) + if err != nil { + return nil, fmt.Errorf("Bad 'retain' param: %s", err) + } + + return NewInmemSink(interval, retain), nil +} + // NewInmemSink is used to construct a new in-memory sink. // Uses an aggregation interval and maximum retention period. func NewInmemSink(interval, retain time.Duration) *InmemSink { + rateTimeUnit := time.Second i := &InmemSink{ interval: interval, retain: retain, maxIntervals: int(retain / interval), + rateDenom: float64(interval.Nanoseconds()) / float64(rateTimeUnit.Nanoseconds()), } i.intervals = make([]*IntervalMetrics, 0, i.maxIntervals) return i } func (i *InmemSink) SetGauge(key []string, val float32) { - k := i.flattenKey(key) + i.SetGaugeWithLabels(key, val, nil) +} + +func (i *InmemSink) SetGaugeWithLabels(key []string, val float32, labels []Label) { + k, name := i.flattenKeyLabels(key, labels) intv := i.getInterval() intv.Lock() defer intv.Unlock() - intv.Gauges[k] = val + intv.Gauges[k] = GaugeValue{Name: name, Value: val, Labels: labels} } func (i *InmemSink) EmitKey(key []string, val float32) { @@ -148,33 +178,49 @@ func (i *InmemSink) EmitKey(key []string, val float32) { } func (i *InmemSink) IncrCounter(key []string, val float32) { - k := i.flattenKey(key) + i.IncrCounterWithLabels(key, val, nil) +} + +func (i *InmemSink) IncrCounterWithLabels(key []string, val float32, labels []Label) { + k, name := i.flattenKeyLabels(key, labels) intv := i.getInterval() intv.Lock() defer intv.Unlock() - agg := intv.Counters[k] - if agg == nil { - agg = &AggregateSample{} + agg, ok := intv.Counters[k] + if !ok { + agg = SampledValue{ + Name: name, + AggregateSample: &AggregateSample{}, + Labels: labels, + } intv.Counters[k] = agg } - agg.Ingest(float64(val)) + agg.Ingest(float64(val), i.rateDenom) } func (i *InmemSink) AddSample(key []string, val float32) { - k := i.flattenKey(key) + i.AddSampleWithLabels(key, val, nil) +} + +func (i *InmemSink) AddSampleWithLabels(key []string, val float32, labels []Label) { + k, name := i.flattenKeyLabels(key, labels) intv := i.getInterval() intv.Lock() defer intv.Unlock() - agg := intv.Samples[k] - if agg == nil { - agg = &AggregateSample{} + agg, ok := intv.Samples[k] + if !ok { + agg = SampledValue{ + Name: name, + AggregateSample: &AggregateSample{}, + Labels: labels, + } intv.Samples[k] = agg } - agg.Ingest(float64(val)) + agg.Ingest(float64(val), i.rateDenom) } // Data is used to retrieve all the aggregated metrics @@ -236,6 +282,38 @@ func (i *InmemSink) getInterval() *IntervalMetrics { // Flattens the key for formatting, removes spaces func (i *InmemSink) flattenKey(parts []string) string { - joined := strings.Join(parts, ".") - return strings.Replace(joined, " ", "_", -1) + buf := &bytes.Buffer{} + replacer := strings.NewReplacer(" ", "_") + + if len(parts) > 0 { + replacer.WriteString(buf, parts[0]) + } + for _, part := range parts[1:] { + replacer.WriteString(buf, ".") + replacer.WriteString(buf, part) + } + + return buf.String() +} + +// Flattens the key for formatting along with its labels, removes spaces +func (i *InmemSink) flattenKeyLabels(parts []string, labels []Label) (string, string) { + buf := &bytes.Buffer{} + replacer := strings.NewReplacer(" ", "_") + + if len(parts) > 0 { + replacer.WriteString(buf, parts[0]) + } + for _, part := range parts[1:] { + replacer.WriteString(buf, ".") + replacer.WriteString(buf, part) + } + + key := buf.String() + + for _, label := range labels { + replacer.WriteString(buf, fmt.Sprintf(";%s=%s", label.Name, label.Value)) + } + + return buf.String(), key } diff --git a/vendor/github.com/armon/go-metrics/inmem_endpoint.go b/vendor/github.com/armon/go-metrics/inmem_endpoint.go new file mode 100644 index 000000000000..504f1b374854 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/inmem_endpoint.go @@ -0,0 +1,118 @@ +package metrics + +import ( + "fmt" + "net/http" + "sort" + "time" +) + +// MetricsSummary holds a roll-up of metrics info for a given interval +type MetricsSummary struct { + Timestamp string + Gauges []GaugeValue + Points []PointValue + Counters []SampledValue + Samples []SampledValue +} + +type GaugeValue struct { + Name string + Hash string `json:"-"` + Value float32 + + Labels []Label `json:"-"` + DisplayLabels map[string]string `json:"Labels"` +} + +type PointValue struct { + Name string + Points []float32 +} + +type SampledValue struct { + Name string + Hash string `json:"-"` + *AggregateSample + Mean float64 + Stddev float64 + + Labels []Label `json:"-"` + DisplayLabels map[string]string `json:"Labels"` +} + +// DisplayMetrics returns a summary of the metrics from the most recent finished interval. +func (i *InmemSink) DisplayMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + data := i.Data() + + var interval *IntervalMetrics + n := len(data) + switch { + case n == 0: + return nil, fmt.Errorf("no metric intervals have been initialized yet") + case n == 1: + // Show the current interval if it's all we have + interval = i.intervals[0] + default: + // Show the most recent finished interval if we have one + interval = i.intervals[n-2] + } + + summary := MetricsSummary{ + Timestamp: interval.Interval.Round(time.Second).UTC().String(), + Gauges: make([]GaugeValue, 0, len(interval.Gauges)), + Points: make([]PointValue, 0, len(interval.Points)), + } + + // Format and sort the output of each metric type, so it gets displayed in a + // deterministic order. + for name, points := range interval.Points { + summary.Points = append(summary.Points, PointValue{name, points}) + } + sort.Slice(summary.Points, func(i, j int) bool { + return summary.Points[i].Name < summary.Points[j].Name + }) + + for hash, value := range interval.Gauges { + value.Hash = hash + value.DisplayLabels = make(map[string]string) + for _, label := range value.Labels { + value.DisplayLabels[label.Name] = label.Value + } + value.Labels = nil + + summary.Gauges = append(summary.Gauges, value) + } + sort.Slice(summary.Gauges, func(i, j int) bool { + return summary.Gauges[i].Hash < summary.Gauges[j].Hash + }) + + summary.Counters = formatSamples(interval.Counters) + summary.Samples = formatSamples(interval.Samples) + + return summary, nil +} + +func formatSamples(source map[string]SampledValue) []SampledValue { + output := make([]SampledValue, 0, len(source)) + for hash, sample := range source { + displayLabels := make(map[string]string) + for _, label := range sample.Labels { + displayLabels[label.Name] = label.Value + } + + output = append(output, SampledValue{ + Name: sample.Name, + Hash: hash, + AggregateSample: sample.AggregateSample, + Mean: sample.AggregateSample.Mean(), + Stddev: sample.AggregateSample.Stddev(), + DisplayLabels: displayLabels, + }) + } + sort.Slice(output, func(i, j int) bool { + return output[i].Hash < output[j].Hash + }) + + return output +} diff --git a/vendor/github.com/armon/go-metrics/inmem_signal.go b/vendor/github.com/armon/go-metrics/inmem_signal.go index 95d08ee10f0b..0937f4aedf7c 100644 --- a/vendor/github.com/armon/go-metrics/inmem_signal.go +++ b/vendor/github.com/armon/go-metrics/inmem_signal.go @@ -6,6 +6,7 @@ import ( "io" "os" "os/signal" + "strings" "sync" "syscall" ) @@ -75,22 +76,25 @@ func (i *InmemSignal) dumpStats() { data := i.inm.Data() // Skip the last period which is still being aggregated - for i := 0; i < len(data)-1; i++ { - intv := data[i] + for j := 0; j < len(data)-1; j++ { + intv := data[j] intv.RLock() - for name, val := range intv.Gauges { - fmt.Fprintf(buf, "[%v][G] '%s': %0.3f\n", intv.Interval, name, val) + for _, val := range intv.Gauges { + name := i.flattenLabels(val.Name, val.Labels) + fmt.Fprintf(buf, "[%v][G] '%s': %0.3f\n", intv.Interval, name, val.Value) } for name, vals := range intv.Points { for _, val := range vals { fmt.Fprintf(buf, "[%v][P] '%s': %0.3f\n", intv.Interval, name, val) } } - for name, agg := range intv.Counters { - fmt.Fprintf(buf, "[%v][C] '%s': %s\n", intv.Interval, name, agg) + for _, agg := range intv.Counters { + name := i.flattenLabels(agg.Name, agg.Labels) + fmt.Fprintf(buf, "[%v][C] '%s': %s\n", intv.Interval, name, agg.AggregateSample) } - for name, agg := range intv.Samples { - fmt.Fprintf(buf, "[%v][S] '%s': %s\n", intv.Interval, name, agg) + for _, agg := range intv.Samples { + name := i.flattenLabels(agg.Name, agg.Labels) + fmt.Fprintf(buf, "[%v][S] '%s': %s\n", intv.Interval, name, agg.AggregateSample) } intv.RUnlock() } @@ -98,3 +102,16 @@ func (i *InmemSignal) dumpStats() { // Write out the bytes i.w.Write(buf.Bytes()) } + +// Flattens the key for formatting along with its labels, removes spaces +func (i *InmemSignal) flattenLabels(name string, labels []Label) string { + buf := bytes.NewBufferString(name) + replacer := strings.NewReplacer(" ", "_", ":", "_") + + for _, label := range labels { + replacer.WriteString(buf, ".") + replacer.WriteString(buf, label.Value) + } + + return buf.String() +} diff --git a/vendor/github.com/armon/go-metrics/metrics.go b/vendor/github.com/armon/go-metrics/metrics.go index b818e4182c0c..d260bd4b29ec 100644 --- a/vendor/github.com/armon/go-metrics/metrics.go +++ b/vendor/github.com/armon/go-metrics/metrics.go @@ -2,20 +2,43 @@ package metrics import ( "runtime" + "strings" "time" + + "github.com/hashicorp/go-immutable-radix" ) +type Label struct { + Name string + Value string +} + func (m *Metrics) SetGauge(key []string, val float32) { - if m.HostName != "" && m.EnableHostname { - key = insert(0, m.HostName, key) + m.SetGaugeWithLabels(key, val, nil) +} + +func (m *Metrics) SetGaugeWithLabels(key []string, val float32, labels []Label) { + if m.HostName != "" { + if m.EnableHostnameLabel { + labels = append(labels, Label{"host", m.HostName}) + } else if m.EnableHostname { + key = insert(0, m.HostName, key) + } } if m.EnableTypePrefix { key = insert(0, "gauge", key) } if m.ServiceName != "" { - key = insert(0, m.ServiceName, key) + if m.EnableServiceLabel { + labels = append(labels, Label{"service", m.ServiceName}) + } else { + key = insert(0, m.ServiceName, key) + } + } + if !m.allowMetric(key) { + return } - m.sink.SetGauge(key, val) + m.sink.SetGaugeWithLabels(key, val, labels) } func (m *Metrics) EmitKey(key []string, val float32) { @@ -25,40 +48,118 @@ func (m *Metrics) EmitKey(key []string, val float32) { if m.ServiceName != "" { key = insert(0, m.ServiceName, key) } + if !m.allowMetric(key) { + return + } m.sink.EmitKey(key, val) } func (m *Metrics) IncrCounter(key []string, val float32) { + m.IncrCounterWithLabels(key, val, nil) +} + +func (m *Metrics) IncrCounterWithLabels(key []string, val float32, labels []Label) { + if m.HostName != "" && m.EnableHostnameLabel { + labels = append(labels, Label{"host", m.HostName}) + } if m.EnableTypePrefix { key = insert(0, "counter", key) } if m.ServiceName != "" { - key = insert(0, m.ServiceName, key) + if m.EnableServiceLabel { + labels = append(labels, Label{"service", m.ServiceName}) + } else { + key = insert(0, m.ServiceName, key) + } } - m.sink.IncrCounter(key, val) + if !m.allowMetric(key) { + return + } + m.sink.IncrCounterWithLabels(key, val, labels) } func (m *Metrics) AddSample(key []string, val float32) { + m.AddSampleWithLabels(key, val, nil) +} + +func (m *Metrics) AddSampleWithLabels(key []string, val float32, labels []Label) { + if m.HostName != "" && m.EnableHostnameLabel { + labels = append(labels, Label{"host", m.HostName}) + } if m.EnableTypePrefix { key = insert(0, "sample", key) } if m.ServiceName != "" { - key = insert(0, m.ServiceName, key) + if m.EnableServiceLabel { + labels = append(labels, Label{"service", m.ServiceName}) + } else { + key = insert(0, m.ServiceName, key) + } + } + if !m.allowMetric(key) { + return } - m.sink.AddSample(key, val) + m.sink.AddSampleWithLabels(key, val, labels) } func (m *Metrics) MeasureSince(key []string, start time.Time) { + m.MeasureSinceWithLabels(key, start, nil) +} + +func (m *Metrics) MeasureSinceWithLabels(key []string, start time.Time, labels []Label) { + if m.HostName != "" && m.EnableHostnameLabel { + labels = append(labels, Label{"host", m.HostName}) + } if m.EnableTypePrefix { key = insert(0, "timer", key) } if m.ServiceName != "" { - key = insert(0, m.ServiceName, key) + if m.EnableServiceLabel { + labels = append(labels, Label{"service", m.ServiceName}) + } else { + key = insert(0, m.ServiceName, key) + } + } + if !m.allowMetric(key) { + return } now := time.Now() elapsed := now.Sub(start) msec := float32(elapsed.Nanoseconds()) / float32(m.TimerGranularity) - m.sink.AddSample(key, msec) + m.sink.AddSampleWithLabels(key, msec, labels) +} + +// UpdateFilter overwrites the existing filter with the given rules. +func (m *Metrics) UpdateFilter(allow, block []string) { + m.filterLock.Lock() + defer m.filterLock.Unlock() + + m.AllowedPrefixes = allow + m.BlockedPrefixes = block + + m.filter = iradix.New() + for _, prefix := range m.AllowedPrefixes { + m.filter, _, _ = m.filter.Insert([]byte(prefix), true) + } + for _, prefix := range m.BlockedPrefixes { + m.filter, _, _ = m.filter.Insert([]byte(prefix), false) + } +} + +// Returns whether the metric should be allowed based on configured prefix filters +func (m *Metrics) allowMetric(key []string) bool { + m.filterLock.RLock() + defer m.filterLock.RUnlock() + + if m.filter == nil || m.filter.Len() == 0 { + return m.Config.FilterDefault + } + + _, allowed, ok := m.filter.Root().LongestPrefix([]byte(strings.Join(key, "."))) + if !ok { + return m.Config.FilterDefault + } + return allowed.(bool) } // Periodically collects runtime stats to publish diff --git a/vendor/github.com/armon/go-metrics/sink.go b/vendor/github.com/armon/go-metrics/sink.go index 0c240c2c47ee..0b7d6e4be43f 100644 --- a/vendor/github.com/armon/go-metrics/sink.go +++ b/vendor/github.com/armon/go-metrics/sink.go @@ -1,35 +1,50 @@ package metrics +import ( + "fmt" + "net/url" +) + // The MetricSink interface is used to transmit metrics information // to an external system type MetricSink interface { // A Gauge should retain the last value it is set to SetGauge(key []string, val float32) + SetGaugeWithLabels(key []string, val float32, labels []Label) // Should emit a Key/Value pair for each call EmitKey(key []string, val float32) // Counters should accumulate values IncrCounter(key []string, val float32) + IncrCounterWithLabels(key []string, val float32, labels []Label) // Samples are for timing information, where quantiles are used AddSample(key []string, val float32) + AddSampleWithLabels(key []string, val float32, labels []Label) } // BlackholeSink is used to just blackhole messages type BlackholeSink struct{} -func (*BlackholeSink) SetGauge(key []string, val float32) {} -func (*BlackholeSink) EmitKey(key []string, val float32) {} -func (*BlackholeSink) IncrCounter(key []string, val float32) {} -func (*BlackholeSink) AddSample(key []string, val float32) {} +func (*BlackholeSink) SetGauge(key []string, val float32) {} +func (*BlackholeSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {} +func (*BlackholeSink) EmitKey(key []string, val float32) {} +func (*BlackholeSink) IncrCounter(key []string, val float32) {} +func (*BlackholeSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {} +func (*BlackholeSink) AddSample(key []string, val float32) {} +func (*BlackholeSink) AddSampleWithLabels(key []string, val float32, labels []Label) {} // FanoutSink is used to sink to fanout values to multiple sinks type FanoutSink []MetricSink func (fh FanoutSink) SetGauge(key []string, val float32) { + fh.SetGaugeWithLabels(key, val, nil) +} + +func (fh FanoutSink) SetGaugeWithLabels(key []string, val float32, labels []Label) { for _, s := range fh { - s.SetGauge(key, val) + s.SetGaugeWithLabels(key, val, labels) } } @@ -40,13 +55,61 @@ func (fh FanoutSink) EmitKey(key []string, val float32) { } func (fh FanoutSink) IncrCounter(key []string, val float32) { + fh.IncrCounterWithLabels(key, val, nil) +} + +func (fh FanoutSink) IncrCounterWithLabels(key []string, val float32, labels []Label) { for _, s := range fh { - s.IncrCounter(key, val) + s.IncrCounterWithLabels(key, val, labels) } } func (fh FanoutSink) AddSample(key []string, val float32) { + fh.AddSampleWithLabels(key, val, nil) +} + +func (fh FanoutSink) AddSampleWithLabels(key []string, val float32, labels []Label) { for _, s := range fh { - s.AddSample(key, val) + s.AddSampleWithLabels(key, val, labels) + } +} + +// sinkURLFactoryFunc is an generic interface around the *SinkFromURL() function provided +// by each sink type +type sinkURLFactoryFunc func(*url.URL) (MetricSink, error) + +// sinkRegistry supports the generic NewMetricSink function by mapping URL +// schemes to metric sink factory functions +var sinkRegistry = map[string]sinkURLFactoryFunc{ + "statsd": NewStatsdSinkFromURL, + "statsite": NewStatsiteSinkFromURL, + "inmem": NewInmemSinkFromURL, +} + +// NewMetricSinkFromURL allows a generic URL input to configure any of the +// supported sinks. The scheme of the URL identifies the type of the sink, the +// and query parameters are used to set options. +// +// "statsd://" - Initializes a StatsdSink. The host and port are passed through +// as the "addr" of the sink +// +// "statsite://" - Initializes a StatsiteSink. The host and port become the +// "addr" of the sink +// +// "inmem://" - Initializes an InmemSink. The host and port are ignored. The +// "interval" and "duration" query parameters must be specified with valid +// durations, see NewInmemSink for details. +func NewMetricSinkFromURL(urlStr string) (MetricSink, error) { + u, err := url.Parse(urlStr) + if err != nil { + return nil, err + } + + sinkURLFactoryFunc := sinkRegistry[u.Scheme] + if sinkURLFactoryFunc == nil { + return nil, fmt.Errorf( + "cannot create metric sink, unrecognized sink name: %q", u.Scheme) } + + return sinkURLFactoryFunc(u) } diff --git a/vendor/github.com/armon/go-metrics/start.go b/vendor/github.com/armon/go-metrics/start.go index 44113f100426..46f0c2eb2db4 100644 --- a/vendor/github.com/armon/go-metrics/start.go +++ b/vendor/github.com/armon/go-metrics/start.go @@ -2,7 +2,11 @@ package metrics import ( "os" + "sync" + "sync/atomic" "time" + + "github.com/hashicorp/go-immutable-radix" ) // Config is used to configure metrics settings @@ -10,26 +14,34 @@ type Config struct { ServiceName string // Prefixed with keys to seperate services HostName string // Hostname to use. If not provided and EnableHostname, it will be os.Hostname EnableHostname bool // Enable prefixing gauge values with hostname + EnableHostnameLabel bool // Enable adding hostname to labels + EnableServiceLabel bool // Enable adding service to labels EnableRuntimeMetrics bool // Enables profiling of runtime metrics (GC, Goroutines, Memory) EnableTypePrefix bool // Prefixes key with a type ("counter", "gauge", "timer") TimerGranularity time.Duration // Granularity of timers. ProfileInterval time.Duration // Interval to profile runtime metrics + + AllowedPrefixes []string // A list of metric prefixes to allow, with '.' as the separator + BlockedPrefixes []string // A list of metric prefixes to block, with '.' as the separator + FilterDefault bool // Whether to allow metrics by default } // Metrics represents an instance of a metrics sink that can // be used to emit type Metrics struct { Config - lastNumGC uint32 - sink MetricSink + lastNumGC uint32 + sink MetricSink + filter *iradix.Tree + filterLock sync.RWMutex } // Shared global metrics instance -var globalMetrics *Metrics +var globalMetrics atomic.Value // *Metrics func init() { // Initialize to a blackhole sink to avoid errors - globalMetrics = &Metrics{sink: &BlackholeSink{}} + globalMetrics.Store(&Metrics{sink: &BlackholeSink{}}) } // DefaultConfig provides a sane default configuration @@ -42,6 +54,7 @@ func DefaultConfig(serviceName string) *Config { EnableTypePrefix: false, // Disable type prefix TimerGranularity: time.Millisecond, // Timers are in milliseconds ProfileInterval: time.Second, // Poll runtime every second + FilterDefault: true, // Don't filter metrics by default } // Try to get the hostname @@ -55,6 +68,7 @@ func New(conf *Config, sink MetricSink) (*Metrics, error) { met := &Metrics{} met.Config = *conf met.sink = sink + met.UpdateFilter(conf.AllowedPrefixes, conf.BlockedPrefixes) // Start the runtime collector if conf.EnableRuntimeMetrics { @@ -68,28 +82,48 @@ func New(conf *Config, sink MetricSink) (*Metrics, error) { func NewGlobal(conf *Config, sink MetricSink) (*Metrics, error) { metrics, err := New(conf, sink) if err == nil { - globalMetrics = metrics + globalMetrics.Store(metrics) } return metrics, err } // Proxy all the methods to the globalMetrics instance func SetGauge(key []string, val float32) { - globalMetrics.SetGauge(key, val) + globalMetrics.Load().(*Metrics).SetGauge(key, val) +} + +func SetGaugeWithLabels(key []string, val float32, labels []Label) { + globalMetrics.Load().(*Metrics).SetGaugeWithLabels(key, val, labels) } func EmitKey(key []string, val float32) { - globalMetrics.EmitKey(key, val) + globalMetrics.Load().(*Metrics).EmitKey(key, val) } func IncrCounter(key []string, val float32) { - globalMetrics.IncrCounter(key, val) + globalMetrics.Load().(*Metrics).IncrCounter(key, val) +} + +func IncrCounterWithLabels(key []string, val float32, labels []Label) { + globalMetrics.Load().(*Metrics).IncrCounterWithLabels(key, val, labels) } func AddSample(key []string, val float32) { - globalMetrics.AddSample(key, val) + globalMetrics.Load().(*Metrics).AddSample(key, val) +} + +func AddSampleWithLabels(key []string, val float32, labels []Label) { + globalMetrics.Load().(*Metrics).AddSampleWithLabels(key, val, labels) } func MeasureSince(key []string, start time.Time) { - globalMetrics.MeasureSince(key, start) + globalMetrics.Load().(*Metrics).MeasureSince(key, start) +} + +func MeasureSinceWithLabels(key []string, start time.Time, labels []Label) { + globalMetrics.Load().(*Metrics).MeasureSinceWithLabels(key, start, labels) +} + +func UpdateFilter(allow, block []string) { + globalMetrics.Load().(*Metrics).UpdateFilter(allow, block) } diff --git a/vendor/github.com/armon/go-metrics/statsd.go b/vendor/github.com/armon/go-metrics/statsd.go index 65a5021a0574..1bfffce46e21 100644 --- a/vendor/github.com/armon/go-metrics/statsd.go +++ b/vendor/github.com/armon/go-metrics/statsd.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "net" + "net/url" "strings" "time" ) @@ -23,6 +24,12 @@ type StatsdSink struct { metricQueue chan string } +// NewStatsdSinkFromURL creates an StatsdSink from a URL. It is used +// (and tested) from NewMetricSinkFromURL. +func NewStatsdSinkFromURL(u *url.URL) (MetricSink, error) { + return NewStatsdSink(u.Host) +} + // NewStatsdSink is used to create a new StatsdSink func NewStatsdSink(addr string) (*StatsdSink, error) { s := &StatsdSink{ @@ -43,6 +50,11 @@ func (s *StatsdSink) SetGauge(key []string, val float32) { s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val)) } +func (s *StatsdSink) SetGaugeWithLabels(key []string, val float32, labels []Label) { + flatKey := s.flattenKeyLabels(key, labels) + s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val)) +} + func (s *StatsdSink) EmitKey(key []string, val float32) { flatKey := s.flattenKey(key) s.pushMetric(fmt.Sprintf("%s:%f|kv\n", flatKey, val)) @@ -53,11 +65,21 @@ func (s *StatsdSink) IncrCounter(key []string, val float32) { s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val)) } +func (s *StatsdSink) IncrCounterWithLabels(key []string, val float32, labels []Label) { + flatKey := s.flattenKeyLabels(key, labels) + s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val)) +} + func (s *StatsdSink) AddSample(key []string, val float32) { flatKey := s.flattenKey(key) s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val)) } +func (s *StatsdSink) AddSampleWithLabels(key []string, val float32, labels []Label) { + flatKey := s.flattenKeyLabels(key, labels) + s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val)) +} + // Flattens the key for formatting, removes spaces func (s *StatsdSink) flattenKey(parts []string) string { joined := strings.Join(parts, ".") @@ -73,6 +95,14 @@ func (s *StatsdSink) flattenKey(parts []string) string { }, joined) } +// Flattens the key along with labels for formatting, removes spaces +func (s *StatsdSink) flattenKeyLabels(parts []string, labels []Label) string { + for _, label := range labels { + parts = append(parts, label.Value) + } + return s.flattenKey(parts) +} + // Does a non-blocking push to the metrics queue func (s *StatsdSink) pushMetric(m string) { select { diff --git a/vendor/github.com/armon/go-metrics/statsite.go b/vendor/github.com/armon/go-metrics/statsite.go index 68730139a73a..6c0d284d2ddb 100644 --- a/vendor/github.com/armon/go-metrics/statsite.go +++ b/vendor/github.com/armon/go-metrics/statsite.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "net" + "net/url" "strings" "time" ) @@ -16,6 +17,12 @@ const ( flushInterval = 100 * time.Millisecond ) +// NewStatsiteSinkFromURL creates an StatsiteSink from a URL. It is used +// (and tested) from NewMetricSinkFromURL. +func NewStatsiteSinkFromURL(u *url.URL) (MetricSink, error) { + return NewStatsiteSink(u.Host) +} + // StatsiteSink provides a MetricSink that can be used with a // statsite metrics server type StatsiteSink struct { @@ -43,6 +50,11 @@ func (s *StatsiteSink) SetGauge(key []string, val float32) { s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val)) } +func (s *StatsiteSink) SetGaugeWithLabels(key []string, val float32, labels []Label) { + flatKey := s.flattenKeyLabels(key, labels) + s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val)) +} + func (s *StatsiteSink) EmitKey(key []string, val float32) { flatKey := s.flattenKey(key) s.pushMetric(fmt.Sprintf("%s:%f|kv\n", flatKey, val)) @@ -53,11 +65,21 @@ func (s *StatsiteSink) IncrCounter(key []string, val float32) { s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val)) } +func (s *StatsiteSink) IncrCounterWithLabels(key []string, val float32, labels []Label) { + flatKey := s.flattenKeyLabels(key, labels) + s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val)) +} + func (s *StatsiteSink) AddSample(key []string, val float32) { flatKey := s.flattenKey(key) s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val)) } +func (s *StatsiteSink) AddSampleWithLabels(key []string, val float32, labels []Label) { + flatKey := s.flattenKeyLabels(key, labels) + s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val)) +} + // Flattens the key for formatting, removes spaces func (s *StatsiteSink) flattenKey(parts []string) string { joined := strings.Join(parts, ".") @@ -73,6 +95,14 @@ func (s *StatsiteSink) flattenKey(parts []string) string { }, joined) } +// Flattens the key along with labels for formatting, removes spaces +func (s *StatsiteSink) flattenKeyLabels(parts []string, labels []Label) string { + for _, label := range labels { + parts = append(parts, label.Value) + } + return s.flattenKey(parts) +} + // Does a non-blocking push to the metrics queue func (s *StatsiteSink) pushMetric(m string) { select { diff --git a/vendor/vendor.json b/vendor/vendor.json index 1a38adc04ffc..efec0e9d8add 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -19,10 +19,10 @@ "revisionTime": "2016-06-22T17:32:16Z" }, { - "checksumSHA1": "CMcHvqld6XBCDYA+9jCy6gT1O0Q=", + "checksumSHA1": "WvApwvvSe3i/3KO8300dyeFmkbI=", "path": "github.com/DataDog/datadog-go/statsd", - "revision": "909c02b65dd8a52e8fa6072db9752a112227cf21", - "revisionTime": "2016-08-22T16:14:30Z" + "revision": "b10af4b12965a1ad08d164f57d14195b4140d8de", + "revisionTime": "2017-08-09T10:47:06Z" }, { "checksumSHA1": "AzjRkOQtVBTwIw4RJLTygFhJs3s=", @@ -64,20 +64,27 @@ "revision": "bbbad097214e2918d8543d5201d12bfd7bca254d" }, { + "checksumSHA1": "0et4hA6AYqZCgYiY+c6Z17t3k3k=", "path": "github.com/armon/go-metrics", - "revision": "06b60999766278efd6d2b5d8418a58c3d5b99e87" + "revision": "023a4bbe4bb9bfb23ee7e1afc8d0abad217641f3", + "revisionTime": "2017-08-09T01:16:44Z" }, { - "checksumSHA1": "OmqT9Y1mAHvlAKeJh0jBHC9SH78=", + "origin": "github.com/armon", + "path": "github.com/armon/go-metrics/..", + "revision": "" + }, + { + "checksumSHA1": "xCsGGM9TKBogZDfSN536KtQdLko=", "path": "github.com/armon/go-metrics/circonus", - "revision": "3df31a1ada83e310c2e24b267c8e8b68836547b4", - "revisionTime": "2016-07-17T04:34:58Z" + "revision": "023a4bbe4bb9bfb23ee7e1afc8d0abad217641f3", + "revisionTime": "2017-08-09T01:16:44Z" }, { - "checksumSHA1": "mAzNU3zeZGEwqjDT4ZkspFvx3TI=", + "checksumSHA1": "Dt0n1sSivvvdZQdzc4Hu/yOG+T0=", "path": "github.com/armon/go-metrics/datadog", - "revision": "3df31a1ada83e310c2e24b267c8e8b68836547b4", - "revisionTime": "2016-07-17T04:34:58Z" + "revision": "023a4bbe4bb9bfb23ee7e1afc8d0abad217641f3", + "revisionTime": "2017-08-09T01:16:44Z" }, { "checksumSHA1": "gNO0JNpLzYOdInGeq7HqMZUzx9M=", @@ -366,26 +373,26 @@ { "checksumSHA1": "iP5slJJPRZUm0rfdII8OiATAACA=", "path": "github.com/docker/docker/pkg/idtools", - "revision": "52debcd58ac91bf68503ce60561536911b74ff05", - "revisionTime": "2016-05-20T15:17:10Z" + "revision": "02caa73df411debed164f520a6a1304778f8b88c", + "revisionTime": "2016-05-28T10:48:36Z" }, { "checksumSHA1": "iP5slJJPRZUm0rfdII8OiATAACA=", "path": "github.com/docker/docker/pkg/idtools", - "revision": "02caa73df411debed164f520a6a1304778f8b88c", - "revisionTime": "2016-05-28T10:48:36Z" + "revision": "52debcd58ac91bf68503ce60561536911b74ff05", + "revisionTime": "2016-05-20T15:17:10Z" }, { "checksumSHA1": "tdhmIGUaoOMEDymMC23qTS7bt0g=", "path": "github.com/docker/docker/pkg/ioutils", - "revision": "da39e9a4f920a15683dd0f23923c302d4db6eed5", - "revisionTime": "2016-05-28T08:11:04Z" + "revision": "52debcd58ac91bf68503ce60561536911b74ff05", + "revisionTime": "2016-05-20T15:17:10Z" }, { "checksumSHA1": "tdhmIGUaoOMEDymMC23qTS7bt0g=", "path": "github.com/docker/docker/pkg/ioutils", - "revision": "52debcd58ac91bf68503ce60561536911b74ff05", - "revisionTime": "2016-05-20T15:17:10Z" + "revision": "da39e9a4f920a15683dd0f23923c302d4db6eed5", + "revisionTime": "2016-05-28T08:11:04Z" }, { "checksumSHA1": "BlFSSK7zUjPzPuxkLmM/0wpvku8=",