Skip to content

Commit

Permalink
Iops (#101)
Browse files Browse the repository at this point in the history
* support iops monitoring

* fix: lint failed

Co-authored-by: DuodenumL <qq2410088750@live.com>
  • Loading branch information
anrs and DuodenumL authored Oct 10, 2022
1 parent 3f243dd commit ee1f465
Show file tree
Hide file tree
Showing 7 changed files with 348 additions and 5 deletions.
83 changes: 83 additions & 0 deletions runtime/docker/blkio_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package docker

import (
enginetypes "github.com/docker/docker/api/types"

"github.com/projecteru2/agent/utils"
)

const (
ReadOp = "Read"
WriteOp = "Write"
)

// per device level
type BlkIOMetrics struct {
IOServiceBytesReadRecursive []*BlkIOEntry
IOServiceBytesWriteRecursive []*BlkIOEntry
IOServicedReadRecusive []*BlkIOEntry
IOServicedWriteRecusive []*BlkIOEntry
}

type BlkIOEntry struct {
Dev string
Value uint64
}

func fromEngineBlkioStats(raw *enginetypes.BlkioStats) (*BlkIOMetrics, error) {
blkioMetrics := &BlkIOMetrics{}
for _, entry := range raw.IoServiceBytesRecursive {
devPath, err := utils.GetDevicePath(entry.Major, entry.Minor)
if err != nil {
return nil, err
}
switch entry.Op {
case ReadOp:
blkioMetrics.IOServiceBytesReadRecursive = append(blkioMetrics.IOServiceBytesReadRecursive, &BlkIOEntry{Dev: devPath, Value: entry.Value})
case WriteOp:
blkioMetrics.IOServiceBytesWriteRecursive = append(blkioMetrics.IOServiceBytesWriteRecursive, &BlkIOEntry{Dev: devPath, Value: entry.Value})
}
}
for _, entry := range raw.IoServicedRecursive {
devPath, err := utils.GetDevicePath(entry.Major, entry.Minor)
if err != nil {
return nil, err
}
switch entry.Op {
case ReadOp:
blkioMetrics.IOServicedReadRecusive = append(blkioMetrics.IOServicedReadRecusive, &BlkIOEntry{Dev: devPath, Value: entry.Value})
case WriteOp:
blkioMetrics.IOServicedWriteRecusive = append(blkioMetrics.IOServicedWriteRecusive, &BlkIOEntry{Dev: devPath, Value: entry.Value})
}
}
return blkioMetrics, nil
}

// getBlkIOMetricsDifference calculate differences between old and new metrics (new-old), for missing metrics, will use default 0 as value
func getBlkIOMetricsDifference(old *BlkIOMetrics, new *BlkIOMetrics) (diff *BlkIOMetrics) {
return &BlkIOMetrics{
IOServiceBytesReadRecursive: getGroupDifference(old.IOServiceBytesReadRecursive, new.IOServiceBytesReadRecursive),
IOServiceBytesWriteRecursive: getGroupDifference(old.IOServiceBytesWriteRecursive, new.IOServiceBytesWriteRecursive),
IOServicedReadRecusive: getGroupDifference(old.IOServicedReadRecusive, new.IOServicedReadRecusive),
IOServicedWriteRecusive: getGroupDifference(old.IOServicedWriteRecusive, new.IOServicedWriteRecusive),
}
}

func getGroupDifference(old []*BlkIOEntry, new []*BlkIOEntry) (diff []*BlkIOEntry) {
lookup := func(dev string, entryList []*BlkIOEntry) uint64 {
for _, entry := range entryList {
if entry.Dev == dev {
return entry.Value
}
}
return 0
}
for _, entry := range new {
diffEntry := &BlkIOEntry{
Dev: entry.Dev,
Value: entry.Value - lookup(entry.Dev, old),
}
diff = append(diff, diffEntry)
}
return
}
23 changes: 23 additions & 0 deletions runtime/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,29 @@ func (d *Docker) LogFieldsExtra(ctx context.Context, ID string) (map[string]stri
return extra, nil
}

func (d *Docker) getContainerStats(ctx context.Context, ID string) (*enginetypes.StatsJSON, error) {
rawStat, err := d.client.ContainerStatsOneShot(ctx, ID)
if err != nil {
log.Errorf("[getContainerStats] failed to get container %s stats, err: %v", ID, err)
return nil, err
}
b, err := io.ReadAll(rawStat.Body)
if err != nil {
log.Errorf("[getContainerStats] failed to read container %s stats, err: %v", ID, err)
return nil, err
}
stats := &enginetypes.StatsJSON{}
return stats, json.Unmarshal(b, stats)
}

func (d *Docker) getBlkioStats(ctx context.Context, ID string) (*enginetypes.BlkioStats, error) {
fullStat, err := d.getContainerStats(ctx, ID)
if err != nil {
return nil, err
}
return &fullStat.BlkioStats, nil
}

// IsDaemonRunning returns if the runtime daemon is running.
func (d *Docker) IsDaemonRunning(ctx context.Context) bool {
var err error
Expand Down
123 changes: 120 additions & 3 deletions runtime/docker/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ type MetricsClient struct {
errOut *prometheus.GaugeVec
dropIn *prometheus.GaugeVec
dropOut *prometheus.GaugeVec

// diskio stats
ioServiceBytesRead *prometheus.GaugeVec
ioServiceBytesWrite *prometheus.GaugeVec
ioServicedRead *prometheus.GaugeVec
ioServicedWrite *prometheus.GaugeVec
// io/byte per second
ioServiceBytesReadPerSecond *prometheus.GaugeVec
ioServiceBytesWritePerSecond *prometheus.GaugeVec
ioServicedReadPerSecond *prometheus.GaugeVec
ioServicedWritePerSecond *prometheus.GaugeVec
}

var clients sync.Map
Expand All @@ -67,7 +78,6 @@ func NewMetricsClient(statsd, hostname string, container *Container) *MetricsCli
"orchestrator": cluster.ERUMark,
"labels": strings.Join(clables, ","),
}

cpuHostUsage := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "cpu_host_usage",
Help: "cpu usage in host view.",
Expand Down Expand Up @@ -163,7 +173,46 @@ func NewMetricsClient(statsd, hostname string, container *Container) *MetricsCli
Help: "drop out.",
ConstLabels: labels,
}, []string{"nic"})

ioServiceBytesRead := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "io_service_bytes_read",
Help: "number of bytes read to the disk by the group.",
ConstLabels: labels,
}, []string{"dev"})
ioServiceBytesWrite := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "io_service_bytes_write",
Help: "number of bytes write to the disk by the group.",
ConstLabels: labels,
}, []string{"dev"})
ioServicedRead := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "io_serviced_read",
Help: "number of read IOs to the disk by the group.",
ConstLabels: labels,
}, []string{"dev"})
ioServicedWrite := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "io_serviced_write",
Help: "number of write IOs to the disk by the group.",
ConstLabels: labels,
}, []string{"dev"})
ioServiceBytesReadPerSecond := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "io_service_bytes_read_per_second",
Help: "number of bytes read per second to the disk by the group.",
ConstLabels: labels,
}, []string{"dev"})
ioServiceBytesWritePerSecond := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "io_service_bytes_write_per_second",
Help: "number of bytes write per second to the disk by the group.",
ConstLabels: labels,
}, []string{"dev"})
ioServicedReadPerSecond := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "io_serviced_read_per_second",
Help: "number of read IOs per second to the disk by the group.",
ConstLabels: labels,
}, []string{"dev"})
ioServicedWritePerSecond := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "io_serviced_write_per_second",
Help: "number of write IOs per second to the disk by the group.",
ConstLabels: labels,
}, []string{"dev"})
// TODO 这里已经没有了版本了
tag := fmt.Sprintf("%s.%s", hostname, coreutils.ShortID(container.ID))
endpoint := fmt.Sprintf("%s.%s", container.Name, container.EntryPoint)
Expand All @@ -174,7 +223,7 @@ func NewMetricsClient(statsd, hostname string, container *Container) *MetricsCli
cpuContainerSysUsage, cpuContainerUsage, cpuContainerUserUsage,
memMaxUsage, memRss, memUsage, memPercent, memRSSPercent,
bytesRecv, bytesSent, packetsRecv, packetsSent,
errIn, errOut, dropIn, dropOut,
errIn, errOut, dropIn, dropOut, ioServiceBytesRead, ioServiceBytesWrite, ioServicedRead, ioServicedWrite, ioServiceBytesReadPerSecond, ioServiceBytesWritePerSecond, ioServicedReadPerSecond, ioServicedWritePerSecond,
)

metricsClient := &MetricsClient{
Expand Down Expand Up @@ -204,6 +253,16 @@ func NewMetricsClient(statsd, hostname string, container *Container) *MetricsCli
errOut: errOut,
dropIn: dropIn,
dropOut: dropOut,

ioServiceBytesRead: ioServiceBytesRead,
ioServiceBytesWrite: ioServiceBytesWrite,
ioServicedRead: ioServicedRead,
ioServicedWrite: ioServicedWrite,

ioServiceBytesReadPerSecond: ioServiceBytesReadPerSecond,
ioServiceBytesWritePerSecond: ioServiceBytesWritePerSecond,
ioServicedReadPerSecond: ioServicedReadPerSecond,
ioServicedWritePerSecond: ioServicedWritePerSecond,
}
clients.Store(container.ID, metricsClient)
return metricsClient
Expand Down Expand Up @@ -233,6 +292,16 @@ func (m *MetricsClient) Unregister() {
prometheus.Unregister(m.errOut)
prometheus.Unregister(m.dropIn)
prometheus.Unregister(m.dropOut)

prometheus.Unregister(m.ioServiceBytesRead)
prometheus.Unregister(m.ioServiceBytesWrite)
prometheus.Unregister(m.ioServicedRead)
prometheus.Unregister(m.ioServicedWrite)

prometheus.Unregister(m.ioServiceBytesReadPerSecond)
prometheus.Unregister(m.ioServiceBytesWritePerSecond)
prometheus.Unregister(m.ioServicedReadPerSecond)
prometheus.Unregister(m.ioServicedWritePerSecond)
}

// CPUHostUsage set cpu usage in host view
Expand Down Expand Up @@ -349,6 +418,54 @@ func (m *MetricsClient) DropOut(nic string, i float64) {
m.dropOut.WithLabelValues(nic).Set(i)
}

// IOServiceBytesRead .
func (m *MetricsClient) IOServiceBytesRead(dev string, i float64) {
m.data[dev+".io_service_bytes_read"] = i
m.ioServiceBytesRead.WithLabelValues(dev).Set(i)
}

// IOServiceBytesWrite .
func (m *MetricsClient) IOServiceBytesWrite(dev string, i float64) {
m.data[dev+".io_service_bytes_write"] = i
m.ioServiceBytesWrite.WithLabelValues(dev).Set(i)
}

// IOServicedRead .
func (m *MetricsClient) IOServicedRead(dev string, i float64) {
m.data[dev+".io_serviced_read"] = i
m.ioServicedRead.WithLabelValues(dev).Set(i)
}

// IOServicedWrite .
func (m *MetricsClient) IOServicedWrite(dev string, i float64) {
m.data[dev+".io_serviced_write"] = i
m.ioServicedWrite.WithLabelValues(dev).Set(i)
}

// IOServiceBytesReadPerSecond .
func (m *MetricsClient) IOServiceBytesReadPerSecond(dev string, i float64) {
m.data[dev+".io_service_bytes_read_per_second"] = i
m.ioServiceBytesReadPerSecond.WithLabelValues(dev).Set(i)
}

// IOServiceBytesWritePerSecond .
func (m *MetricsClient) IOServiceBytesWritePerSecond(dev string, i float64) {
m.data[dev+".io_service_bytes_write_per_second"] = i
m.ioServiceBytesWritePerSecond.WithLabelValues(dev).Set(i)
}

// IOServicedReadPerSecond .
func (m *MetricsClient) IOServicedReadPerSecond(dev string, i float64) {
m.data[dev+".io_serviced_read_per_second"] = i
m.ioServicedReadPerSecond.WithLabelValues(dev).Set(i)
}

// IOServicedWritePerSecond .
func (m *MetricsClient) IOServicedWritePerSecond(dev string, i float64) {
m.data[dev+".io_serviced_write_per_second"] = i
m.ioServicedWritePerSecond.WithLabelValues(dev).Set(i)
}

// Lazy connecting
func (m *MetricsClient) checkConn() error {
if m.statsdClient != nil {
Expand Down
51 changes: 49 additions & 2 deletions runtime/docker/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

// CollectWorkloadMetrics .
func (d *Docker) CollectWorkloadMetrics(ctx context.Context, ID string) {
func (d *Docker) CollectWorkloadMetrics(ctx context.Context, ID string) { //nolint
// TODO
// FIXME fuck internal pkg
proc := "/proc"
Expand All @@ -31,7 +31,16 @@ func (d *Docker) CollectWorkloadMetrics(ctx context.Context, ID string) {
log.Errorf("[stat] get %s stats failed %v", container.ID, err)
return
}

rawBlkioStats, err := d.getBlkioStats(ctx, container.ID)
if err != nil {
log.Errorf("[stat] get %s diskio stats failed %v", container.ID, err)
return
}
blkioStats, err := fromEngineBlkioStats(rawBlkioStats)
if err != nil {
log.Errorf("[stat] get %s diskio stats failed %v", container.ID, err)
return
}
delta := float64(d.config.Metrics.Step)
timeout := time.Duration(d.config.Metrics.Step) * time.Second
tick := time.NewTicker(timeout)
Expand Down Expand Up @@ -128,6 +137,44 @@ func (d *Docker) CollectWorkloadMetrics(ctx context.Context, ID string) {
mClient.DropIn(nic.Name, float64(nic.Dropin-oldNICStats.Dropin)/delta)
mClient.DropOut(nic.Name, float64(nic.Dropout-oldNICStats.Dropout)/delta)
}
log.Debugf("[stat] start to get blkio stats for %s", container.ID)
newRawBlkioStats, err := d.getBlkioStats(ctx, container.ID)
if err != nil {
log.Errorf("[stat] get %s diskio stats failed %v", container.ID, err)
return
}
newBlkioStats, err := fromEngineBlkioStats(newRawBlkioStats)
if err != nil {
log.Errorf("[stat] get %s diskio stats failed %v", container.ID, err)
return
}
for _, entry := range newBlkioStats.IOServiceBytesReadRecursive {
mClient.IOServiceBytesRead(entry.Dev, float64(entry.Value))
}
for _, entry := range newBlkioStats.IOServiceBytesWriteRecursive {
mClient.IOServiceBytesWrite(entry.Dev, float64(entry.Value))
}
for _, entry := range newBlkioStats.IOServicedReadRecusive {
mClient.IOServicedRead(entry.Dev, float64(entry.Value))
}
for _, entry := range newBlkioStats.IOServicedWriteRecusive {
mClient.IOServicedWrite(entry.Dev, float64(entry.Value))
}
// update diff
diffBlkioStats := getBlkIOMetricsDifference(blkioStats, newBlkioStats)
for _, entry := range diffBlkioStats.IOServiceBytesReadRecursive {
mClient.IOServiceBytesReadPerSecond(entry.Dev, float64(entry.Value)/delta)
}
for _, entry := range diffBlkioStats.IOServiceBytesWriteRecursive {
mClient.IOServiceBytesWritePerSecond(entry.Dev, float64(entry.Value)/delta)
}
for _, entry := range diffBlkioStats.IOServicedReadRecusive {
mClient.IOServicedReadPerSecond(entry.Dev, float64(entry.Value)/delta)
}
for _, entry := range diffBlkioStats.IOServicedWriteRecusive {
mClient.IOServicedWritePerSecond(entry.Dev, float64(entry.Value)/delta)
}
rawBlkioStats, blkioStats = newRawBlkioStats, newBlkioStats
containerCPUStats, systemCPUStats, containerNetStats = newContainerCPUStats, newSystemCPUStats, newContainerNetStats
if err := mClient.Send(); err != nil {
log.Errorf("[stat] Send metrics failed %v", err)
Expand Down
Loading

0 comments on commit ee1f465

Please sign in to comment.