Skip to content

Commit

Permalink
Support common CRI runtime based on cri-o impl (openkruise#936)
Browse files Browse the repository at this point in the history
  • Loading branch information
FillZpp authored Mar 21, 2022
1 parent d604c2d commit 5753084
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 15 deletions.
39 changes: 33 additions & 6 deletions pkg/daemon/criruntime/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package criruntime

import (
"context"
"flag"
"fmt"
"os"
"time"
Expand All @@ -35,6 +36,10 @@ const (
kubeRuntimeAPIVersion = "0.1.0"
)

var (
CRISocketFileName = flag.String("socket-file", "", "The name of CRI socket file, and it should be in the mounted /hostvarrun directory.")
)

// Factory is the interface to get container and image runtime service
type Factory interface {
GetImageService() runtimeimage.ImageService
Expand All @@ -48,7 +53,7 @@ const (
ContainerRuntimeDocker = "docker"
ContainerRuntimeContainerd = "containerd"
ContainerRuntimePouch = "pouch"
ContainerRuntimeCRIO = "cri-o"
ContainerRuntimeCommonCRI = "common-cri"
)

type runtimeConfig struct {
Expand Down Expand Up @@ -108,13 +113,13 @@ func NewFactory(varRunPath string, accountManager daemonutil.ImagePullAccountMan
klog.Warningf("Failed to new image service for %v (%s, %s): %v", cfg.runtimeType, cfg.runtimeURI, cfg.runtimeRemoteURI, err)
continue
}
case ContainerRuntimeCRIO:
case ContainerRuntimeCommonCRI:
addr, _, err := kubeletutil.GetAddressAndDialer(cfg.runtimeRemoteURI)
if err != nil {
klog.Warningf("Failed to get address for %v (%s, %s): %v", cfg.runtimeType, cfg.runtimeURI, cfg.runtimeRemoteURI, err)
continue
}
imageService, err = runtimeimage.NewCrioImageService(addr, accountManager)
imageService, err = runtimeimage.NewCRIImageService(addr, accountManager)
if err != nil {
klog.Warningf("Failed to new image service for %v (%s, %s): %v", cfg.runtimeType, cfg.runtimeURI, cfg.runtimeRemoteURI, err)
continue
Expand Down Expand Up @@ -168,9 +173,25 @@ func (f *factory) GetRuntimeServiceByName(runtimeName string) criapi.RuntimeServ
return nil
}

func detectRuntime(varRunPath string) []runtimeConfig {
func detectRuntime(varRunPath string) (cfgs []runtimeConfig) {
var err error
var cfgs []runtimeConfig

// firstly check if it is configured from flag
if CRISocketFileName != nil && len(*CRISocketFileName) > 0 {
filePath := fmt.Sprintf("%s/%s", varRunPath, *CRISocketFileName)
if _, err = os.Stat(filePath); err == nil {
cfgs = append(cfgs, runtimeConfig{
runtimeType: ContainerRuntimeCommonCRI,
runtimeRemoteURI: fmt.Sprintf("unix://%s/%s", varRunPath, *CRISocketFileName),
})
klog.Infof("Find configured CRI socket %s with given flag", filePath)
} else {
klog.Errorf("Failed to stat the CRI socket %s with given flag: %v", filePath, err)
}
return
}

// if the flag is not set, then try to find runtime in the recognized types and paths.

// pouch
{
Expand Down Expand Up @@ -224,9 +245,15 @@ func detectRuntime(varRunPath string) []runtimeConfig {

// cri-o
{
if _, err = os.Stat(fmt.Sprintf("%s/crio.sock", varRunPath)); err == nil {
cfgs = append(cfgs, runtimeConfig{
runtimeType: ContainerRuntimeCommonCRI,
runtimeRemoteURI: fmt.Sprintf("unix://%s/crio.sock", varRunPath),
})
}
if _, err = os.Stat(fmt.Sprintf("%s/crio/crio.sock", varRunPath)); err == nil {
cfgs = append(cfgs, runtimeConfig{
runtimeType: ContainerRuntimeCRIO,
runtimeType: ContainerRuntimeCommonCRI,
runtimeRemoteURI: fmt.Sprintf("unix://%s/crio/crio.sock", varRunPath),
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import (

const maxMsgSize = 1024 * 1024 * 16

// NewCrioImageService create a cri-o runtime
func NewCrioImageService(runtimeURI string, accountManager daemonutil.ImagePullAccountManager) (ImageService, error) {
// NewCRIImageService create a common CRI runtime
func NewCRIImageService(runtimeURI string, accountManager daemonutil.ImagePullAccountManager) (ImageService, error) {
klog.V(3).InfoS("Connecting to image service", "endpoint", runtimeURI)
addr, dialer, err := util.GetAddressAndDialer(runtimeURI)
if err != nil {
Expand All @@ -54,19 +54,19 @@ func NewCrioImageService(runtimeURI string, accountManager daemonutil.ImagePullA
klog.V(2).InfoS("Using CRI v1 image API")
}

return &crioImageService{
return &commonCRIImageService{
accountManager: accountManager,
criImageClient: imageClient,
}, nil
}

type crioImageService struct {
type commonCRIImageService struct {
accountManager daemonutil.ImagePullAccountManager
criImageClient runtimeapi.ImageServiceClient
}

// PullImage implements ImageService.PullImage.
func (c *crioImageService) PullImage(ctx context.Context, imageName, tag string, pullSecrets []v1.Secret) (ImagePullStatusReader, error) {
func (c *commonCRIImageService) PullImage(ctx context.Context, imageName, tag string, pullSecrets []v1.Secret) (ImagePullStatusReader, error) {
registry := daemonutil.ParseRegistry(imageName)
fullImageName := imageName + ":" + tag
// Reader
Expand Down Expand Up @@ -147,7 +147,7 @@ func (c *crioImageService) PullImage(ctx context.Context, imageName, tag string,
}

// ListImages implements ImageService.ListImages.
func (c *crioImageService) ListImages(ctx context.Context) ([]ImageInfo, error) {
func (c *commonCRIImageService) ListImages(ctx context.Context) ([]ImageInfo, error) {
listImagesReq := &runtimeapi.ListImagesRequest{}
listImagesResp, err := c.criImageClient.ListImages(ctx, listImagesReq)
if err != nil {
Expand Down
33 changes: 30 additions & 3 deletions pkg/daemon/imagepuller/imagepuller_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,9 +431,36 @@ func (w *pullWorker) doPullImage(ctx context.Context, newStatus *appsv1alpha1.Im
return nil
}

statusReader, err := w.runtime.PullImage(ctx, w.name, tag, w.secrets)
if err != nil {
return err
// make it asynchronous for CRI runtime will block in pulling image
var statusReader runtimeimage.ImagePullStatusReader
pullChan := make(chan struct{})
go func() {
statusReader, err = w.runtime.PullImage(ctx, w.name, tag, w.secrets)
close(pullChan)
}()

closeStatusReader := func() {
select {
case <-pullChan:
}
if statusReader != nil {
statusReader.Close()
}
}

select {
case <-w.stopCh:
go closeStatusReader()
klog.V(2).Infof("Pulling image %v:%v is stopped.", w.name, tag)
return fmt.Errorf("pulling image %s:%s is stopped", w.name, tag)
case <-ctx.Done():
go closeStatusReader()
klog.V(2).Infof("Pulling image %s:%s is canceled", w.name, tag)
return fmt.Errorf("pulling image %s:%s is canceled", w.name, tag)
case <-pullChan:
if err != nil {
return err
}
}
defer statusReader.Close()

Expand Down

0 comments on commit 5753084

Please sign in to comment.