Skip to content

Commit

Permalink
compatible with v1 and v1alpha2 cri api version
Browse files Browse the repository at this point in the history
Signed-off-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>

update cloneset ut for go 1.19

Signed-off-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>

update cloneset ut for go 1.19

Signed-off-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>
  • Loading branch information
mingzhou.swx authored and Kuromesi committed Aug 7, 2023
1 parent da374ff commit 15ed547
Show file tree
Hide file tree
Showing 10 changed files with 223 additions and 50 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ on:

env:
# Common versions
GO_VERSION: '1.18'
GOLANGCI_VERSION: 'v1.47'
GO_VERSION: '1.19'
GOLANGCI_VERSION: 'v1.51'
DOCKER_BUILDX_VERSION: 'v0.4.2'

# Common users. We can't run a step 'if secrets.AWS_USR != ""' but we can run
Expand Down Expand Up @@ -53,7 +53,7 @@ jobs:
run: |
make generate
- name: Lint golang code
uses: golangci/golangci-lint-action@v3.2.0
uses: golangci/golangci-lint-action@v3.5.0
with:
version: ${{ env.GOLANGCI_VERSION }}
args: --verbose
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/e2e-1.16.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ on:

env:
# Common versions
GO_VERSION: '1.18'
GO_VERSION: '1.19'
KIND_VERSION: 'v0.14.0'
KIND_IMAGE: 'kindest/node:v1.16.15'
KIND_CLUSTER_NAME: 'ci-testing'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: E2E-1.20
name: E2E-1.20-EphemeralJob

on:
push:
Expand All @@ -10,7 +10,7 @@ on:

env:
# Common versions
GO_VERSION: '1.18'
GO_VERSION: '1.19'
KIND_VERSION: 'v0.14.0'
KIND_IMAGE: 'kindest/node:v1.20.7'
KIND_CLUSTER_NAME: 'ci-testing'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: E2E-1.24
name: E2E-1.26

on:
push:
Expand All @@ -10,10 +10,10 @@ on:

env:
# Common versions
GO_VERSION: '1.18'
GO_VERSION: '1.19'
KIND_ACTION_VERSION: 'v1.3.0'
KIND_VERSION: 'v0.14.0'
KIND_IMAGE: 'kindest/node:v1.24.2'
KIND_IMAGE: 'kindest/node:v1.26.4'
KIND_CLUSTER_NAME: 'ci-testing'

jobs:
Expand Down
22 changes: 11 additions & 11 deletions pkg/controller/cloneset/utils/pod_sorter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,13 @@ func TestSpreadConstraintsRanker(t *testing.T) {
expectedPodsSorted: []types.UID{
"d-0",
"d-1",
"d-2", "e-0",
"c-0", "e-1",
"c-1", "e-2",
"b-0", "f-0", "i-0",
"c-0", "e-0",
"d-2", "e-1",
"b-0", "e-2",
"c-1", "f-0", "i-0",
"d-3", "e-3", "i-1",
"c-2", "f-1", "i-2",
"b-1", "e-4", "i-3",
"b-1", "f-1", "i-2",
"c-2", "e-4", "i-3",
"d-4", "f-2", "i-4",
},
},
Expand Down Expand Up @@ -229,14 +229,14 @@ func TestSpreadConstraintsRanker(t *testing.T) {
expectedPodsSorted: []types.UID{
"d-0", "e-0",
"d-1", "e-1",
"d-2", "i-0",
"c-0", "e-2",
"c-0", "i-0",
"d-2", "e-2",
"i-1", "f-0",
"c-1", "b-0",
"b-0", "c-1",
"i-2", "d-3",
"e-3", "f-1",
"i-3", "c-2",
"b-1", "e-4",
"i-3", "b-1",
"c-2", "e-4",
"d-4", "f-2", "i-4",
},
},
Expand Down
27 changes: 14 additions & 13 deletions pkg/controller/statefulset/stateful_set_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,19 +565,20 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
}
// If we find a Pod that is currently terminating, we must wait until graceful deletion
// completes before we continue to make progress.
if isTerminating(replicas[i]) {
if monotonic {
klog.V(4).InfoS("StatefulSet is waiting for Pod to Terminate",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
return &status, nil
} else if isTerminating(replicas[i]) && decreaseAndCheckMaxUnavailable(scaleMaxUnavailable) {
klog.V(4).Infof(
"StatefulSet %s/%s Pod %s is Terminating, and break pods scale",
set.Namespace,
set.Name,
replicas[i].Name)
break
}
if isTerminating(replicas[i]) && monotonic {
klog.V(4).Infof(
"StatefulSet %s/%s is waiting for Pod %s to Terminate",
set.Namespace,
set.Name,
replicas[i].Name)
return &status, nil
} else if isTerminating(replicas[i]) && decreaseAndCheckMaxUnavailable(scaleMaxUnavailable) {
klog.V(4).Infof(
"StatefulSet %s/%s Pod %s is Terminating, and break pods scale",
set.Namespace,
set.Name,
replicas[i].Name)
break
}
// Update InPlaceUpdateReady condition for pod
if res := ssc.inplaceControl.Refresh(replicas[i], nil); res.RefreshErr != nil {
Expand Down
12 changes: 6 additions & 6 deletions pkg/daemon/criruntime/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,6 @@ func NewFactory(varRunPath string, accountManager daemonutil.ImagePullAccountMan
var typedVersion *runtimeapi.VersionResponse

switch cfg.runtimeType {
case ContainerRuntimeDocker:
imageService, err = runtimeimage.NewDockerImageService(cfg.runtimeURI, accountManager)
if err != nil {
klog.Warningf("Failed to new image service for %v (%s, %s): %v", cfg.runtimeType, cfg.runtimeURI, cfg.runtimeRemoteURI, err)
continue
}
case ContainerRuntimeContainerd, ContainerRuntimeCommonCRI, ContainerRuntimePouch:
addr, _, err := kubeletutil.GetAddressAndDialer(cfg.runtimeRemoteURI)
if err != nil {
Expand All @@ -107,6 +101,12 @@ func NewFactory(varRunPath string, accountManager daemonutil.ImagePullAccountMan
klog.Warningf("Failed to new image service for %v (%s, %s): %v", cfg.runtimeType, cfg.runtimeURI, cfg.runtimeRemoteURI, err)
continue
}
case ContainerRuntimeDocker:
imageService, err = runtimeimage.NewDockerImageService(cfg.runtimeURI, accountManager)
if err != nil {
klog.Warningf("Failed to new image service for %v (%s, %s): %v", cfg.runtimeType, cfg.runtimeURI, cfg.runtimeRemoteURI, err)
continue
}
}

if _, err = imageService.ListImages(context.TODO()); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/daemon/criruntime/imageruntime/containerd.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func NewContainerdImageService(
accountManager: accountManager,
snapshotter: snapshotter,
client: client,
// TODO: compatible with v1alpha2 cri api
criImageClient: runtimeapi.NewImageServiceClient(conn),
httpProxy: httpProxy,
}, nil
Expand Down Expand Up @@ -325,6 +326,7 @@ func getDefaultValuesFromCRIStatus(conn *grpc.ClientConn) (snapshotter string, h
ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
defer cancel()

// TODO: compatible with v1alpha2 cri api
rclient := runtimeapi.NewRuntimeServiceClient(conn)
resp, err := rclient.Status(ctx, &runtimeapi.StatusRequest{Verbose: true})
if err != nil {
Expand Down
165 changes: 154 additions & 11 deletions pkg/daemon/criruntime/imageruntime/cri.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package imageruntime
import (
"context"
"io"
"reflect"
"time"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
Expand All @@ -26,6 +27,7 @@ import (
v1 "k8s.io/api/core/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
runtimeapiv1alpha2 "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/cri/remote/util"
"k8s.io/kubernetes/pkg/util/parsers"
Expand Down Expand Up @@ -53,26 +55,47 @@ func NewCRIImageService(runtimeURI string, accountManager daemonutil.ImagePullAc
return nil, err
}

klog.V(4).InfoS("Finding the CRI API image version")
imageClient := runtimeapi.NewImageServiceClient(conn)

if _, err := imageClient.ImageFsInfo(ctx, &runtimeapi.ImageFsInfoRequest{}); err == nil {
klog.V(2).InfoS("Using CRI v1 image API")
imageClientV1, imageClientV1alpha2, err := determineImageClientAPIVersion(conn)
if err != nil {
klog.ErrorS(err, "Failed to determine CRI image API version")
return nil, err
}

return &commonCRIImageService{
accountManager: accountManager,
criImageClient: imageClient,
accountManager: accountManager,
criImageClient: imageClientV1,
criImageClientV1alpha2: imageClientV1alpha2,
}, nil
}

type commonCRIImageService struct {
accountManager daemonutil.ImagePullAccountManager
criImageClient runtimeapi.ImageServiceClient
accountManager daemonutil.ImagePullAccountManager
criImageClient runtimeapi.ImageServiceClient
criImageClientV1alpha2 runtimeapiv1alpha2.ImageServiceClient
}

func (c *commonCRIImageService) useV1API() bool {
return c.criImageClientV1alpha2 == nil || reflect.ValueOf(c.criImageClientV1alpha2).IsNil()
}

// PullImage implements ImageService.PullImage.
func (c *commonCRIImageService) PullImage(ctx context.Context, imageName, tag string, pullSecrets []v1.Secret, sandboxConfig *appsv1alpha1.SandboxConfig) (ImagePullStatusReader, error) {
if c.useV1API() {
return c.pullImageV1(ctx, imageName, tag, pullSecrets, sandboxConfig)
}
return c.pullImageV1alpha2(ctx, imageName, tag, pullSecrets, sandboxConfig)
}

// ListImages implements ImageService.ListImages.
func (c *commonCRIImageService) ListImages(ctx context.Context) ([]ImageInfo, error) {
if c.useV1API() {
return c.listImagesV1(ctx)
}
return c.listImagesV1alpha2(ctx)
}

// PullImage implements ImageService.PullImage using v1 CRI client.
func (c *commonCRIImageService) pullImageV1(ctx context.Context, imageName, tag string, pullSecrets []v1.Secret, sandboxConfig *appsv1alpha1.SandboxConfig) (ImagePullStatusReader, error) {
registry := daemonutil.ParseRegistry(imageName)
fullImageName := imageName + ":" + tag
repoToPull, _, _, err := parsers.ParseImageName(fullImageName)
Expand Down Expand Up @@ -172,8 +195,8 @@ func (c *commonCRIImageService) PullImage(ctx context.Context, imageName, tag st
return newImagePullStatusReader(pipeR), nil
}

// ListImages implements ImageService.ListImages.
func (c *commonCRIImageService) ListImages(ctx context.Context) ([]ImageInfo, error) {
// ListImages implements ImageService.ListImages using V1 CRI client.
func (c *commonCRIImageService) listImagesV1(ctx context.Context) ([]ImageInfo, error) {
listImagesReq := &runtimeapi.ListImagesRequest{}
listImagesResp, err := c.criImageClient.ListImages(ctx, listImagesReq)
if err != nil {
Expand All @@ -190,3 +213,123 @@ func (c *commonCRIImageService) ListImages(ctx context.Context) ([]ImageInfo, er
}
return collection, nil
}

// PullImage implements ImageService.PullImage using v1alpha2 CRI client.
func (c *commonCRIImageService) pullImageV1alpha2(ctx context.Context, imageName, tag string, pullSecrets []v1.Secret, sandboxConfig *appsv1alpha1.SandboxConfig) (ImagePullStatusReader, error) {
registry := daemonutil.ParseRegistry(imageName)
fullImageName := imageName + ":" + tag
repoToPull, _, _, err := parsers.ParseImageName(fullImageName)
if err != nil {
return nil, err
}
// Reader
pipeR, pipeW := io.Pipe()
defer pipeW.Close()

var auth *runtimeapiv1alpha2.AuthConfig
pullImageReq := &runtimeapiv1alpha2.PullImageRequest{
Image: &runtimeapiv1alpha2.ImageSpec{
Image: fullImageName,
Annotations: make(map[string]string),
},
Auth: auth, //default is nil
}
if sandboxConfig != nil {
pullImageReq.SandboxConfig = &runtimeapiv1alpha2.PodSandboxConfig{
Annotations: sandboxConfig.Annotations,
Labels: sandboxConfig.Labels,
}
if pullImageReq.SandboxConfig.Annotations == nil {
pullImageReq.SandboxConfig.Annotations = map[string]string{}
}
} else {
pullImageReq.SandboxConfig = &runtimeapiv1alpha2.PodSandboxConfig{
Annotations: map[string]string{},
}
}
// Add this default annotation to avoid unexpected panic caused by sandboxConfig is nil
// for some runtime implementations.
pullImageReq.SandboxConfig.Annotations[pullingImageSandboxConfigAnno] = "kruise-daemon"

if len(pullSecrets) > 0 {
var authInfos []daemonutil.AuthInfo
authInfos, err = convertToRegistryAuths(pullSecrets, repoToPull)
if err == nil {
var pullErrs []error
for _, authInfo := range authInfos {
var pullErr error
klog.V(5).Infof("Pull image %v:%v with user %v", imageName, tag, authInfo.Username)
pullImageReq.Auth = &runtimeapiv1alpha2.AuthConfig{
Username: authInfo.Username,
Password: authInfo.Password,
}
_, pullErr = c.criImageClientV1alpha2.PullImage(ctx, pullImageReq)
if pullErr == nil {
pipeW.CloseWithError(io.EOF)
return newImagePullStatusReader(pipeR), nil
}
klog.Warningf("Failed to pull image %v:%v with user %v, err %v", imageName, tag, authInfo.Username, pullErr)
pullErrs = append(pullErrs, pullErr)

}
if len(pullErrs) > 0 {
err = utilerrors.NewAggregate(pullErrs)
}
}
}

// Try the default secret
if c.accountManager != nil {
var authInfo *daemonutil.AuthInfo
var defaultErr error
authInfo, defaultErr = c.accountManager.GetAccountInfo(registry)
if defaultErr != nil {
klog.Warningf("Failed to get account for registry %v, err %v", registry, defaultErr)
// When the default account acquisition fails, try to pull anonymously
} else if authInfo != nil {
klog.V(5).Infof("Pull image %v:%v with user %v", imageName, tag, authInfo.Username)
pullImageReq.Auth = &runtimeapiv1alpha2.AuthConfig{
Username: authInfo.Username,
Password: authInfo.Password,
}
_, err = c.criImageClientV1alpha2.PullImage(ctx, pullImageReq)
if err == nil {
pipeW.CloseWithError(io.EOF)
return newImagePullStatusReader(pipeR), nil
}
klog.Warningf("Failed to pull image %v:%v, err %v", imageName, tag, err)
return nil, err
}
}

if err != nil {
return nil, err
}

// Anonymous pull
_, err = c.criImageClientV1alpha2.PullImage(ctx, pullImageReq)
if err != nil {
return nil, errors.Wrapf(err, "Failed to pull image reference %q", fullImageName)
}
pipeW.CloseWithError(io.EOF)
return newImagePullStatusReader(pipeR), nil
}

// ListImages implements ImageService.ListImages using V1alpha2 CRI client.
func (c *commonCRIImageService) listImagesV1alpha2(ctx context.Context) ([]ImageInfo, error) {
listImagesReq := &runtimeapiv1alpha2.ListImagesRequest{}
listImagesResp, err := c.criImageClientV1alpha2.ListImages(ctx, listImagesReq)
if err != nil {
return nil, err
}
collection := make([]ImageInfo, 0, len(listImagesResp.GetImages()))
for _, img := range listImagesResp.GetImages() {
collection = append(collection, ImageInfo{
ID: img.GetId(),
RepoTags: img.GetRepoTags(),
RepoDigests: img.GetRepoDigests(),
Size: int64(img.GetSize_()),
})
}
return collection, nil
}
Loading

0 comments on commit 15ed547

Please sign in to comment.