Skip to content

Commit

Permalink
chores: fix fmt (#16)
Browse files Browse the repository at this point in the history
Signed-off-by: 佑祎 <zzw261520@alibaba-inc.com>
  • Loading branch information
zwzhang0107 committed Jul 13, 2023
1 parent a763c3b commit 54c6453
Show file tree
Hide file tree
Showing 18 changed files with 210 additions and 41 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ jobs:
- uses: golangci/golangci-lint-action@v3
with:
version: v1.47.3
args: --timeout 3m0s

unit-tests:
strategy:
Expand Down
16 changes: 16 additions & 0 deletions cmd/yarn-copilot/main.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
Expand Down
16 changes: 16 additions & 0 deletions cmd/yarn-copilot/options/options.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package options

import "time"
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/noderesource/resource_sync_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func Add(mgr ctrl.Manager) error {
return err
}
r := &YARNResourceSyncReconciler{
Client: mgr.GetClient(),
Client: mgr.GetClient(),
yarnClient: yarnClient,
}
if err := r.yarnClient.Initialize(); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,4 @@ func main() {
}

log.Printf("UpdateNodeResource response %v", response)
}
}
2 changes: 1 addition & 1 deletion pkg/yarn/client/ha_service_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@ func (c *YarnHAClient) GetServiceStatus(request *hadoopcommon.GetServiceStatusRe
return nil, err
}
return response, nil
}
}
2 changes: 1 addition & 1 deletion pkg/yarn/client/rm_admin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ func (c *YarnAdminClient) UpdateNodeResource(request *yarnserver.UpdateNodeResou
return nil, err
}
return response, nil
}
}
18 changes: 9 additions & 9 deletions pkg/yarn/config/yarn_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,20 @@ var (
)

const (
YARN_PREFIX = "yarn."
RM_PREFIX = YARN_PREFIX + "resourcemanager."
RM_ADDRESS = RM_PREFIX + "address"
RM_SCHEDULER_ADDRESS = RM_PREFIX + "scheduler.address"
RM_ADMIN_ADDRESS = RM_PREFIX + "admin.address"
RM_HA_ENABLED = RM_PREFIX + "ha.enabled"
RM_HA_RM_IDS = RM_PREFIX + "ha.rm-ids"
RM_AM_EXPIRY_INTERVAL_MS = YARN_PREFIX + "am.liveness-monitor.expiry-interval-ms"
YARN_PREFIX = "yarn."
RM_PREFIX = YARN_PREFIX + "resourcemanager."
RM_ADDRESS = RM_PREFIX + "address"
RM_SCHEDULER_ADDRESS = RM_PREFIX + "scheduler.address"
RM_ADMIN_ADDRESS = RM_PREFIX + "admin.address"
RM_HA_ENABLED = RM_PREFIX + "ha.enabled"
RM_HA_RM_IDS = RM_PREFIX + "ha.rm-ids"
RM_AM_EXPIRY_INTERVAL_MS = YARN_PREFIX + "am.liveness-monitor.expiry-interval-ms"

DEFAULT_RM_ADDRESS = "0.0.0.0:8032"
DEFAULT_RM_SCHEDULER_ADDRESS = "0.0.0.0:8030"
DEFAULT_RM_ADMIN_ADDRESS = "0.0.0.0:8033"
DEFAULT_RM_AM_EXPIRY_INTERVAL_MS = 600000
DEFAULT_RM_HA_ENABLED = false
DEFAULT_RM_HA_ENABLED = false
)

type yarn_configuration struct {
Expand Down
6 changes: 0 additions & 6 deletions pkg/yarn/copilot/OWNERS

This file was deleted.

41 changes: 31 additions & 10 deletions pkg/yarn/copilot/nm/nm.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package nm

import (
Expand All @@ -10,13 +26,12 @@ import (
"time"

"github.com/go-resty/resty/v2"
"github.com/koordinator-sh/koordinator/pkg/koordlet/pleg"
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
"github.com/koordinator-sh/koordinator/pkg/koordlet/util/system"
"github.com/opencontainers/runc/libcontainer/cgroups"
"k8s.io/klog/v2"

"github.com/koordinator-sh/goyarn/pkg/yarn/copilot/utils"
"github.com/koordinator-sh/koordinator/pkg/koordlet/pleg"
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
"github.com/koordinator-sh/koordinator/pkg/koordlet/util/system"
)

const (
Expand Down Expand Up @@ -106,7 +121,7 @@ func (n *NodeMangerOperator) syncMemoryCgroup(stop <-chan struct{}) error {
func (n *NodeMangerOperator) syncNoneProcCgroup() {
klog.V(5).Info("syncNoneProcCgroup")
cpuPath := n.GenerateCgroupFullPath(system.CgroupCPUDir)
filepath.Walk(cpuPath, func(path string, info os.FileInfo, err error) error {
err := filepath.Walk(cpuPath, func(path string, info os.FileInfo, err error) error {
if err != nil {
klog.Warningf("ignore file %s error:%s", path, err.Error())
return err
Expand All @@ -129,6 +144,9 @@ func (n *NodeMangerOperator) syncNoneProcCgroup() {
}
return nil
})
if err != nil {
klog.Errorf("walk cpu cgroup dir failed, error %v", err)
}
}

func (n *NodeMangerOperator) syncNMEndpoint() {
Expand All @@ -150,7 +168,7 @@ func (n *NodeMangerOperator) syncNMEndpoint() {
func (n *NodeMangerOperator) syncAllCgroup() {
subDirFunc := func(dir string) map[string]struct{} {
res := map[string]struct{}{}
filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
klog.Warningf("ignore file %s error:%s", path, err.Error())
return err
Expand All @@ -161,15 +179,18 @@ func (n *NodeMangerOperator) syncAllCgroup() {
}
return nil
})
if err != nil {
klog.Errorf("walk cpu cgroup dir failed, error %v", err)
}
return res
}
cpuList := subDirFunc(filepath.Join(n.CgroupRoot, system.CgroupCPUDir, n.CgroupPath))
memList := subDirFunc(filepath.Join(n.CgroupRoot, system.CgroupMemDir, n.CgroupPath))
toCreate, toDelete := utils.DiffMap(cpuList, memList)
for path, _ := range toCreate {
for path := range toCreate {
n.createMemoryCgroup(path)
}
for path, _ := range toDelete {
for path := range toDelete {
n.removeMemoryCgroup(path)
}
}
Expand Down Expand Up @@ -211,7 +232,7 @@ func (n *NodeMangerOperator) createMemoryCgroup(fileName string) {
return
}
cpuCgroupPath := filepath.Join(n.CgroupRoot, system.CgroupCPUDir, n.CgroupPath, basename)
pids, err := cgroups.GetPids(cpuCgroupPath)
pids, err := utils.GetPids(cpuCgroupPath)
if err != nil {
klog.Error(err)
return
Expand Down Expand Up @@ -261,7 +282,7 @@ func (n *NodeMangerOperator) KillContainer(containerID string) error {

func (n *NodeMangerOperator) getProcessGroupID(containerID string) int {
containerCgroupPath := filepath.Join(n.CgroupRoot, "cpu", n.CgroupPath, containerID)
pids, err := cgroups.GetPids(containerCgroupPath)
pids, err := utils.GetPids(containerCgroupPath)
if err != nil {
klog.Error(err)
return 0
Expand Down
18 changes: 17 additions & 1 deletion pkg/yarn/copilot/nm/nm_pod_discover.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package nm

import (
Expand Down Expand Up @@ -28,7 +44,7 @@ func (n *NMPodWatcher) GetNMPodEndpoint() (string, bool, error) {
if pod.Labels[ComponentLabelKey] != NodeManagerComponentLabelName {
continue
}
if pod.Spec.HostNetwork == true {
if pod.Spec.HostNetwork {
return "localhost:8042", true, nil
}
return fmt.Sprintf("%s:8042", pod.Status.PodIP), true, nil
Expand Down
16 changes: 16 additions & 0 deletions pkg/yarn/copilot/server/helper.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package server

import (
Expand Down
43 changes: 32 additions & 11 deletions pkg/yarn/copilot/server/server.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package server

import (
Expand Down Expand Up @@ -40,7 +56,10 @@ func (y *YarnCopilotServer) Run(ctx context.Context) error {
Handler: e,
}
sockDir := filepath.Dir(y.unixPath)
os.MkdirAll(sockDir, os.ModePerm)
err := os.MkdirAll(sockDir, os.ModePerm)
if err != nil {
klog.Fatal("mkdir for socket failed", err)
}
if system.FileExists(y.unixPath) {
os.Remove(y.unixPath)
}
Expand All @@ -50,17 +69,20 @@ func (y *YarnCopilotServer) Run(ctx context.Context) error {
os.Exit(1)
}
defer os.Remove(y.unixPath)
go server.Serve(listener)
for {
select {
case <-ctx.Done():
klog.Info("graceful shutdown")
if err := server.Shutdown(ctx); err != nil {
klog.Errorf("Server forced to shutdown: %v", err)
}
return nil
go func() {
err := server.Serve(listener)
if err != nil {
klog.Fatal("start serve failed", err)
}
}()

<-ctx.Done()
klog.Info("graceful shutdown")
if err := server.Shutdown(ctx); err != nil {
klog.Errorf("Server forced to shutdown: %v", err)
}
return nil

}

func (y *YarnCopilotServer) Health(ctx *gin.Context) {
Expand Down Expand Up @@ -144,5 +166,4 @@ func (y *YarnCopilotServer) KillContainer(ctx *gin.Context) {
}

func (y *YarnCopilotServer) KillContainerByResource(ctx *gin.Context) {
return
}
26 changes: 26 additions & 0 deletions pkg/yarn/copilot/utils/cgroups_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
//go:build linux
// +build linux

/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import "github.com/opencontainers/runc/libcontainer/cgroups"

func GetPids(cgroupPath string) ([]int, error) {
return cgroups.GetPids(cgroupPath)
}
26 changes: 26 additions & 0 deletions pkg/yarn/copilot/utils/cgroups_unsupported.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
//go:build !linux
// +build !linux

/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import "fmt"

func GetPids(cgroupPath string) ([]int, error) {
return nil, fmt.Errorf("unsupported")
}
Loading

0 comments on commit 54c6453

Please sign in to comment.