diff --git a/runtime/docker/blkio_metrics.go b/runtime/docker/blkio_metrics.go
new file mode 100644
index 0000000..d1906ad
--- /dev/null
+++ b/runtime/docker/blkio_metrics.go
@@ -0,0 +1,135 @@
+package docker
+
+import (
+	enginetypes "github.com/docker/docker/api/types"
+
+	"github.com/projecteru2/agent/utils"
+)
+
+// Operation names found in the Op field of Docker blkio stat entries.
+const (
+	// ReadOp and WriteOp are the capitalised spellings.
+	ReadOp  = "Read"
+	WriteOp = "Write"
+
+	// readOpLower and writeOpLower are the lowercase spellings that Docker
+	// daemons on cgroup v2 hosts report instead of "Read"/"Write".
+	readOpLower  = "read"
+	writeOpLower = "write"
+)
+
+// BlkIOMetrics holds block IO counters split by operation; each slice carries
+// one entry per device.
+type BlkIOMetrics struct {
+	IOServiceBytesReadRecursive  []*BlkIOEntry
+	IOServiceBytesWriteRecursive []*BlkIOEntry
+	IOServicedReadRecusive       []*BlkIOEntry
+	IOServicedWriteRecusive      []*BlkIOEntry
+}
+
+// BlkIOEntry is a single counter value attributed to the device node path Dev.
+type BlkIOEntry struct {
+	Dev   string
+	Value uint64
+}
+
+// fromEngineBlkioStats converts the Docker engine blkio stats into per-device
+// read/write metrics. Entries whose Op is neither a read nor a write are
+// ignored. A nil raw yields empty metrics. The error of utils.GetDevicePath is
+// returned when the device of a read/write entry cannot be resolved.
+func fromEngineBlkioStats(raw *enginetypes.BlkioStats) (*BlkIOMetrics, error) {
+	if raw == nil {
+		return &BlkIOMetrics{}, nil
+	}
+	// Shared by both groups so every major:minor pair is resolved only once.
+	devPaths := map[[2]uint64]string{}
+	bytesRead, bytesWrite, err := splitBlkioEntries(raw.IoServiceBytesRecursive, devPaths)
+	if err != nil {
+		return nil, err
+	}
+	opsRead, opsWrite, err := splitBlkioEntries(raw.IoServicedRecursive, devPaths)
+	if err != nil {
+		return nil, err
+	}
+	return &BlkIOMetrics{
+		IOServiceBytesReadRecursive:  bytesRead,
+		IOServiceBytesWriteRecursive: bytesWrite,
+		IOServicedReadRecusive:       opsRead,
+		IOServicedWriteRecusive:      opsWrite,
+	}, nil
+}
+
+// splitBlkioEntries partitions entries into read and write entries keyed by
+// device path. devPaths caches major:minor -> path lookups and is updated in
+// place.
+func splitBlkioEntries(entries []enginetypes.BlkioStatEntry, devPaths map[[2]uint64]string) (reads, writes []*BlkIOEntry, err error) {
+	for _, entry := range entries {
+		var target *[]*BlkIOEntry
+		switch entry.Op {
+		case ReadOp, readOpLower:
+			target = &reads
+		case WriteOp, writeOpLower:
+			target = &writes
+		default:
+			// Not exported as a metric, so do not bother resolving its device.
+			continue
+		}
+		key := [2]uint64{entry.Major, entry.Minor}
+		devPath, ok := devPaths[key]
+		if !ok {
+			if devPath, err = utils.GetDevicePath(entry.Major, entry.Minor); err != nil {
+				return nil, nil, err
+			}
+			devPaths[key] = devPath
+		}
+		*target = append(*target, &BlkIOEntry{Dev: devPath, Value: entry.Value})
+	}
+	return reads, writes, nil
+}
+
+// getBlkIOMetricsDifference calculates, group by group, the growth of every
+// counter in new relative to old (see getGroupDifference). A nil old or new is
+// treated as empty metrics.
+func getBlkIOMetricsDifference(old *BlkIOMetrics, new *BlkIOMetrics) (diff *BlkIOMetrics) {
+	if old == nil {
+		old = &BlkIOMetrics{}
+	}
+	if new == nil {
+		new = &BlkIOMetrics{}
+	}
+	return &BlkIOMetrics{
+		IOServiceBytesReadRecursive:  getGroupDifference(old.IOServiceBytesReadRecursive, new.IOServiceBytesReadRecursive),
+		IOServiceBytesWriteRecursive: getGroupDifference(old.IOServiceBytesWriteRecursive, new.IOServiceBytesWriteRecursive),
+		IOServicedReadRecusive:       getGroupDifference(old.IOServicedReadRecusive, new.IOServicedReadRecusive),
+		IOServicedWriteRecusive:      getGroupDifference(old.IOServicedWriteRecusive, new.IOServicedWriteRecusive),
+	}
+}
+
+// getGroupDifference returns one entry per entry of new, holding new-old for
+// that device. A device missing from old counts as a previous value of 0.
+// When a counter went backwards (it was reset), the new value is reported as
+// is, because the unsigned subtraction would otherwise wrap around to a huge
+// number.
+func getGroupDifference(old []*BlkIOEntry, new []*BlkIOEntry) (diff []*BlkIOEntry) {
+	previous := make(map[string]uint64, len(old))
+	for _, entry := range old {
+		if entry == nil {
+			continue
+		}
+		// Keep the first entry per device, like a front-to-back scan would.
+		if _, seen := previous[entry.Dev]; !seen {
+			previous[entry.Dev] = entry.Value
+		}
+	}
+	for _, entry := range new {
+		if entry == nil {
+			continue
+		}
+		delta := entry.Value
+		if prev := previous[entry.Dev]; prev <= entry.Value {
+			delta = entry.Value - prev
+		}
+		diff = append(diff, &BlkIOEntry{Dev: entry.Dev, Value: delta})
+	}
+	return diff
+}
diff --git a/runtime/docker/docker.go b/runtime/docker/docker.go
index 1e46c7b..a53d718 100644
--- a/runtime/docker/docker.go
+++ b/runtime/docker/docker.go
@@ -331,6 +331,41 @@ func (d *Docker) LogFieldsExtra(ctx context.Context, ID string) (map[string]stri
 	return extra, nil
 }
 
+// getContainerStats fetches one non-streaming stats sample for container ID
+// from the Docker daemon and decodes it. On any failure the returned stats are
+// nil.
+func (d *Docker) getContainerStats(ctx context.Context, ID string) (*enginetypes.StatsJSON, error) {
+	rawStat, err := d.client.ContainerStatsOneShot(ctx, ID)
+	if err != nil {
+		log.Errorf("[getContainerStats] failed to get container %s stats, err: %v", ID, err)
+		return nil, err
+	}
+	// The body belongs to the caller; without closing it every poll leaks it.
+	defer rawStat.Body.Close()
+
+	b, err := io.ReadAll(rawStat.Body)
+	if err != nil {
+		log.Errorf("[getContainerStats] failed to read container %s stats, err: %v", ID, err)
+		return nil, err
+	}
+	stats := &enginetypes.StatsJSON{}
+	if err = json.Unmarshal(b, stats); err != nil {
+		log.Errorf("[getContainerStats] failed to decode container %s stats, err: %v", ID, err)
+		return nil, err
+	}
+	return stats, nil
+}
+
+// getBlkioStats returns the blkio section of a fresh stats sample for
+// container ID.
+func (d *Docker) getBlkioStats(ctx context.Context, ID string) (*enginetypes.BlkioStats, error) {
+	fullStat, err := d.getContainerStats(ctx, ID)
+	if err != nil {
+		return nil, err
+	}
+	return &fullStat.BlkioStats, nil
+}
+
 // IsDaemonRunning returns if the
runtime daemon is running. func (d *Docker) IsDaemonRunning(ctx context.Context) bool { var err error diff --git a/runtime/docker/metrics.go b/runtime/docker/metrics.go index 8bcea4e..7b50d6f 100644 --- a/runtime/docker/metrics.go +++ b/runtime/docker/metrics.go @@ -42,6 +42,17 @@ type MetricsClient struct { errOut *prometheus.GaugeVec dropIn *prometheus.GaugeVec dropOut *prometheus.GaugeVec + + // diskio stats + ioServiceBytesRead *prometheus.GaugeVec + ioServiceBytesWrite *prometheus.GaugeVec + ioServicedRead *prometheus.GaugeVec + ioServicedWrite *prometheus.GaugeVec + // io/byte per second + ioServiceBytesReadPerSecond *prometheus.GaugeVec + ioServiceBytesWritePerSecond *prometheus.GaugeVec + ioServicedReadPerSecond *prometheus.GaugeVec + ioServicedWritePerSecond *prometheus.GaugeVec } var clients sync.Map @@ -67,7 +78,6 @@ func NewMetricsClient(statsd, hostname string, container *Container) *MetricsCli "orchestrator": cluster.ERUMark, "labels": strings.Join(clables, ","), } - cpuHostUsage := prometheus.NewGauge(prometheus.GaugeOpts{ Name: "cpu_host_usage", Help: "cpu usage in host view.", @@ -163,7 +173,46 @@ func NewMetricsClient(statsd, hostname string, container *Container) *MetricsCli Help: "drop out.", ConstLabels: labels, }, []string{"nic"}) - + ioServiceBytesRead := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "io_service_bytes_read", + Help: "number of bytes read to the disk by the group.", + ConstLabels: labels, + }, []string{"dev"}) + ioServiceBytesWrite := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "io_service_bytes_write", + Help: "number of bytes write to the disk by the group.", + ConstLabels: labels, + }, []string{"dev"}) + ioServicedRead := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "io_serviced_read", + Help: "number of read IOs to the disk by the group.", + ConstLabels: labels, + }, []string{"dev"}) + ioServicedWrite := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "io_serviced_write", + Help: "number of 
write IOs to the disk by the group.", + ConstLabels: labels, + }, []string{"dev"}) + ioServiceBytesReadPerSecond := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "io_service_bytes_read_per_second", + Help: "number of bytes read per second to the disk by the group.", + ConstLabels: labels, + }, []string{"dev"}) + ioServiceBytesWritePerSecond := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "io_service_bytes_write_per_second", + Help: "number of bytes write per second to the disk by the group.", + ConstLabels: labels, + }, []string{"dev"}) + ioServicedReadPerSecond := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "io_serviced_read_per_second", + Help: "number of read IOs per second to the disk by the group.", + ConstLabels: labels, + }, []string{"dev"}) + ioServicedWritePerSecond := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "io_serviced_write_per_second", + Help: "number of write IOs per second to the disk by the group.", + ConstLabels: labels, + }, []string{"dev"}) // TODO 这里已经没有了版本了 tag := fmt.Sprintf("%s.%s", hostname, coreutils.ShortID(container.ID)) endpoint := fmt.Sprintf("%s.%s", container.Name, container.EntryPoint) @@ -174,7 +223,7 @@ func NewMetricsClient(statsd, hostname string, container *Container) *MetricsCli cpuContainerSysUsage, cpuContainerUsage, cpuContainerUserUsage, memMaxUsage, memRss, memUsage, memPercent, memRSSPercent, bytesRecv, bytesSent, packetsRecv, packetsSent, - errIn, errOut, dropIn, dropOut, + errIn, errOut, dropIn, dropOut, ioServiceBytesRead, ioServiceBytesWrite, ioServicedRead, ioServicedWrite, ioServiceBytesReadPerSecond, ioServiceBytesWritePerSecond, ioServicedReadPerSecond, ioServicedWritePerSecond, ) metricsClient := &MetricsClient{ @@ -204,6 +253,16 @@ func NewMetricsClient(statsd, hostname string, container *Container) *MetricsCli errOut: errOut, dropIn: dropIn, dropOut: dropOut, + + ioServiceBytesRead: ioServiceBytesRead, + ioServiceBytesWrite: ioServiceBytesWrite, + ioServicedRead: 
ioServicedRead,
+		ioServicedWrite:     ioServicedWrite,
+
+		ioServiceBytesReadPerSecond:  ioServiceBytesReadPerSecond,
+		ioServiceBytesWritePerSecond: ioServiceBytesWritePerSecond,
+		ioServicedReadPerSecond:      ioServicedReadPerSecond,
+		ioServicedWritePerSecond:     ioServicedWritePerSecond,
 	}
 	clients.Store(container.ID, metricsClient)
 	return metricsClient
@@ -233,6 +292,16 @@ func (m *MetricsClient) Unregister() {
 	prometheus.Unregister(m.errOut)
 	prometheus.Unregister(m.dropIn)
 	prometheus.Unregister(m.dropOut)
+
+	prometheus.Unregister(m.ioServiceBytesRead)
+	prometheus.Unregister(m.ioServiceBytesWrite)
+	prometheus.Unregister(m.ioServicedRead)
+	prometheus.Unregister(m.ioServicedWrite)
+
+	prometheus.Unregister(m.ioServiceBytesReadPerSecond)
+	prometheus.Unregister(m.ioServiceBytesWritePerSecond)
+	prometheus.Unregister(m.ioServicedReadPerSecond)
+	prometheus.Unregister(m.ioServicedWritePerSecond)
 }
 
 // CPUHostUsage set cpu usage in host view
@@ -349,6 +418,54 @@ func (m *MetricsClient) DropOut(nic string, i float64) {
 	m.dropOut.WithLabelValues(nic).Set(i)
 }
 
+// IOServiceBytesRead stores i for dev in m.data and sets the dev-labelled ioServiceBytesRead gauge.
+func (m *MetricsClient) IOServiceBytesRead(dev string, i float64) {
+	m.data[dev+".io_service_bytes_read"] = i
+	m.ioServiceBytesRead.WithLabelValues(dev).Set(i)
+}
+
+// IOServiceBytesWrite stores i for dev in m.data and sets the dev-labelled ioServiceBytesWrite gauge.
+func (m *MetricsClient) IOServiceBytesWrite(dev string, i float64) {
+	m.data[dev+".io_service_bytes_write"] = i
+	m.ioServiceBytesWrite.WithLabelValues(dev).Set(i)
+}
+
+// IOServicedRead stores i for dev in m.data and sets the dev-labelled ioServicedRead gauge.
+func (m *MetricsClient) IOServicedRead(dev string, i float64) {
+	m.data[dev+".io_serviced_read"] = i
+	m.ioServicedRead.WithLabelValues(dev).Set(i)
+}
+
+// IOServicedWrite stores i for dev in m.data and sets the dev-labelled ioServicedWrite gauge.
+func (m *MetricsClient) IOServicedWrite(dev string, i float64) {
+	m.data[dev+".io_serviced_write"] = i
+	m.ioServicedWrite.WithLabelValues(dev).Set(i)
+}
+
+// IOServiceBytesReadPerSecond stores i for dev in m.data and sets the dev-labelled ioServiceBytesReadPerSecond gauge.
+func (m *MetricsClient) IOServiceBytesReadPerSecond(dev string, i float64) {
+	m.data[dev+".io_service_bytes_read_per_second"] = i
+	m.ioServiceBytesReadPerSecond.WithLabelValues(dev).Set(i)
+}
+
+// IOServiceBytesWritePerSecond stores i for dev in m.data and sets the dev-labelled ioServiceBytesWritePerSecond gauge.
+func (m *MetricsClient) IOServiceBytesWritePerSecond(dev string, i float64) {
+	m.data[dev+".io_service_bytes_write_per_second"] = i
+	m.ioServiceBytesWritePerSecond.WithLabelValues(dev).Set(i)
+}
+
+// IOServicedReadPerSecond stores i for dev in m.data and sets the dev-labelled ioServicedReadPerSecond gauge.
+func (m *MetricsClient) IOServicedReadPerSecond(dev string, i float64) {
+	m.data[dev+".io_serviced_read_per_second"] = i
+	m.ioServicedReadPerSecond.WithLabelValues(dev).Set(i)
+}
+
+// IOServicedWritePerSecond stores i for dev in m.data and sets the dev-labelled ioServicedWritePerSecond gauge.
+func (m *MetricsClient) IOServicedWritePerSecond(dev string, i float64) {
+	m.data[dev+".io_serviced_write_per_second"] = i
+	m.ioServicedWritePerSecond.WithLabelValues(dev).Set(i)
+}
+
 // Lazy connecting
 func (m *MetricsClient) checkConn() error {
 	if m.statsdClient != nil {
diff --git a/runtime/docker/stat.go b/runtime/docker/stat.go
index ff7c91a..83b5c88 100644
--- a/runtime/docker/stat.go
+++ b/runtime/docker/stat.go
@@ -12,7 +12,7 @@ import (
 )
 
 // CollectWorkloadMetrics .
-func (d *Docker) CollectWorkloadMetrics(ctx context.Context, ID string) { +func (d *Docker) CollectWorkloadMetrics(ctx context.Context, ID string) { //nolint // TODO // FIXME fuck internal pkg proc := "/proc" @@ -31,7 +31,16 @@ func (d *Docker) CollectWorkloadMetrics(ctx context.Context, ID string) { log.Errorf("[stat] get %s stats failed %v", container.ID, err) return } - + rawBlkioStats, err := d.getBlkioStats(ctx, container.ID) + if err != nil { + log.Errorf("[stat] get %s diskio stats failed %v", container.ID, err) + return + } + blkioStats, err := fromEngineBlkioStats(rawBlkioStats) + if err != nil { + log.Errorf("[stat] get %s diskio stats failed %v", container.ID, err) + return + } delta := float64(d.config.Metrics.Step) timeout := time.Duration(d.config.Metrics.Step) * time.Second tick := time.NewTicker(timeout) @@ -128,6 +137,44 @@ func (d *Docker) CollectWorkloadMetrics(ctx context.Context, ID string) { mClient.DropIn(nic.Name, float64(nic.Dropin-oldNICStats.Dropin)/delta) mClient.DropOut(nic.Name, float64(nic.Dropout-oldNICStats.Dropout)/delta) } + log.Debugf("[stat] start to get blkio stats for %s", container.ID) + newRawBlkioStats, err := d.getBlkioStats(ctx, container.ID) + if err != nil { + log.Errorf("[stat] get %s diskio stats failed %v", container.ID, err) + return + } + newBlkioStats, err := fromEngineBlkioStats(newRawBlkioStats) + if err != nil { + log.Errorf("[stat] get %s diskio stats failed %v", container.ID, err) + return + } + for _, entry := range newBlkioStats.IOServiceBytesReadRecursive { + mClient.IOServiceBytesRead(entry.Dev, float64(entry.Value)) + } + for _, entry := range newBlkioStats.IOServiceBytesWriteRecursive { + mClient.IOServiceBytesWrite(entry.Dev, float64(entry.Value)) + } + for _, entry := range newBlkioStats.IOServicedReadRecusive { + mClient.IOServicedRead(entry.Dev, float64(entry.Value)) + } + for _, entry := range newBlkioStats.IOServicedWriteRecusive { + mClient.IOServicedWrite(entry.Dev, float64(entry.Value)) + } + 
// update diff
+			diffBlkioStats := getBlkIOMetricsDifference(blkioStats, newBlkioStats)
+			for _, entry := range diffBlkioStats.IOServiceBytesReadRecursive {
+				mClient.IOServiceBytesReadPerSecond(entry.Dev, float64(entry.Value)/delta)
+			}
+			for _, entry := range diffBlkioStats.IOServiceBytesWriteRecursive {
+				mClient.IOServiceBytesWritePerSecond(entry.Dev, float64(entry.Value)/delta)
+			}
+			for _, entry := range diffBlkioStats.IOServicedReadRecusive {
+				mClient.IOServicedReadPerSecond(entry.Dev, float64(entry.Value)/delta)
+			}
+			for _, entry := range diffBlkioStats.IOServicedWriteRecusive {
+				mClient.IOServicedWritePerSecond(entry.Dev, float64(entry.Value)/delta)
+			}
+			rawBlkioStats, blkioStats = newRawBlkioStats, newBlkioStats
 			containerCPUStats, systemCPUStats, containerNetStats = newContainerCPUStats, newSystemCPUStats, newContainerNetStats
 			if err := mClient.Send(); err != nil {
 				log.Errorf("[stat] Send metrics failed %v", err)
diff --git a/utils/udev_linux.go b/utils/udev_linux.go
new file mode 100644
index 0000000..c1781c2
--- /dev/null
+++ b/utils/udev_linux.go
@@ -0,0 +1,58 @@
+//go:build linux
+// +build linux
+
+package utils
+
+import (
+	"errors"
+	"fmt"
+	"os"
+	"path"
+	"syscall"
+
+	"golang.org/x/sys/unix"
+)
+
+const (
+	blkDevDir = "/dev/"
+)
+
+// GetDevicePath returns the path of the block device node directly under
+// /dev whose device number is major:minor.
+//
+// Character devices are skipped: they are numbered independently of block
+// devices, so a character device can carry the same major:minor pair as the
+// block device being looked up. An error is returned when /dev cannot be read
+// or no block device node matches.
+func GetDevicePath(major, minor uint64) (devPath string, err error) {
+	entries, err := os.ReadDir(blkDevDir)
+	if err != nil {
+		return "", err
+	}
+	dev := getDev(major, minor)
+	for _, entry := range entries {
+		mode := entry.Type()
+		if mode&os.ModeDevice == 0 || mode&os.ModeCharDevice != 0 {
+			continue
+		}
+		fi, infoErr := entry.Info()
+		if infoErr != nil {
+			// The node disappeared between listing and stat; keep looking.
+			continue
+		}
+		stat, ok := fi.Sys().(*syscall.Stat_t)
+		if !ok {
+			return "", errors.New("syscall fail, Not a syscall.Stat_t")
+		}
+		if stat.Rdev == dev {
+			return path.Join(blkDevDir, entry.Name()), nil
+		}
+	}
+	return "", fmt.Errorf("block device %d:%d not found in %s", major, minor, blkDevDir)
+}
+
+// getDev packs major and minor into the device number format compared against
+// syscall.Stat_t.Rdev. Both values are truncated to 32 bits first.
+func getDev(major, minor uint64) (dev uint64) {
+	return unix.Mkdev(uint32(major), uint32(minor))
+}
diff --git a/utils/udev_notlinux.go b/utils/udev_notlinux.go
new file mode 100644
index 0000000..9ff5fd2
--- /dev/null
+++ b/utils/udev_notlinux.go
@@ -0,0 +1,10 @@
+//go:build !linux
+// +build !linux
+
+package utils
+
+// GetDevicePath is the stub used on platforms other than Linux: it performs
+// no lookup and always returns an empty path together with a nil error.
+func GetDevicePath(major, minor uint64) (devPath string, err error) {
+	return "", nil
+}
diff --git a/utils/udev_test.go b/utils/udev_test.go
new file mode 100644
index 0000000..9a528bb
--- /dev/null
+++ b/utils/udev_test.go
@@ -0,0 +1,52 @@
+//go:build linux
+// +build linux
+
+package utils
+
+import (
+	"os"
+	"path"
+	"syscall"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"golang.org/x/sys/unix"
+)
+
+//func TestGetBlockDeviceFromDevnum(t *testing.T) {
+//	// test random dev
+//	dev, err := GetDeviceFromDevnum(CharDeviceType, 1, 8)
+//	assert.Nil(t, err)
+//	assert.Equal(t, "/dev/random", dev.Devnode())
+//}
+
+// TestGetDevicePath checks that block devices are resolved by number and that
+// character devices are never returned.
+func TestGetDevicePath(t *testing.T) {
+	// 1:8 is /dev/random as a character device; only a block device may match.
+	if devPath, err := GetDevicePath(1, 8); err == nil {
+		assert.NotEqual(t, "/dev/random", devPath)
+	}
+
+	entries, err := os.ReadDir(blkDevDir)
+	assert.Nil(t, err)
+	for _, entry := range entries {
+		mode := entry.Type()
+		if mode&os.ModeDevice == 0 || mode&os.ModeCharDevice != 0 {
+			continue
+		}
+		fi, err := entry.Info()
+		if err != nil {
+			continue
+		}
+		stat, ok := fi.Sys().(*syscall.Stat_t)
+		if !ok {
+			continue
+		}
+		devPath, err := GetDevicePath(uint64(unix.Major(stat.Rdev)), uint64(unix.Minor(stat.Rdev)))
+		assert.Nil(t, err)
+		assert.Equal(t, path.Join(blkDevDir, entry.Name()), devPath)
+		return
+	}
+	t.Skip("no block device node in /dev")
+}