From 2459378ace87e8c7bede55c5def666ea8b759173 Mon Sep 17 00:00:00 2001 From: wanyaoqi <18528551+wanyaoqi@users.noreply.github.com> Date: Tue, 7 Jan 2025 20:43:54 +0800 Subject: [PATCH] fix(region,host): guest reserve cpus compatible with numa allocate (#21803) * fix(region,host): isolate guest reserved cpus * fix(region,host): guest reserve cpus --- pkg/apis/compute/host.go | 1 + pkg/compute/models/guest_actions.go | 41 +++++++----- pkg/compute/models/hosts.go | 92 +++++++++++++++++++++++--- pkg/hostman/guestman/guesthelper.go | 6 +- pkg/hostman/guestman/guestman.go | 12 +++- pkg/hostman/guestman/pod.go | 23 ++++++- pkg/hostman/guestman/qemu-kvm.go | 90 +++++++++++++++++-------- pkg/hostman/hostinfo/hostinfo.go | 14 +++- pkg/hostman/hostutils/hostutils.go | 2 +- pkg/scheduler/cache/candidate/hosts.go | 12 ++++ 10 files changed, 230 insertions(+), 63 deletions(-) diff --git a/pkg/apis/compute/host.go b/pkg/apis/compute/host.go index 8802964fef7..094d2c68a9b 100644 --- a/pkg/apis/compute/host.go +++ b/pkg/apis/compute/host.go @@ -244,6 +244,7 @@ type HostDetails struct { // isolated device count IsolatedDeviceCount int IsolatedDeviceTypeCount map[string]int + GuestPinnedCpus []int // host init warnning SysWarn string `json:"sys_warn"` diff --git a/pkg/compute/models/guest_actions.go b/pkg/compute/models/guest_actions.go index fad8c9aed5e..485e5b911fc 100644 --- a/pkg/compute/models/guest_actions.go +++ b/pkg/compute/models/guest_actions.go @@ -6313,27 +6313,36 @@ func (self *SGuest) PerformCpuset(ctx context.Context, userCred mcclient.TokenCr return nil, httperrors.NewInputParameterError("Host cores %v not contains input %v", allCores, data.CPUS) } - pinnedMap, err := host.GetPinnedCpusetCores(ctx, userCred) + hostReservedCpus, err := host.GetReservedCpus() if err != nil { - return nil, errors.Wrap(err, "Get host pinned cpu cores") + return nil, errors.Wrap(err, "host get reserved cpus") } - - pinnedSets := sets.NewInt() - for key, pinned := range pinnedMap { - if key == self.GetId() { - continue + for i := range data.CPUS { + if hostReservedCpus.Contains(data.CPUS[i]) { + return nil, httperrors.NewBadRequestError("request cpu %d has been reserved", data.CPUS[i]) } - pinnedSets.Insert(pinned...) } - if pinnedSets.HasAny(data.CPUS...) { - return nil, httperrors.NewInputParameterError("More than one of input cores %v already set in host %v", data.CPUS, pinnedSets.List()) + pinnedMap, err := host.GetPinnedCpusetCores(ctx, userCred, []string{self.Id}) + if err != nil { + return nil, errors.Wrap(err, "Get host pinned cpu cores") + } + + if pinnedMap != nil { + for i := range data.CPUS { + if pinnedMap.Contains(data.CPUS[i]) { + return nil, httperrors.NewBadRequestError("request cpu %d has been set by other guests", data.CPUS[i]) + } + } } if err := self.SetMetadata(ctx, api.VM_METADATA_CGROUP_CPUSET, data, userCred); err != nil { return nil, errors.Wrap(err, "set metadata") } + if err := host.updateHostReservedCpus(ctx, userCred); err != nil { + return nil, errors.Wrap(err, "updateHostReservedCpus") + } return nil, self.StartGuestCPUSetTask(ctx, userCred, data) } @@ -6396,18 +6405,16 @@ func (self *SGuest) GetDetailsCpusetCores(ctx context.Context, userCred mcclient return nil, err } - usedMap, err := host.GetPinnedCpusetCores(ctx, userCred) + usedMap, err := host.GetPinnedCpusetCores(ctx, userCred, nil) if err != nil { return nil, err } - usedSets := sets.NewInt() - for _, used := range usedMap { - usedSets.Insert(used...) 
- } resp := &api.ServerGetCPUSetCoresResp{ - HostCores: allCores, - HostUsedCores: usedSets.List(), + HostCores: allCores, + } + if usedMap != nil { + resp.HostUsedCores = usedMap.ToSlice() } // fetch cpuset pinned diff --git a/pkg/compute/models/hosts.go b/pkg/compute/models/hosts.go index 74bdaf99b8d..33ef7ea13b2 100644 --- a/pkg/compute/models/hosts.go +++ b/pkg/compute/models/hosts.go @@ -3835,6 +3835,12 @@ func (manager *SHostManager) FetchCustomizeColumns( } } } + if !isList { + pinnedCpus, _ := hosts[i].GetPinnedCpusetCores(ctx, userCred, nil) + if pinnedCpus != nil { + rows[i].GuestPinnedCpus = pinnedCpus.ToSlice() + } + } if usage, ok := guestResources[hostIds[i]]; ok { rows[i].CpuCommit = usage.GuestVcpuCount @@ -5070,6 +5076,17 @@ func (hh *SHost) PerformReserveCpus( return nil, httperrors.NewInputParameterError("Can't reserve host all cpus") } + pinnedCores, err := hh.GetPinnedCpusetCores(ctx, userCred, nil) + if err != nil { + return nil, err + } + + if pinnedCores != nil { + if cs.Union(*pinnedCores).Size() != (cs.Size() + pinnedCores.Size()) { + return nil, httperrors.NewBadRequestError("request cpus confilct with guest pinned cpus") + } + } + if input.Mems != "" { mems, err := cpuset.Parse(input.Mems) if err != nil { @@ -5102,11 +5119,8 @@ func (hh *SHost) PerformReserveCpus( if err != nil { return nil, err } - if hh.CpuReserved != cs.Size() { - _, err = db.Update(hh, func() error { - hh.CpuReserved = cs.Size() - return nil - }) + if err = hh.updateHostReservedCpus(ctx, userCred); err != nil { + return nil, errors.Wrap(err, "update host reserved cpus") } return nil, err } @@ -7328,20 +7342,80 @@ func (hh *SHost) PerformSyncIsolatedDevices(ctx context.Context, userCred mcclie return res, nil } -func (hh *SHost) GetPinnedCpusetCores(ctx context.Context, userCred mcclient.TokenCredential) (map[string][]int, error) { +func (hh *SHost) GetPinnedCpusetCores(ctx context.Context, userCred mcclient.TokenCredential, excludeGuestIds []string) (*cpuset.CPUSet, error) { gsts, err := hh.GetGuests() if err != nil { return nil, errors.Wrap(err, "Get all guests") } - ret := make(map[string][]int, 0) + ret := cpuset.NewBuilder() for _, gst := range gsts { + if utils.IsInStringArray(gst.Id, excludeGuestIds) { + continue + } pinned, err := gst.getPinnedCpusetCores(ctx, userCred) if err != nil { return nil, errors.Wrapf(err, "get guest %s pinned cpuset cores", gst.GetName()) } - ret[gst.GetId()] = pinned + ret.Add(pinned...) 
} - return ret, nil + resCpuset := ret.Result() + if resCpuset.Size() == 0 { + return nil, nil + } + return &resCpuset, nil +} + +func (hh *SHost) GetReservedCpus() (*cpuset.CPUSet, error) { + reservedCpusStr := hh.GetMetadata(context.Background(), api.HOSTMETA_RESERVED_CPUS_INFO, nil) + if reservedCpusStr != "" { + reservedCpusJson, err := jsonutils.ParseString(reservedCpusStr) + if err != nil { + return nil, errors.Wrap(err, "parse reserved cpus info failed") + } + reservedCpusInfo := api.HostReserveCpusInput{} + err = reservedCpusJson.Unmarshal(&reservedCpusInfo) + if err != nil { + return nil, errors.Wrap(err, "unmarshal host reserved cpus info failed") + } + if reservedCpusInfo.Cpus == "" { + return nil, nil + } + cs, err := cpuset.Parse(reservedCpusInfo.Cpus) + if err != nil { + return nil, errors.Wrap(err, "parse reserved cpuset") + } + return &cs, nil + } + return nil, nil +} + +func (hh *SHost) updateHostReservedCpus(ctx context.Context, userCred mcclient.TokenCredential) error { + reservedCpus, err := hh.GetReservedCpus() + if err != nil { + return err + } + pinnedCpus, err := hh.GetPinnedCpusetCores(ctx, userCred, nil) + if err != nil { + return err + } + var reservedCpuCnt = 0 + if reservedCpus != nil { + reservedCpuCnt += reservedCpus.Size() + } + if pinnedCpus != nil { + reservedCpuCnt += pinnedCpus.Size() + } + if hh.CpuReserved != reservedCpuCnt { + _, err = db.Update(hh, func() error { + hh.CpuReserved = reservedCpuCnt + return nil + }) + if err != nil { + return err + } + } + hh.ClearSchedDescCache() + return nil } func (h *SHost) PerformSyncGuestNicTraffics(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) (jsonutils.JSONObject, error) { diff --git a/pkg/hostman/guestman/guesthelper.go b/pkg/hostman/guestman/guesthelper.go index 01e2301c92f..586b5b4f3b8 100644 --- a/pkg/hostman/guestman/guesthelper.go +++ b/pkg/hostman/guestman/guesthelper.go @@ -229,7 +229,7 @@ type CpuSetCounter struct { } func NewGuestCpuSetCounter( - info *hostapi.HostTopology, reservedCpus *cpuset.CPUSet, numaAllocate, isContainerHost bool, + info *hostapi.HostTopology, reservedCpus cpuset.CPUSet, numaAllocate, isContainerHost bool, hugepageSizeKB int, cpuCmtbound, memCmtBound float32, reservedMemMb int, ) (*CpuSetCounter, error) { cpuSetCounter := new(CpuSetCounter) @@ -262,7 +262,7 @@ func NewGuestCpuSetCounter( cpuDie := new(CPUDie) dieBuilder := cpuset.NewBuilder() for k := 0; k < len(info.Nodes[i].Caches[j].LogicalProcessors); k++ { - if reservedCpus != nil && reservedCpus.Contains(int(info.Nodes[i].Caches[j].LogicalProcessors[k])) { + if reservedCpus.Contains(int(info.Nodes[i].Caches[j].LogicalProcessors[k])) { reservedCpuCnt += 1 continue } @@ -280,7 +280,7 @@ func NewGuestCpuSetCounter( dieBuilder := cpuset.NewBuilder() for j := 0; j < len(info.Nodes[i].Cores); j++ { for k := 0; k < len(info.Nodes[i].Cores[j].LogicalProcessors); k++ { - if reservedCpus != nil && reservedCpus.Contains(info.Nodes[i].Cores[j].LogicalProcessors[k]) { + if reservedCpus.Contains(info.Nodes[i].Cores[j].LogicalProcessors[k]) { reservedCpuCnt += 1 continue } diff --git a/pkg/hostman/guestman/guestman.go b/pkg/hostman/guestman/guestman.go index 3f0d1cee8e1..b935b1879f3 100644 --- a/pkg/hostman/guestman/guestman.go +++ b/pkg/hostman/guestman/guestman.go @@ -60,6 +60,7 @@ import ( "yunion.io/x/onecloud/pkg/mcclient" modules "yunion.io/x/onecloud/pkg/mcclient/modules/compute" "yunion.io/x/onecloud/pkg/util/cgrouputils" + 
"yunion.io/x/onecloud/pkg/util/cgrouputils/cpuset" "yunion.io/x/onecloud/pkg/util/fileutils2" "yunion.io/x/onecloud/pkg/util/netutils2" "yunion.io/x/onecloud/pkg/util/pod" @@ -299,8 +300,17 @@ func (m *SGuestManager) Bootstrap() (chan struct{}, error) { m.numaAllocate = !m.host.IsNumaAllocateEnabled() && enableMemAlloc && (len(hostTypo.Nodes) > 1) } + var reserveCpus = cpuset.NewCPUSet() + hostReserveCpus, guestPinnedCpus := m.host.GetReservedCpusInfo() + if hostReserveCpus != nil { + reserveCpus = reserveCpus.Union(*hostReserveCpus) + } + if guestPinnedCpus != nil { + reserveCpus = reserveCpus.Union(*guestPinnedCpus) + } + cpuSet, err := NewGuestCpuSetCounter( - hostTypo, m.host.GetReservedCpusInfo(), m.numaAllocate, m.host.IsContainerHost(), + hostTypo, reserveCpus, m.numaAllocate, m.host.IsContainerHost(), m.host.HugepageSizeKb(), m.host.CpuCmtBound(), m.host.MemCmtBound(), m.host.GetReservedMemMb(), ) if err != nil { diff --git a/pkg/hostman/guestman/pod.go b/pkg/hostman/guestman/pod.go index ebd8dce4722..51c9d1ca631 100644 --- a/pkg/hostman/guestman/pod.go +++ b/pkg/hostman/guestman/pod.go @@ -1113,6 +1113,28 @@ func (s *sPodGuestInstance) allocateCpuNumaPin() error { return nil } + if scpuset, ok := s.Desc.Metadata[computeapi.VM_METADATA_CGROUP_CPUSET]; ok { + cpusetJson, err := jsonutils.ParseString(scpuset) + if err != nil { + log.Errorf("failed parse server %s cpuset %s: %s", s.Id, scpuset, err) + return errors.Errorf("failed parse server %s cpuset %s: %s", s.Id, scpuset, err) + } + input := new(computeapi.ServerCPUSetInput) + err = cpusetJson.Unmarshal(input) + if err != nil { + log.Errorf("failed unmarshal server %s cpuset %s", s.Id, err) + return errors.Errorf("failed unmarshal server %s cpuset %s", s.Id, err) + } + cpus := input.CPUS + s.Desc.VcpuPin = []desc.SCpuPin{ + { + Vcpus: fmt.Sprintf("0-%d", s.Desc.Cpu-1), + Pcpus: cpuset.NewCPUSet(cpus...).String(), + }, + } + return nil + } + var cpus = make([]int, 0) var preferNumaNodes = make([]int8, 0) for i := range s.Desc.IsolatedDevices { @@ -1150,7 +1172,6 @@ func (s *sPodGuestInstance) allocateCpuNumaPin() error { } else { vcpuPin[i].Vcpu = -1 } - } memPin := &desc.SCpuNumaPin{ diff --git a/pkg/hostman/guestman/qemu-kvm.go b/pkg/hostman/guestman/qemu-kvm.go index a22306fa1be..038bbc0ff3b 100644 --- a/pkg/hostman/guestman/qemu-kvm.go +++ b/pkg/hostman/guestman/qemu-kvm.go @@ -2684,24 +2684,45 @@ func (s *SKVMGuestInstance) setCgroupCPUSet() error { return err } - for i := range s.Desc.CpuNumaPin { - if s.Desc.CpuNumaPin[i].Unregular || s.Desc.CpuNumaPin[i].VcpuPin == nil { - continue - } + if len(s.Desc.CpuNumaPin) > 0 { + for i := range s.Desc.CpuNumaPin { + if s.Desc.CpuNumaPin[i].Unregular || s.Desc.CpuNumaPin[i].VcpuPin == nil { + continue + } - for j := range s.Desc.CpuNumaPin[i].VcpuPin { - vcpuThreadId, ok := vcpuThreads[s.Desc.CpuNumaPin[i].VcpuPin[j].Vcpu] + for j := range s.Desc.CpuNumaPin[i].VcpuPin { + vcpuThreadId, ok := vcpuThreads[s.Desc.CpuNumaPin[i].VcpuPin[j].Vcpu] + if !ok { + return errors.Errorf("failed get vcpu %d thread id from %v", s.Desc.CpuNumaPin[i].VcpuPin[j].Vcpu, vcpuThreads) + } + pcpu := s.Desc.CpuNumaPin[i].VcpuPin[j].Pcpu + vcpuCgname := path.Join(cgName, vcpuThreadId) + taskVcpu := cgrouputils.NewCGroupSubCPUSetTask(guestPid, vcpuCgname, 0, strconv.Itoa(pcpu), []string{vcpuThreadId}) + if !taskVcpu.SetTask() { + return errors.Errorf("Vcpu set cgroup cpuset task failed") + } + } + } + } else if len(s.Desc.VcpuPin) > 1 { + // for guest manual set cpuset + for i := range s.Desc.VcpuPin { + 
vcpu, err := strconv.Atoi(s.Desc.VcpuPin[i].Vcpus) + if err != nil { + return errors.Wrapf(err, "failed parse vcpupin %s", s.Desc.VcpuPin[i].Vcpus) + } + vcpuThreadId, ok := vcpuThreads[vcpu] if !ok { - return errors.Errorf("failed get vcpu %d thread id from %v", s.Desc.CpuNumaPin[i].VcpuPin[j].Vcpu, vcpuThreads) + return errors.Errorf("failed get vcpu %s thread id from %v", s.Desc.VcpuPin[i].Vcpus, vcpuThreads) } - pcpu := s.Desc.CpuNumaPin[i].VcpuPin[j].Pcpu + pcpu := s.Desc.VcpuPin[i].Pcpus vcpuCgname := path.Join(cgName, vcpuThreadId) - taskVcpu := cgrouputils.NewCGroupSubCPUSetTask(guestPid, vcpuCgname, 0, strconv.Itoa(pcpu), []string{vcpuThreadId}) + taskVcpu := cgrouputils.NewCGroupSubCPUSetTask(guestPid, vcpuCgname, 0, pcpu, []string{vcpuThreadId}) if !taskVcpu.SetTask() { return errors.Errorf("Vcpu set cgroup cpuset task failed") } } } + return nil } @@ -2716,6 +2737,38 @@ func (s *SKVMGuestInstance) allocGuestNumaCpuset() error { } } + if scpuset, ok := s.Desc.Metadata[api.VM_METADATA_CGROUP_CPUSET]; ok { + cpusetJson, err := jsonutils.ParseString(scpuset) + if err != nil { + log.Errorf("failed parse server %s cpuset %s: %s", s.Id, scpuset, err) + return errors.Errorf("failed parse server %s cpuset %s: %s", s.Id, scpuset, err) + } + input := new(api.ServerCPUSetInput) + err = cpusetJson.Unmarshal(input) + if err != nil { + log.Errorf("failed unmarshal server %s cpuset %s", s.Id, err) + return errors.Errorf("failed unmarshal server %s cpuset %s", s.Id, err) + } + cpus = input.CPUS + if len(cpus) == int(s.Desc.Cpu) { + s.Desc.VcpuPin = make([]desc.SCpuPin, len(cpus)) + for i := 0; i < int(s.Desc.Cpu); i++ { + s.Desc.VcpuPin[i] = desc.SCpuPin{ + Vcpus: strconv.Itoa(i), + Pcpus: strconv.Itoa(cpus[i]), + } + } + } else { + s.Desc.VcpuPin = []desc.SCpuPin{ + { + Vcpus: fmt.Sprintf("0-%d", s.Desc.Cpu-1), + Pcpus: cpuset.NewCPUSet(cpus...).String(), + }, + } + } + return nil + } + nodeNumaCpus, err := s.manager.cpuSet.AllocCpuset(int(s.Desc.Cpu), s.Desc.Mem*1024, preferNumaNodes, s.GetId()) if err != nil { return err @@ -2743,25 +2796,6 @@ func (s *SKVMGuestInstance) allocGuestNumaCpuset() error { if len(cpuNumaPin) > 0 { s.Desc.CpuNumaPin = cpuNumaPin } else if !s.manager.numaAllocate { - if scpuset, ok := s.Desc.Metadata[api.VM_METADATA_CGROUP_CPUSET]; ok { - s.manager.cpuSet.Lock.Lock() - s.manager.cpuSet.ReleaseCpus(cpus, int(s.Desc.Cpu)) - s.manager.cpuSet.Lock.Unlock() - - cpusetJson, err := jsonutils.ParseString(scpuset) - if err != nil { - log.Errorf("failed parse server %s cpuset %s: %s", s.Id, scpuset, err) - return errors.Errorf("failed parse server %s cpuset %s: %s", s.Id, scpuset, err) - } - input := new(api.ServerCPUSetInput) - err = cpusetJson.Unmarshal(input) - if err != nil { - log.Errorf("failed unmarshal server %s cpuset %s", s.Id, err) - return errors.Errorf("failed unmarshal server %s cpuset %s", s.Id, err) - } - cpus = input.CPUS - } - s.Desc.VcpuPin = []desc.SCpuPin{ { Vcpus: fmt.Sprintf("0-%d", s.Desc.Cpu-1), diff --git a/pkg/hostman/hostinfo/hostinfo.go b/pkg/hostman/hostinfo/hostinfo.go index 7e9a4197ebd..2cd670a4ff3 100644 --- a/pkg/hostman/hostinfo/hostinfo.go +++ b/pkg/hostman/hostinfo/hostinfo.go @@ -100,6 +100,7 @@ type SHostInfo struct { isInit bool onHostDown string reservedCpusInfo *api.HostReserveCpusInput + guestPinnedCpus []int enableNumaAllocate bool cpuCmtBound float32 memCmtBound float32 @@ -1687,6 +1688,7 @@ func (h *SHostInfo) parseReservedCpusInfo(hostInfo *api.HostDetails) error { } h.reservedCpusInfo = &reservedCpusInfo } + h.guestPinnedCpus = 
hostInfo.GuestPinnedCpus return nil } @@ -2646,12 +2648,18 @@ func (h *SHostInfo) GetHostTopology() *hostapi.HostTopology { return h.sysinfo.Topology } -func (h *SHostInfo) GetReservedCpusInfo() *cpuset.CPUSet { +func (h *SHostInfo) GetReservedCpusInfo() (*cpuset.CPUSet, *cpuset.CPUSet) { if h.reservedCpusInfo == nil { - return nil + return nil, nil } cpus, _ := cpuset.Parse(h.reservedCpusInfo.Cpus) - return &cpus + + var guestPinnedCpus *cpuset.CPUSet + if len(h.guestPinnedCpus) > 0 { + guestPinnedCpuSet := cpuset.NewCPUSet(h.guestPinnedCpus...) + guestPinnedCpus = &guestPinnedCpuSet + } + return &cpus, guestPinnedCpus } func (h *SHostInfo) IsNumaAllocateEnabled() bool { diff --git a/pkg/hostman/hostutils/hostutils.go b/pkg/hostman/hostutils/hostutils.go index b9da93d8ccc..d4dd72756e3 100644 --- a/pkg/hostman/hostutils/hostutils.go +++ b/pkg/hostman/hostutils/hostutils.go @@ -70,7 +70,7 @@ type IHost interface { IsAarch64() bool IsX8664() bool GetHostTopology() *hostapi.HostTopology - GetReservedCpusInfo() *cpuset.CPUSet + GetReservedCpusInfo() (*cpuset.CPUSet, *cpuset.CPUSet) GetReservedMemMb() int IsHugepagesEnabled() bool diff --git a/pkg/scheduler/cache/candidate/hosts.go b/pkg/scheduler/cache/candidate/hosts.go index c6779b69ca5..3e4116df6df 100644 --- a/pkg/scheduler/cache/candidate/hosts.go +++ b/pkg/scheduler/cache/candidate/hosts.go @@ -1471,6 +1471,18 @@ func (b *HostBuilder) fillGuestsCpuNumaPin(desc *HostDesc, host *computemodels.S } reservedCpus = &reservedCpuset } + pinnedCpuset, err := host.GetPinnedCpusetCores(context.Background(), nil, nil) + if err != nil { + return err + } + if pinnedCpuset != nil { + if reservedCpus == nil { + reservedCpus = pinnedCpuset + } else { + newset := reservedCpus.Union(*pinnedCpuset) + reservedCpus = &newset + } + } nodeHugepages := make([]hostapi.HostNodeHugepageNr, 0) if host.SysInfo.Contains("node_hugepages") {
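
The PerformCpuset change above replaces the old per-guest pinned map with two membership checks: requested cores must not overlap the host's reserved set, nor cores already pinned by other guests (the current guest is excluded when the pinned set is built). A minimal sketch of that check, using the cpuset API exactly as the patch does; validateCpus and the sample values in main are illustrative, not part of the change.

package main

import (
	"fmt"

	"yunion.io/x/onecloud/pkg/util/cgrouputils/cpuset"
)

// validateCpus mirrors the checks PerformCpuset now performs before
// persisting VM_METADATA_CGROUP_CPUSET: reject any requested core that is
// host-reserved or already pinned by another guest.
func validateCpus(requested []int, hostReserved, otherGuestsPinned *cpuset.CPUSet) error {
	for _, c := range requested {
		// cores reserved on the host can never be handed to a guest
		if hostReserved != nil && hostReserved.Contains(c) {
			return fmt.Errorf("request cpu %d has been reserved", c)
		}
		// cores pinned by other guests; the guest being edited is excluded upstream
		if otherGuestsPinned != nil && otherGuestsPinned.Contains(c) {
			return fmt.Errorf("request cpu %d has been set by other guests", c)
		}
	}
	return nil
}

func main() {
	reserved := cpuset.NewCPUSet(0, 1)
	pinned := cpuset.NewCPUSet(4, 5)
	fmt.Println(validateCpus([]int{2, 3}, &reserved, &pinned)) // <nil>
	fmt.Println(validateCpus([]int{1, 2}, &reserved, &pinned)) // cpu 1 is reserved
}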
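
SHost.GetPinnedCpusetCores now folds every guest's pinned cores into a single CPUSet, can skip the guest being edited, and returns nil when nothing is pinned. A sketch of the builder pattern it uses, assuming only the cpuset API visible in this patch; guest data is faked with a plain map instead of the SGuest model.

package main

import (
	"fmt"

	"yunion.io/x/onecloud/pkg/util/cgrouputils/cpuset"
)

// mergePinnedCores mirrors SHost.GetPinnedCpusetCores: collect all guests'
// pinned cores into one set, skipping excluded guest ids, and return nil when
// the result is empty so callers can treat "no pins" uniformly.
func mergePinnedCores(pinnedByGuest map[string][]int, excludeGuestIds []string) *cpuset.CPUSet {
	excluded := make(map[string]bool, len(excludeGuestIds))
	for _, id := range excludeGuestIds {
		excluded[id] = true
	}
	b := cpuset.NewBuilder()
	for guestId, pinned := range pinnedByGuest {
		if excluded[guestId] {
			continue
		}
		b.Add(pinned...)
	}
	res := b.Result()
	if res.Size() == 0 {
		return nil
	}
	return &res
}

func main() {
	pins := map[string][]int{"vm1": {2, 3}, "vm2": {4}}
	fmt.Println(mergePinnedCores(pins, []string{"vm1"}).ToSlice()) // [4]
}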
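
GetReservedCpus reads the HOSTMETA_RESERVED_CPUS_INFO metadata, which holds a JSON-encoded HostReserveCpusInput whose Cpus field is a cpuset string. A sketch of just the parsing step, with the metadata value inlined as a literal; the "cpus" JSON key in the sample is an assumption about the struct's encoding.

package main

import (
	"fmt"

	"yunion.io/x/jsonutils"

	api "yunion.io/x/onecloud/pkg/apis/compute"
	"yunion.io/x/onecloud/pkg/util/cgrouputils/cpuset"
)

// parseReservedCpus mirrors SHost.GetReservedCpus: empty metadata or an empty
// Cpus field means "nothing reserved" and yields a nil set.
func parseReservedCpus(metaValue string) (*cpuset.CPUSet, error) {
	if metaValue == "" {
		return nil, nil
	}
	obj, err := jsonutils.ParseString(metaValue)
	if err != nil {
		return nil, err
	}
	input := api.HostReserveCpusInput{}
	if err := obj.Unmarshal(&input); err != nil {
		return nil, err
	}
	if input.Cpus == "" {
		return nil, nil
	}
	cs, err := cpuset.Parse(input.Cpus)
	if err != nil {
		return nil, err
	}
	return &cs, nil
}

func main() {
	cs, err := parseReservedCpus(`{"cpus": "0-3,8"}`) // assumed JSON key
	fmt.Println(cs.ToSlice(), err)                    // [0 1 2 3 8] <nil>
}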
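
updateHostReservedCpus recomputes the persisted CpuReserved as host-reserved cores plus guest-pinned cores; the two sets stay disjoint because PerformReserveCpus and PerformCpuset both reject overlaps, so a plain sum is the value the scheduler should see. A sketch of that count only, with the db.Update and ClearSchedDescCache plumbing omitted.

package hostcpu

import "yunion.io/x/onecloud/pkg/util/cgrouputils/cpuset"

// reservedCpuCount mirrors updateHostReservedCpus: nil means "nothing
// reserved/pinned", and the perform handlers keep the two sets disjoint, so
// summing their sizes gives the value stored in SHost.CpuReserved.
func reservedCpuCount(reserved, pinned *cpuset.CPUSet) int {
	cnt := 0
	if reserved != nil {
		cnt += reserved.Size()
	}
	if pinned != nil {
		cnt += pinned.Size()
	}
	return cnt
}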
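
On the host agent side, Bootstrap now merges the host-reserved set with the guest-pinned set returned by the widened GetReservedCpusInfo, and NewGuestCpuSetCounter takes the result by value, so the counter itself no longer needs nil checks. A sketch of that merge; the helper name mergeReservedForCounter is illustrative.

package guestcpu

import "yunion.io/x/onecloud/pkg/util/cgrouputils/cpuset"

// mergeReservedForCounter mirrors SGuestManager.Bootstrap: start from an empty
// set and union in whichever of the two optional sets the host reports, so the
// cpu counter skips both reserved and guest-pinned cores uniformly.
func mergeReservedForCounter(hostReserved, guestPinned *cpuset.CPUSet) cpuset.CPUSet {
	merged := cpuset.NewCPUSet()
	if hostReserved != nil {
		merged = merged.Union(*hostReserved)
	}
	if guestPinned != nil {
		merged = merged.Union(*guestPinned)
	}
	return merged
}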
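
allocGuestNumaCpuset (and the pod-side allocateCpuNumaPin) now honor a manually set VM_METADATA_CGROUP_CPUSET before any NUMA allocation: with exactly one requested core per vcpu the pins are written 1:1, otherwise the whole vcpu range is pinned onto the requested set. A sketch of that layout; cpuPin is a simplified stand-in for desc.SCpuPin.

package main

import (
	"fmt"
	"strconv"

	"yunion.io/x/onecloud/pkg/util/cgrouputils/cpuset"
)

// cpuPin stands in for desc.SCpuPin; only the two string fields used by the
// patch are modeled.
type cpuPin struct {
	Vcpus string
	Pcpus string
}

// buildVcpuPin mirrors the manual-cpuset branch of allocGuestNumaCpuset: one
// pin per vcpu when the counts match, a single range pin otherwise.
func buildVcpuPin(vcpuCount int, cpus []int) []cpuPin {
	if len(cpus) == vcpuCount {
		pins := make([]cpuPin, vcpuCount)
		for i := 0; i < vcpuCount; i++ {
			pins[i] = cpuPin{Vcpus: strconv.Itoa(i), Pcpus: strconv.Itoa(cpus[i])}
		}
		return pins
	}
	return []cpuPin{{
		Vcpus: fmt.Sprintf("0-%d", vcpuCount-1),
		Pcpus: cpuset.NewCPUSet(cpus...).String(),
	}}
}

func main() {
	fmt.Println(buildVcpuPin(2, []int{4, 5}))     // [{0 4} {1 5}]
	fmt.Println(buildVcpuPin(4, []int{8, 9, 10})) // [{0-3 8-10}]
}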