Skip to content

Commit

Permalink
koordlet: optimize perf group cpi collection (koordinator-sh#1905)
Browse files Browse the repository at this point in the history
Signed-off-by: bowen-intel <bowen.song@intel.com>
  • Loading branch information
bowen-intel authored and ls-2018 committed Mar 25, 2024
1 parent 1eb4b6b commit ca3c3a9
Showing 1 changed file with 67 additions and 103 deletions.
170 changes: 67 additions & 103 deletions pkg/koordlet/util/perf_group/perf_group_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ var (
EventsMap = map[string][]string{
"CPICollector": {"cycles", "instructions"},
}
attrMap = make(map[string]*unix.PerfEventAttr)
)

func InitBufferPool(eventsNums map[int]struct{}) {
Expand Down Expand Up @@ -93,20 +94,36 @@ func LibInit() {
klog.Fatalf("Unable to init libpfm: %v", err)
}
})
for _, events := range EventsMap {
for _, event := range events {
attr, err := createPerfConfig(event)
if err != nil {
panic(err)
}
attr.Read_format = unix.PERF_FORMAT_GROUP | unix.PERF_FORMAT_TOTAL_TIME_ENABLED | unix.PERF_FORMAT_TOTAL_TIME_RUNNING | unix.PERF_FORMAT_ID
attr.Sample_type = unix.PERF_SAMPLE_IDENTIFIER
attr.Size = uint32(unsafe.Sizeof(unix.PerfEventAttr{}))
attr.Bits |= unix.PerfBitInherit
attr.Bits |= unix.PerfBitDisabled
attrMap[event] = attr
}
}
}

func LibFinalize() {
closelibpfm.Do(func() {
C.pfm_terminate()
})
for _, attr := range attrMap {
C.free(unsafe.Pointer(attr))
}
}

type PerfGroupCollector struct {
cgroupFile *os.File
cpus []int
perfCollectors sync.Map
perfCollectors map[int]*perfCollector
idEventMap map[uint64]string
mu sync.Mutex
resultMap map[string]float64
valueCh chan perfValue
syscall6 func(trap uintptr, a1 uintptr, a2 uintptr, a3 uintptr, a4 uintptr, a5 uintptr, a6 uintptr) (r1 uintptr, r2 uintptr, err syscall.Errno)
Expand Down Expand Up @@ -145,110 +162,67 @@ func NewPerfGroupCollector(cgroupFile *os.File, cpus []int, events []string, sys
collector = &PerfGroupCollector{
cgroupFile: cgroupFile,
cpus: cpus,
perfCollectors: sync.Map{},
perfCollectors: map[int]*perfCollector{},
idEventMap: make(map[uint64]string),
mu: sync.Mutex{},
resultMap: make(map[string]float64),
valueCh: make(chan perfValue),
syscall6: syscallFunc,
closeCh: make(chan struct{}),
}
var wg sync.WaitGroup
wg.Add(len(cpus))
attrMap := make(map[string]*unix.PerfEventAttr, len(events))
for i, event := range events {
attr, e := createPerfConfig(event)
defer C.free(unsafe.Pointer(attr))
if e != nil {
err = e
return nil, err
} else {
// first event is group leader
if i == 0 {
attr.Bits |= unix.PerfBitDisabled
}
attr.Read_format = unix.PERF_FORMAT_GROUP | unix.PERF_FORMAT_TOTAL_TIME_ENABLED | unix.PERF_FORMAT_TOTAL_TIME_RUNNING | unix.PERF_FORMAT_ID
attr.Sample_type = unix.PERF_SAMPLE_IDENTIFIER
attr.Size = uint32(unsafe.Sizeof(unix.PerfEventAttr{}))
attr.Bits |= unix.PerfBitInherit
attrMap[event] = attr
}
}
var errMutex sync.Mutex
for _, cpu := range cpus {
go func(cpu int) {
// create perf group
var pc perfCollector
pc.syscall6 = syscallFunc
pc.fds = make([]io.ReadCloser, 0, len(events)-1)
pc.cpu = cpu
defer wg.Done()
attr := attrMap[events[0]]
defaultFd := -1
// create perf group
var pc perfCollector
pc.syscall6 = syscallFunc
pc.fds = make([]io.ReadCloser, 0, len(events)-1)
pc.cpu = cpu
attr := attrMap[events[0]]
defaultFd := -1
r1, _, e1 := collector.syscall6(syscall.SYS_PERF_EVENT_OPEN, uintptr(unsafe.Pointer(attr)),
cgroupFile.Fd(), uintptr(cpu), uintptr(defaultFd), uintptr(unix.PERF_FLAG_PID_CGROUP|unix.PERF_FLAG_FD_CLOEXEC), uintptr(0))
if e1 != syscall.Errno(0) {
err = multierr.Append(err, fmt.Errorf("failed to create perf fd, Error: %s, cpu: %d, event: %s", unix.ErrnoName(e1), cpu, events[0]))
return
}
leaderFd := os.NewFile(r1, fmt.Sprintf("%s_%d", events[0], cpu))
pc.leaderFd = leaderFd
var id uint64
_, _, e1 = collector.syscall6(syscall.SYS_IOCTL, r1, uintptr(unix.PERF_EVENT_IOC_ID), uintptr(unsafe.Pointer(&id)), 0, 0, 0)
if e1 != syscall.Errno(0) {
err = multierr.Append(err, fmt.Errorf("failed to get perf id, Error: %s, cpu: %d, event: %s", unix.ErrnoName(e1), cpu, events[0]))
return
}
collector.idEventMap[id] = events[0]
for i := 1; i < len(events); i++ {
attr := attrMap[events[i]]
r1, _, e1 := collector.syscall6(syscall.SYS_PERF_EVENT_OPEN, uintptr(unsafe.Pointer(attr)),
cgroupFile.Fd(), uintptr(cpu), uintptr(defaultFd), uintptr(unix.PERF_FLAG_PID_CGROUP|unix.PERF_FLAG_FD_CLOEXEC), uintptr(0))
cgroupFile.Fd(), uintptr(cpu), r1, uintptr(unix.PERF_FLAG_PID_CGROUP|unix.PERF_FLAG_FD_CLOEXEC), uintptr(0))
if e1 != syscall.Errno(0) {
errMutex.Lock()
err = multierr.Append(err, fmt.Errorf("failed to create perf fd, Error: %s, cpu: %d, event: %s", unix.ErrnoName(e1), cpu, events[0]))
errMutex.Unlock()
err = multierr.Append(err, fmt.Errorf("failed to create perf fd, Error: %s, cpu: %d, event: %s", unix.ErrnoName(e1), cpu, events[i]))
return
}
leaderFd := os.NewFile(r1, fmt.Sprintf("%s_%d", events[0], cpu))
pc.leaderFd = leaderFd
fd := os.NewFile(r1, fmt.Sprintf("%s_%d", events[i], cpu))
pc.fds = append(pc.fds, fd)
var id uint64
_, _, e1 = collector.syscall6(syscall.SYS_IOCTL, r1, uintptr(unix.PERF_EVENT_IOC_ID), uintptr(unsafe.Pointer(&id)), 0, 0, 0)
if e1 != syscall.Errno(0) {
errMutex.Lock()
err = multierr.Append(err, fmt.Errorf("failed to get perf id, Error: %s, cpu: %d, event: %s", unix.ErrnoName(e1), cpu, events[0]))
errMutex.Unlock()
return
}
collector.mu.Lock()
collector.idEventMap[id] = events[0]
collector.mu.Unlock()
for i := 1; i < len(events); i++ {
attr := attrMap[events[i]]
r1, _, e1 := collector.syscall6(syscall.SYS_PERF_EVENT_OPEN, uintptr(unsafe.Pointer(attr)),
cgroupFile.Fd(), uintptr(cpu), r1, uintptr(unix.PERF_FLAG_PID_CGROUP|unix.PERF_FLAG_FD_CLOEXEC), uintptr(0))
if e1 != syscall.Errno(0) {
errMutex.Lock()
err = multierr.Append(err, fmt.Errorf("failed to create perf fd, Error: %s, cpu: %d, event: %s", unix.ErrnoName(e1), cpu, events[i]))
errMutex.Unlock()
return
}
fd := os.NewFile(r1, fmt.Sprintf("%s_%d", events[i], cpu))
pc.fds = append(pc.fds, fd)
var id uint64
_, _, e1 = collector.syscall6(syscall.SYS_IOCTL, r1, uintptr(unix.PERF_EVENT_IOC_ID), uintptr(unsafe.Pointer(&id)), 0, 0, 0)
if e1 != syscall.Errno(0) {
errMutex.Lock()
err = multierr.Append(err, fmt.Errorf("failed to get perf id, Error: %s, cpu: %d, event: %s", unix.ErrnoName(e1), cpu, events[i]))
errMutex.Unlock()
return
}
collector.mu.Lock()
collector.idEventMap[id] = events[i]
collector.mu.Unlock()
}
// enable perf group
_, _, e1 = collector.syscall6(syscall.SYS_IOCTL, leaderFd.Fd(), uintptr(unix.PERF_EVENT_IOC_RESET), unix.PERF_IOC_FLAG_GROUP, 0, 0, 0)
if e1 != syscall.Errno(0) {
errMutex.Lock()
err = multierr.Append(err, fmt.Errorf("failed to reset perf group, Error: %s, cpu: %d, events: %s", unix.ErrnoName(e1), cpu, events))
errMutex.Unlock()
err = multierr.Append(err, fmt.Errorf("failed to get perf id, Error: %s, cpu: %d, event: %s", unix.ErrnoName(e1), cpu, events[i]))
return
}
_, _, e1 = collector.syscall6(syscall.SYS_IOCTL, leaderFd.Fd(), uintptr(unix.PERF_EVENT_IOC_ENABLE), unix.PERF_IOC_FLAG_GROUP, 0, 0, 0)
if e1 != syscall.Errno(0) {
errMutex.Lock()
err = multierr.Append(err, fmt.Errorf("failed to enable perf group, Error: %s, cpu: %d, events: %s", unix.ErrnoName(e1), cpu, events))
errMutex.Unlock()
return
}
collector.perfCollectors.Store(cpu, &pc)
}(cpu)
collector.idEventMap[id] = events[i]
}
// enable perf group
_, _, e1 = collector.syscall6(syscall.SYS_IOCTL, leaderFd.Fd(), uintptr(unix.PERF_EVENT_IOC_RESET), unix.PERF_IOC_FLAG_GROUP, 0, 0, 0)
if e1 != syscall.Errno(0) {
err = multierr.Append(err, fmt.Errorf("failed to reset perf group, Error: %s, cpu: %d, events: %s", unix.ErrnoName(e1), cpu, events))
return
}
_, _, e1 = collector.syscall6(syscall.SYS_IOCTL, leaderFd.Fd(), uintptr(unix.PERF_EVENT_IOC_ENABLE), unix.PERF_IOC_FLAG_GROUP, 0, 0, 0)
if e1 != syscall.Errno(0) {
err = multierr.Append(err, fmt.Errorf("failed to enable perf group, Error: %s, cpu: %d, events: %s", unix.ErrnoName(e1), cpu, events))
return
}
collector.perfCollectors[cpu] = &pc
}
wg.Wait()

// collect and statistic perf result
go func() {
Expand All @@ -270,23 +244,13 @@ func GetAndStartPerfGroupCollectorOnContainer(cgroupFile *os.File, cpus []int, e

func GetContainerPerfResult(collector *PerfGroupCollector) (map[string]float64, error) {
var err error
var mu sync.Mutex
var wg sync.WaitGroup
wg.Add(len(collector.cpus))
for _, cpu := range collector.cpus {
go func(cpu int) {
defer wg.Done()
if pcInf, ok := collector.perfCollectors.Load(cpu); ok {
pc := pcInf.(*perfCollector)
if err = pc.collect(collector.valueCh); err != nil {
mu.Lock()
err = multierr.Append(err, err)
mu.Unlock()
}
if pc, ok := collector.perfCollectors[cpu]; ok {
if err = pc.collect(collector.valueCh); err != nil {
err = multierr.Append(err, err)
}
}(cpu)
}
}
wg.Wait()
err = multierr.Append(err, collector.cleanUp())
<-collector.closeCh

Expand Down

0 comments on commit ca3c3a9

Please sign in to comment.