Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support common CRI runtime based on cri-o impl #936

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 33 additions & 6 deletions pkg/daemon/criruntime/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package criruntime

import (
"context"
"flag"
"fmt"
"os"
"time"
Expand All @@ -35,6 +36,10 @@ const (
kubeRuntimeAPIVersion = "0.1.0"
)

var (
CRISocketFileName = flag.String("socket-file", "", "The name of CRI socket file, and it should be in the mounted /hostvarrun directory.")
)

// Factory is the interface to get container and image runtime service
type Factory interface {
GetImageService() runtimeimage.ImageService
Expand All @@ -48,7 +53,7 @@ const (
ContainerRuntimeDocker = "docker"
ContainerRuntimeContainerd = "containerd"
ContainerRuntimePouch = "pouch"
ContainerRuntimeCRIO = "cri-o"
ContainerRuntimeCommonCRI = "common-cri"
)

type runtimeConfig struct {
Expand Down Expand Up @@ -108,13 +113,13 @@ func NewFactory(varRunPath string, accountManager daemonutil.ImagePullAccountMan
klog.Warningf("Failed to new image service for %v (%s, %s): %v", cfg.runtimeType, cfg.runtimeURI, cfg.runtimeRemoteURI, err)
continue
}
case ContainerRuntimeCRIO:
case ContainerRuntimeCommonCRI:
addr, _, err := kubeletutil.GetAddressAndDialer(cfg.runtimeRemoteURI)
if err != nil {
klog.Warningf("Failed to get address for %v (%s, %s): %v", cfg.runtimeType, cfg.runtimeURI, cfg.runtimeRemoteURI, err)
continue
}
imageService, err = runtimeimage.NewCrioImageService(addr, accountManager)
imageService, err = runtimeimage.NewCRIImageService(addr, accountManager)
if err != nil {
klog.Warningf("Failed to new image service for %v (%s, %s): %v", cfg.runtimeType, cfg.runtimeURI, cfg.runtimeRemoteURI, err)
continue
Expand Down Expand Up @@ -168,9 +173,25 @@ func (f *factory) GetRuntimeServiceByName(runtimeName string) criapi.RuntimeServ
return nil
}

func detectRuntime(varRunPath string) []runtimeConfig {
func detectRuntime(varRunPath string) (cfgs []runtimeConfig) {
var err error
var cfgs []runtimeConfig

// firstly check if it is configured from flag
if CRISocketFileName != nil && len(*CRISocketFileName) > 0 {
filePath := fmt.Sprintf("%s/%s", varRunPath, *CRISocketFileName)
if _, err = os.Stat(filePath); err == nil {
cfgs = append(cfgs, runtimeConfig{
runtimeType: ContainerRuntimeCommonCRI,
runtimeRemoteURI: fmt.Sprintf("unix://%s/%s", varRunPath, *CRISocketFileName),
})
klog.Infof("Find configured CRI socket %s with given flag", filePath)
} else {
klog.Errorf("Failed to stat the CRI socket %s with given flag: %v", filePath, err)
}
return
}

// if the flag is not set, then try to find runtime in the recognized types and paths.

// pouch
{
Expand Down Expand Up @@ -224,9 +245,15 @@ func detectRuntime(varRunPath string) []runtimeConfig {

// cri-o
{
if _, err = os.Stat(fmt.Sprintf("%s/crio.sock", varRunPath)); err == nil {
cfgs = append(cfgs, runtimeConfig{
runtimeType: ContainerRuntimeCommonCRI,
runtimeRemoteURI: fmt.Sprintf("unix://%s/crio.sock", varRunPath),
})
}
if _, err = os.Stat(fmt.Sprintf("%s/crio/crio.sock", varRunPath)); err == nil {
cfgs = append(cfgs, runtimeConfig{
runtimeType: ContainerRuntimeCRIO,
runtimeType: ContainerRuntimeCommonCRI,
runtimeRemoteURI: fmt.Sprintf("unix://%s/crio/crio.sock", varRunPath),
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import (

const maxMsgSize = 1024 * 1024 * 16

// NewCrioImageService create a cri-o runtime
func NewCrioImageService(runtimeURI string, accountManager daemonutil.ImagePullAccountManager) (ImageService, error) {
// NewCRIImageService create a common CRI runtime
func NewCRIImageService(runtimeURI string, accountManager daemonutil.ImagePullAccountManager) (ImageService, error) {
klog.V(3).InfoS("Connecting to image service", "endpoint", runtimeURI)
addr, dialer, err := util.GetAddressAndDialer(runtimeURI)
if err != nil {
Expand All @@ -54,19 +54,19 @@ func NewCrioImageService(runtimeURI string, accountManager daemonutil.ImagePullA
klog.V(2).InfoS("Using CRI v1 image API")
}

return &crioImageService{
return &commonCRIImageService{
accountManager: accountManager,
criImageClient: imageClient,
}, nil
}

type crioImageService struct {
type commonCRIImageService struct {
accountManager daemonutil.ImagePullAccountManager
criImageClient runtimeapi.ImageServiceClient
}

// PullImage implements ImageService.PullImage.
func (c *crioImageService) PullImage(ctx context.Context, imageName, tag string, pullSecrets []v1.Secret) (ImagePullStatusReader, error) {
func (c *commonCRIImageService) PullImage(ctx context.Context, imageName, tag string, pullSecrets []v1.Secret) (ImagePullStatusReader, error) {
registry := daemonutil.ParseRegistry(imageName)
fullImageName := imageName + ":" + tag
// Reader
Expand Down Expand Up @@ -147,7 +147,7 @@ func (c *crioImageService) PullImage(ctx context.Context, imageName, tag string,
}

// ListImages implements ImageService.ListImages.
func (c *crioImageService) ListImages(ctx context.Context) ([]ImageInfo, error) {
func (c *commonCRIImageService) ListImages(ctx context.Context) ([]ImageInfo, error) {
listImagesReq := &runtimeapi.ListImagesRequest{}
listImagesResp, err := c.criImageClient.ListImages(ctx, listImagesReq)
if err != nil {
Expand Down
33 changes: 30 additions & 3 deletions pkg/daemon/imagepuller/imagepuller_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,9 +431,36 @@ func (w *pullWorker) doPullImage(ctx context.Context, newStatus *appsv1alpha1.Im
return nil
}

statusReader, err := w.runtime.PullImage(ctx, w.name, tag, w.secrets)
if err != nil {
return err
// make it asynchronous for CRI runtime will block in pulling image
var statusReader runtimeimage.ImagePullStatusReader
pullChan := make(chan struct{})
go func() {
statusReader, err = w.runtime.PullImage(ctx, w.name, tag, w.secrets)
close(pullChan)
}()

closeStatusReader := func() {
select {
case <-pullChan:
}
if statusReader != nil {
statusReader.Close()
}
}

select {
case <-w.stopCh:
go closeStatusReader()
klog.V(2).Infof("Pulling image %v:%v is stopped.", w.name, tag)
return fmt.Errorf("pulling image %s:%s is stopped", w.name, tag)
case <-ctx.Done():
go closeStatusReader()
klog.V(2).Infof("Pulling image %s:%s is canceled", w.name, tag)
return fmt.Errorf("pulling image %s:%s is canceled", w.name, tag)
case <-pullChan:
if err != nil {
return err
}
}
defer statusReader.Close()

Expand Down