Skip to content

Commit

Permalink
Add allocated/unallocated metrics to client
Browse files Browse the repository at this point in the history
  • Loading branch information
dadgar committed Feb 17, 2017
1 parent 11e9a10 commit dca06bd
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 7 deletions.
121 changes: 114 additions & 7 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ type Client struct {
blockedAllocations map[string]*structs.Allocation
blockedAllocsLock sync.RWMutex

// migratingAllocs is the set of allocs whose data migration is in flight
migratingAllocs map[string]*migrateAllocCtrl
migratingAllocsLock sync.Mutex

// allocUpdates stores allocations that need to be synced to the server.
allocUpdates chan *structs.Allocation

Expand All @@ -151,25 +155,23 @@ type Client struct {
// vaultClient is used to interact with Vault for token and secret renewals
vaultClient vaultclient.VaultClient

// migratingAllocs is the set of allocs whose data migration is in flight
migratingAllocs map[string]*migrateAllocCtrl
migratingAllocsLock sync.Mutex

// garbageCollector is used to garbage collect terminal allocations present
// in the node automatically
garbageCollector *AllocGarbageCollector
}

// migrateAllocCtrl indicates whether migration is complete
type migrateAllocCtrl struct {
alloc *structs.Allocation
ch chan struct{}
closed bool
chLock sync.Mutex
}

func newMigrateAllocCtrl() *migrateAllocCtrl {
func newMigrateAllocCtrl(alloc *structs.Allocation) *migrateAllocCtrl {
return &migrateAllocCtrl{
ch: make(chan struct{}),
ch: make(chan struct{}),
alloc: alloc,
}
}

Expand Down Expand Up @@ -1501,7 +1503,7 @@ func (c *Client) runAllocs(update *allocUpdates) {
// prevents a race between a finishing blockForRemoteAlloc and
// another invocation of runAllocs
if _, ok := c.getAllocRunners()[add.PreviousAllocation]; !ok {
c.migratingAllocs[add.ID] = newMigrateAllocCtrl()
c.migratingAllocs[add.ID] = newMigrateAllocCtrl(add)
go c.blockForRemoteAlloc(add)
}
}
Expand Down Expand Up @@ -2220,6 +2222,111 @@ func (c *Client) emitStats(hStats *stats.HostStats) {
metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "used_percent"}, float32(disk.UsedPercent))
metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "inodes_percent"}, float32(disk.InodesUsedPercent))
}

// Get all the resources for the node
c.configLock.RLock()
node := c.configCopy.Node
c.configLock.RUnlock()
total := node.Resources
res := node.Reserved
allocated := c.getAllocatedResources(node)

// Emit allocated
metrics.SetGauge([]string{"client", "allocated", "memory", nodeID}, float32(allocated.MemoryMB))
metrics.SetGauge([]string{"client", "allocated", "disk", nodeID}, float32(allocated.DiskMB))
metrics.SetGauge([]string{"client", "allocated", "cpu", nodeID}, float32(allocated.CPU))
metrics.SetGauge([]string{"client", "allocated", "iops", nodeID}, float32(allocated.IOPS))

for _, n := range allocated.Networks {
metrics.SetGauge([]string{"client", "allocated", "network", n.Device, nodeID}, float32(n.MBits))
}

// Emit unallocated
unallocatedMem := total.MemoryMB - res.MemoryMB - allocated.MemoryMB
unallocatedDisk := total.DiskMB - res.DiskMB - allocated.DiskMB
unallocatedCpu := total.CPU - res.CPU - allocated.CPU
unallocatedIops := total.IOPS - res.IOPS - allocated.IOPS
metrics.SetGauge([]string{"client", "unallocated", "memory", nodeID}, float32(unallocatedMem))
metrics.SetGauge([]string{"client", "unallocated", "disk", nodeID}, float32(unallocatedDisk))
metrics.SetGauge([]string{"client", "unallocated", "cpu", nodeID}, float32(unallocatedCpu))
metrics.SetGauge([]string{"client", "unallocated", "iops", nodeID}, float32(unallocatedIops))

for _, n := range allocated.Networks {
totalMbits := 0

totalIdx := total.NetIndex(n)
if totalIdx != -1 {
totalMbits = total.Networks[totalIdx].MBits
continue
}

unallocatedMbits := totalMbits - n.MBits
metrics.SetGauge([]string{"client", "unallocated", "network", n.Device, nodeID}, float32(unallocatedMbits))
}
}

func (c *Client) getAllocatedResources(selfNode *structs.Node) *structs.Resources {
// Unfortunately the allocs only have IP so we need to match them to the
// device
cidrToDevice := make(map[*net.IPNet]string, len(selfNode.Resources.Networks))
for _, n := range selfNode.Resources.Networks {
_, ipnet, err := net.ParseCIDR(n.CIDR)
if err != nil {
continue
}
cidrToDevice[ipnet] = n.Device
}

// Sum the allocated resources
allocs := c.allAllocs()
var allocated structs.Resources
allocatedDeviceMbits := make(map[string]int)
for _, alloc := range allocs {
if !alloc.TerminalStatus() {
allocated.Add(alloc.Resources)
for _, allocatedNetwork := range alloc.Resources.Networks {
for cidr, dev := range cidrToDevice {
ip := net.ParseIP(allocatedNetwork.IP)
if cidr.Contains(ip) {
allocatedDeviceMbits[dev] += allocatedNetwork.MBits
break
}
}
}
}
}

// Clear the networks
allocated.Networks = nil
for dev, speed := range allocatedDeviceMbits {
net := &structs.NetworkResource{
Device: dev,
MBits: speed,
}
allocated.Networks = append(allocated.Networks, net)
}

return &allocated
}

// allAllocs returns all the allocations managed by the client
func (c *Client) allAllocs() []*structs.Allocation {
var allocs []*structs.Allocation
for _, ar := range c.getAllocRunners() {
allocs = append(allocs, ar.Alloc())
}
c.blockedAllocsLock.Lock()
for _, alloc := range c.blockedAllocations {
allocs = append(allocs, alloc)
}
c.blockedAllocsLock.Unlock()

c.migratingAllocsLock.Lock()
for _, ctrl := range c.migratingAllocs {
allocs = append(allocs, ctrl.alloc)
}
c.migratingAllocsLock.Unlock()
return allocs
}

// resolveServer given a sever's address as a string, return it's resolved
Expand Down
7 changes: 7 additions & 0 deletions demo/vagrant/client1.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,16 @@ client {
}
reserved {
cpu = 500
memory = 512
disk = 1024
}
}

telemetry {
publish_allocation_metrics = true
publish_node_metrics = true
}

# Modify our port to avoid a collision with server1
ports {
http = 5656
Expand Down

0 comments on commit dca06bd

Please sign in to comment.