diff --git a/pkg/daemon/criruntime/factory.go b/pkg/daemon/criruntime/factory.go index 5c60279c05..707c9fac0c 100644 --- a/pkg/daemon/criruntime/factory.go +++ b/pkg/daemon/criruntime/factory.go @@ -18,6 +18,7 @@ package criruntime import ( "context" + "flag" "fmt" "os" "time" @@ -35,6 +36,10 @@ const ( kubeRuntimeAPIVersion = "0.1.0" ) +var ( + CRISocketFileName = flag.String("socket-file", "", "The name of CRI socket file, and it should be in the mounted /hostvarrun directory.") +) + // Factory is the interface to get container and image runtime service type Factory interface { GetImageService() runtimeimage.ImageService @@ -48,7 +53,7 @@ const ( ContainerRuntimeDocker = "docker" ContainerRuntimeContainerd = "containerd" ContainerRuntimePouch = "pouch" - ContainerRuntimeCRIO = "cri-o" + ContainerRuntimeCommonCRI = "common-cri" ) type runtimeConfig struct { @@ -108,13 +113,13 @@ func NewFactory(varRunPath string, accountManager daemonutil.ImagePullAccountMan klog.Warningf("Failed to new image service for %v (%s, %s): %v", cfg.runtimeType, cfg.runtimeURI, cfg.runtimeRemoteURI, err) continue } - case ContainerRuntimeCRIO: + case ContainerRuntimeCommonCRI: addr, _, err := kubeletutil.GetAddressAndDialer(cfg.runtimeRemoteURI) if err != nil { klog.Warningf("Failed to get address for %v (%s, %s): %v", cfg.runtimeType, cfg.runtimeURI, cfg.runtimeRemoteURI, err) continue } - imageService, err = runtimeimage.NewCrioImageService(addr, accountManager) + imageService, err = runtimeimage.NewCRIImageService(addr, accountManager) if err != nil { klog.Warningf("Failed to new image service for %v (%s, %s): %v", cfg.runtimeType, cfg.runtimeURI, cfg.runtimeRemoteURI, err) continue @@ -168,9 +173,25 @@ func (f *factory) GetRuntimeServiceByName(runtimeName string) criapi.RuntimeServ return nil } -func detectRuntime(varRunPath string) []runtimeConfig { +func detectRuntime(varRunPath string) (cfgs []runtimeConfig) { var err error - var cfgs []runtimeConfig + + // firstly check if it is configured from flag + if CRISocketFileName != nil && len(*CRISocketFileName) > 0 { + filePath := fmt.Sprintf("%s/%s", varRunPath, *CRISocketFileName) + if _, err = os.Stat(filePath); err == nil { + cfgs = append(cfgs, runtimeConfig{ + runtimeType: ContainerRuntimeCommonCRI, + runtimeRemoteURI: fmt.Sprintf("unix://%s/%s", varRunPath, *CRISocketFileName), + }) + klog.Infof("Find configured CRI socket %s with given flag", filePath) + } else { + klog.Errorf("Failed to stat the CRI socket %s with given flag: %v", filePath, err) + } + return + } + + // if the flag is not set, then try to find runtime in the recognized types and paths. // pouch { @@ -224,9 +245,15 @@ func detectRuntime(varRunPath string) []runtimeConfig { // cri-o { + if _, err = os.Stat(fmt.Sprintf("%s/crio.sock", varRunPath)); err == nil { + cfgs = append(cfgs, runtimeConfig{ + runtimeType: ContainerRuntimeCommonCRI, + runtimeRemoteURI: fmt.Sprintf("unix://%s/crio.sock", varRunPath), + }) + } if _, err = os.Stat(fmt.Sprintf("%s/crio/crio.sock", varRunPath)); err == nil { cfgs = append(cfgs, runtimeConfig{ - runtimeType: ContainerRuntimeCRIO, + runtimeType: ContainerRuntimeCommonCRI, runtimeRemoteURI: fmt.Sprintf("unix://%s/crio/crio.sock", varRunPath), }) } diff --git a/pkg/daemon/criruntime/imageruntime/crio.go b/pkg/daemon/criruntime/imageruntime/cri.go similarity index 91% rename from pkg/daemon/criruntime/imageruntime/crio.go rename to pkg/daemon/criruntime/imageruntime/cri.go index 0d0dc151e7..ba317aad8c 100644 --- a/pkg/daemon/criruntime/imageruntime/crio.go +++ b/pkg/daemon/criruntime/imageruntime/cri.go @@ -30,8 +30,8 @@ import ( const maxMsgSize = 1024 * 1024 * 16 -// NewCrioImageService create a cri-o runtime -func NewCrioImageService(runtimeURI string, accountManager daemonutil.ImagePullAccountManager) (ImageService, error) { +// NewCRIImageService create a common CRI runtime +func NewCRIImageService(runtimeURI string, accountManager daemonutil.ImagePullAccountManager) (ImageService, error) { klog.V(3).InfoS("Connecting to image service", "endpoint", runtimeURI) addr, dialer, err := util.GetAddressAndDialer(runtimeURI) if err != nil { @@ -54,19 +54,19 @@ func NewCrioImageService(runtimeURI string, accountManager daemonutil.ImagePullA klog.V(2).InfoS("Using CRI v1 image API") } - return &crioImageService{ + return &commonCRIImageService{ accountManager: accountManager, criImageClient: imageClient, }, nil } -type crioImageService struct { +type commonCRIImageService struct { accountManager daemonutil.ImagePullAccountManager criImageClient runtimeapi.ImageServiceClient } // PullImage implements ImageService.PullImage. -func (c *crioImageService) PullImage(ctx context.Context, imageName, tag string, pullSecrets []v1.Secret) (ImagePullStatusReader, error) { +func (c *commonCRIImageService) PullImage(ctx context.Context, imageName, tag string, pullSecrets []v1.Secret) (ImagePullStatusReader, error) { registry := daemonutil.ParseRegistry(imageName) fullImageName := imageName + ":" + tag // Reader @@ -147,7 +147,7 @@ func (c *crioImageService) PullImage(ctx context.Context, imageName, tag string, } // ListImages implements ImageService.ListImages. -func (c *crioImageService) ListImages(ctx context.Context) ([]ImageInfo, error) { +func (c *commonCRIImageService) ListImages(ctx context.Context) ([]ImageInfo, error) { listImagesReq := &runtimeapi.ListImagesRequest{} listImagesResp, err := c.criImageClient.ListImages(ctx, listImagesReq) if err != nil { diff --git a/pkg/daemon/imagepuller/imagepuller_worker.go b/pkg/daemon/imagepuller/imagepuller_worker.go index 3ddbba3a44..2d2169b0de 100644 --- a/pkg/daemon/imagepuller/imagepuller_worker.go +++ b/pkg/daemon/imagepuller/imagepuller_worker.go @@ -431,9 +431,36 @@ func (w *pullWorker) doPullImage(ctx context.Context, newStatus *appsv1alpha1.Im return nil } - statusReader, err := w.runtime.PullImage(ctx, w.name, tag, w.secrets) - if err != nil { - return err + // make it asynchronous for CRI runtime will block in pulling image + var statusReader runtimeimage.ImagePullStatusReader + pullChan := make(chan struct{}) + go func() { + statusReader, err = w.runtime.PullImage(ctx, w.name, tag, w.secrets) + close(pullChan) + }() + + closeStatusReader := func() { + select { + case <-pullChan: + } + if statusReader != nil { + statusReader.Close() + } + } + + select { + case <-w.stopCh: + go closeStatusReader() + klog.V(2).Infof("Pulling image %v:%v is stopped.", w.name, tag) + return fmt.Errorf("pulling image %s:%s is stopped", w.name, tag) + case <-ctx.Done(): + go closeStatusReader() + klog.V(2).Infof("Pulling image %s:%s is canceled", w.name, tag) + return fmt.Errorf("pulling image %s:%s is canceled", w.name, tag) + case <-pullChan: + if err != nil { + return err + } } defer statusReader.Close()