diff --git a/.github/workflows/containerd-async.yaml b/.github/workflows/containerd-async.yaml new file mode 100644 index 00000000..bf8906d2 --- /dev/null +++ b/.github/workflows/containerd-async.yaml @@ -0,0 +1,39 @@ +name: containerd-async-11mins +on: + push: + branches: [main] + pull_request: + branches: [main] + workflow_dispatch: +jobs: + integration: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Start a kind cluster with containerd + uses: helm/kind-action@v1.9.0 + with: + cluster_name: kind-${{ github.run_id }} + kubectl_version: "v1.25.2" + config: ./hack/ci/containerd-cluster-conf.yaml + - name: Install private registry + run: ./hack/ci/setup_private_registry.sh + - name: Build image + run: ./hack/ci/build.sh + - name: Set image version + run: | + echo "VALUE_FILE=charts/warm-metal-csi-driver/values.yaml" >> "$GITHUB_ENV" + echo "IMAGE_TAG=$(git rev-parse --short HEAD)" >> "$GITHUB_ENV" + echo "HELM_NAME=wm-csi-integration-tests" >> "$GITHUB_ENV" + - name: Install the CSI Driver + run: | + helm install ${HELM_NAME} charts/warm-metal-csi-driver -n kube-system \ + -f ${VALUE_FILE} \ + --set csiPlugin.image.tag=${IMAGE_TAG} \ + --set enableAsyncPull=true \ + --wait \ + --debug + - name: Run integration Tests + run: ./hack/ci/test.sh + - name: Uninstall the CSI Driver + run: helm uninstall -n kube-system ${HELM_NAME} --wait diff --git a/Makefile b/Makefile index c10cca6d..c6670b1e 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -VERSION ?= v1.1.0 +VERSION ?= v1.2.0 IMAGE_BUILDER ?= docker IMAGE_BUILD_CMD ?= buildx diff --git a/charts/warm-metal-csi-driver/Chart.yaml b/charts/warm-metal-csi-driver/Chart.yaml index c9418412..773d69fc 100644 --- a/charts/warm-metal-csi-driver/Chart.yaml +++ b/charts/warm-metal-csi-driver/Chart.yaml @@ -15,9 +15,9 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 1.1.0 +version: 1.2.0 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. 
-appVersion: v1.1.0 +appVersion: v1.2.0 diff --git a/charts/warm-metal-csi-driver/templates/nodeplugin.yaml b/charts/warm-metal-csi-driver/templates/nodeplugin.yaml index a65aaddd..f83988df 100644 --- a/charts/warm-metal-csi-driver/templates/nodeplugin.yaml +++ b/charts/warm-metal-csi-driver/templates/nodeplugin.yaml @@ -63,8 +63,8 @@ spec: {{- if .Values.enableDaemonImageCredentialCache }} - --enable-daemon-image-credential-cache {{- end }} - {{- if .Values.enableAsyncPullMount }} - - --async-pull-mount=true + {{- if .Values.enableAsyncPull }} + - --async-pull-timeout={{ .Values.asyncPullTimeout }} {{- end }} - "-v={{ .Values.logLevel }}" - "--mode=node" diff --git a/charts/warm-metal-csi-driver/values.yaml b/charts/warm-metal-csi-driver/values.yaml index cf577ff6..a9c0b873 100644 --- a/charts/warm-metal-csi-driver/values.yaml +++ b/charts/warm-metal-csi-driver/values.yaml @@ -5,7 +5,8 @@ kubeletRoot: /var/lib/kubelet snapshotRoot: /var/lib/containerd/io.containerd.snapshotter.v1.overlayfs logLevel: 4 enableDaemonImageCredentialCache: -enableAsyncPullMount: false +enableAsyncPull: false +asyncPullTimeout: "10m" pullImageSecretForDaemonset: csiPlugin: diff --git a/cmd/plugin/main.go b/cmd/plugin/main.go index 4e2994f2..7ee86e1e 100644 --- a/cmd/plugin/main.go +++ b/cmd/plugin/main.go @@ -51,8 +51,8 @@ var ( enableCache = flag.Bool("enable-daemon-image-credential-cache", true, "Whether to save contents of imagepullsecrets of the daemon ServiceAccount in memory. "+ "If set to false, secrets will be fetched from the API server on every image pull.") - asyncImagePullMount = flag.Bool("async-pull-mount", false, - "Whether to pull images asynchronously (helps prevent timeout for larger images)") + asyncImagePullTimeout = flag.Duration("async-pull-timeout", 0, + "If positive, specifies duration allotted for async image pulls as measured from pull start time. If zero, negative, less than 30s, or omitted, the caller's timeout (usually kubelet: 2m) is used instead of this value. (additional time helps prevent timeout for larger images or slower image pull conditions)") watcherResyncPeriod = flag.Duration("watcher-resync-period", 30*time.Minute, "The resync period of the pvc watcher.") mode = flag.String("mode", "", "The mode of the driver. 
Valid values are: node, controller") nodePluginSA = flag.String("node-plugin-sa", "csi-image-warm-metal", "The name of the ServiceAccount used by the node plugin.") @@ -129,7 +129,7 @@ func main() { server.Start(*endpoint, NewIdentityServer(driverVersion), nil, - NewNodeServer(driver, mounter, criClient, secretStore, *asyncImagePullMount)) + NewNodeServer(driver, mounter, criClient, secretStore, *asyncImagePullTimeout)) case controllerMode: watcher, err := watcher.New(context.Background(), *watcherResyncPeriod) if err != nil { diff --git a/cmd/plugin/node_server.go b/cmd/plugin/node_server.go index 109eb5c3..a1c3dff7 100644 --- a/cmd/plugin/node_server.go +++ b/cmd/plugin/node_server.go @@ -4,15 +4,14 @@ import ( "context" "os" "strings" + "time" "github.com/container-storage-interface/spec/lib/go/csi" "github.com/containerd/containerd/reference/docker" - "github.com/google/uuid" "github.com/warm-metal/container-image-csi-driver/pkg/backend" "github.com/warm-metal/container-image-csi-driver/pkg/metrics" - "github.com/warm-metal/container-image-csi-driver/pkg/mountexecutor" - "github.com/warm-metal/container-image-csi-driver/pkg/mountstatus" - "github.com/warm-metal/container-image-csi-driver/pkg/pullexecutor" + "github.com/warm-metal/container-image-csi-driver/pkg/remoteimage" + "github.com/warm-metal/container-image-csi-driver/pkg/remoteimageasync" "github.com/warm-metal/container-image-csi-driver/pkg/secret" csicommon "github.com/warm-metal/csi-drivers/pkg/csi-common" "google.golang.org/grpc/codes" @@ -31,36 +30,36 @@ const ( type ImagePullStatus int -func NewNodeServer(driver *csicommon.CSIDriver, mounter backend.Mounter, imageSvc cri.ImageServiceClient, secretStore secret.Store, asyncImagePullMount bool) *NodeServer { - return &NodeServer{ - DefaultNodeServer: csicommon.NewDefaultNodeServer(driver), - mounter: mounter, - secretStore: secretStore, - asyncImagePullMount: asyncImagePullMount, - mountExecutor: mountexecutor.NewMountExecutor(&mountexecutor.MountExecutorOptions{ - AsyncMount: asyncImagePullMount, - Mounter: mounter, - }), - pullExecutor: pullexecutor.NewPullExecutor(&pullexecutor.PullExecutorOptions{ - AsyncPull: asyncImagePullMount, - ImageServiceClient: imageSvc, - SecretStore: secretStore, - Mounter: mounter, - }), - } +func NewNodeServer(driver *csicommon.CSIDriver, mounter backend.Mounter, imageSvc cri.ImageServiceClient, secretStore secret.Store, asyncImagePullTimeout time.Duration) *NodeServer { + ns := NodeServer{ + DefaultNodeServer: csicommon.NewDefaultNodeServer(driver), + mounter: mounter, + imageSvc: imageSvc, + secretStore: secretStore, + asyncImagePullTimeout: asyncImagePullTimeout, + asyncImagePuller: nil, + } + if asyncImagePullTimeout >= time.Duration(30*time.Second) { + klog.Infof("Starting node server in Async mode with %v timeout", asyncImagePullTimeout) + ns.asyncImagePuller = remoteimageasync.StartAsyncPuller(context.TODO(), 100) + } else { + klog.Info("Starting node server in Sync mode") + ns.asyncImagePullTimeout = 0 // set to default value + } + return &ns } type NodeServer struct { *csicommon.DefaultNodeServer - mounter backend.Mounter - secretStore secret.Store - asyncImagePullMount bool - mountExecutor *mountexecutor.MountExecutor - pullExecutor *pullexecutor.PullExecutor + mounter backend.Mounter + imageSvc cri.ImageServiceClient + secretStore secret.Store + asyncImagePullTimeout time.Duration + asyncImagePuller remoteimageasync.AsyncPuller } func (n NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (resp 
*csi.NodePublishVolumeResponse, err error) { - valuesLogger := klog.LoggerWithValues(klog.NewKlogr(), "pod-name", req.VolumeContext["pod-name"], "namespace", req.VolumeContext["namespace"], "uid", req.VolumeContext["uid"], "request-id", uuid.NewString()) + valuesLogger := klog.LoggerWithValues(klog.NewKlogr(), "pod-name", req.VolumeContext["pod-name"], "namespace", req.VolumeContext["namespace"], "uid", req.VolumeContext["uid"]) valuesLogger.Info("Incoming NodePublishVolume request", "request string", req.String()) if len(req.VolumeId) == 0 { err = status.Error(codes.InvalidArgument, "VolumeId is missing") @@ -122,56 +121,59 @@ func (n NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishV image = req.VolumeContext[ctxKeyImage] } - namedRef, err := docker.ParseDockerRef(image) - if err != nil { - klog.Errorf("unable to normalize image %q: %s", image, err) - return - } - pullAlways := strings.ToLower(req.VolumeContext[ctxKeyPullAlways]) == "true" - po := &pullexecutor.PullOptions{ - Context: ctx, - NamedRef: namedRef, - PullAlways: pullAlways, - Image: image, - PullSecrets: req.Secrets, - Logger: valuesLogger, - } - - if e := n.pullExecutor.StartPulling(po); e != nil { - err = status.Errorf(codes.Aborted, "unable to pull image %q: %s", image, e) + keyring, err := n.secretStore.GetDockerKeyring(ctx, req.Secrets) + if err != nil { + err = status.Errorf(codes.Aborted, "unable to fetch keyring: %s", err) return } - if e := n.pullExecutor.WaitForPull(po); e != nil { - err = status.Errorf(codes.DeadlineExceeded, e.Error()) + namedRef, err := docker.ParseDockerRef(image) + if err != nil { + klog.Errorf("unable to normalize image %q: %s", image, err) return } - if mountstatus.Get(req.VolumeId) == mountstatus.Mounted { - return &csi.NodePublishVolumeResponse{}, nil - } - - o := &mountexecutor.MountOptions{ - Context: ctx, - NamedRef: namedRef, - VolumeId: req.VolumeId, - TargetPath: req.TargetPath, - VolumeCapability: req.VolumeCapability, - ReadOnly: req.Readonly, - Logger: valuesLogger, + //NOTE: we are relying on n.mounter.ImageExists() to return false when + // a first-time pull is in progress, else this logic may not be + // correct. should test this. 
+ if pullAlways || !n.mounter.ImageExists(ctx, namedRef) { + klog.Errorf("pull image %q", image) + puller := remoteimage.NewPuller(n.imageSvc, namedRef, keyring) + + if n.asyncImagePuller != nil { + var session *remoteimageasync.PullSession + session, err = n.asyncImagePuller.StartPull(image, puller, n.asyncImagePullTimeout) + if err != nil { + err = status.Errorf(codes.Aborted, "unable to pull image %q: %s", image, err) + metrics.OperationErrorsCount.WithLabelValues("pull-async-start").Inc() + return + } + if err = n.asyncImagePuller.WaitForPull(session, ctx); err != nil { + err = status.Errorf(codes.Aborted, "unable to pull image %q: %s", image, err) + metrics.OperationErrorsCount.WithLabelValues("pull-async-wait").Inc() + return + } + } else { + if err = puller.Pull(ctx); err != nil { + err = status.Errorf(codes.Aborted, "unable to pull image %q: %s", image, err) + metrics.OperationErrorsCount.WithLabelValues("pull-sync-call").Inc() + return + } + } } - if e := n.mountExecutor.StartMounting(o); e != nil { - err = status.Error(codes.Internal, e.Error()) + ro := req.Readonly || + req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY || + req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY + if err = n.mounter.Mount(ctx, req.VolumeId, backend.MountTarget(req.TargetPath), namedRef, ro); err != nil { + err = status.Error(codes.Internal, err.Error()) + metrics.OperationErrorsCount.WithLabelValues("mount").Inc() return } - if e := n.mountExecutor.WaitForMount(o); e != nil { - err = status.Errorf(codes.DeadlineExceeded, e.Error()) - return - } + valuesLogger.Info("Successfully completed NodePublishVolume request", "request string", req.String()) return &csi.NodePublishVolumeResponse{}, nil } @@ -194,17 +196,11 @@ func (n NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpubl } if err = n.mounter.Unmount(ctx, req.VolumeId, backend.MountTarget(req.TargetPath)); err != nil { - // TODO(vadasambar): move this to mountexecutor once mountexecutor has `StartUnmounting` function - metrics.OperationErrorsCount.WithLabelValues("StartUnmounting").Inc() + metrics.OperationErrorsCount.WithLabelValues("unmount").Inc() err = status.Error(codes.Internal, err.Error()) return } - // Clear the mountstatus since the volume has been unmounted - // Not doing this will make mount not work properly if the same volume is - // attempted to mount twice - mountstatus.Delete(req.VolumeId) - return &csi.NodeUnpublishVolumeResponse{}, nil } diff --git a/cmd/plugin/node_server_test.go b/cmd/plugin/node_server_test.go index 5db8ac49..e3e165a5 100644 --- a/cmd/plugin/node_server_test.go +++ b/cmd/plugin/node_server_test.go @@ -41,7 +41,7 @@ func TestNodePublishVolumeAsync(t *testing.T) { driver := csicommon.NewCSIDriver(driverName, driverVersion, "fake-node") assert.NotNil(t, driver) - asyncImagePulls := true + asyncImagePulls := 15 * time.Minute //TODO: determine intended value for this in the context of this test ns := NewNodeServer(driver, mounter, criClient, &testSecretStore{}, asyncImagePulls) // based on kubelet's csi mounter plugin code @@ -168,7 +168,7 @@ func TestNodePublishVolumeSync(t *testing.T) { driver := csicommon.NewCSIDriver(driverName, driverVersion, "fake-node") assert.NotNil(t, driver) - asyncImagePulls := false + asyncImagePulls := 0 * time.Minute //TODO: determine intended value for this in the context of this test ns := NewNodeServer(driver, mounter, criClient, &testSecretStore{}, asyncImagePulls) // 
based on kubelet's csi mounter plugin code @@ -300,7 +300,7 @@ func TestMetrics(t *testing.T) { driver := csicommon.NewCSIDriver(driverName, driverVersion, "fake-node") assert.NotNil(t, driver) - asyncImagePulls := true + asyncImagePulls := 15 * time.Minute //TODO: determine intended value for this in the context of this test ns := NewNodeServer(driver, mounter, criClient, &testSecretStore{}, asyncImagePulls) // based on kubelet's csi mounter plugin code @@ -437,7 +437,7 @@ func TestMetrics(t *testing.T) { assert.NoError(t, err) respBody := string(b1) assert.Contains(t, respBody, metrics.ImagePullTimeKey) - assert.Contains(t, respBody, metrics.ImageMountTimeKey) + assert.Contains(t, respBody, metrics.ImagePullTimeHist) assert.Contains(t, respBody, metrics.OperationErrorsCountKey) // give some time before stopping the server diff --git a/go.mod b/go.mod index d93901d2..b9bb8e54 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.12.1 github.com/spf13/pflag v1.0.5 - github.com/stretchr/testify v1.8.0 + github.com/stretchr/testify v1.8.4 github.com/warm-metal/csi-drivers v0.5.0-alpha.0.0.20210404173852-9ec9cb097dd2 golang.org/x/net v0.0.0-20221004154528-8021a29435af google.golang.org/grpc v1.50.0 diff --git a/go.sum b/go.sum index 3ca18272..92a359c1 100644 --- a/go.sum +++ b/go.sum @@ -826,8 +826,9 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index c4104ad8..19d22b68 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -8,30 +8,36 @@ import ( "k8s.io/klog/v2" ) -const Async = "async" -const Sync = "sync" const ImagePullTimeKey = "pull_duration_seconds" -const ImageMountTimeKey = "mount_duration_seconds" +const ImagePullTimeHistKey = "pull_duration_seconds_hist" +const ImagePullSizeKey = "pull_size_bytes" const OperationErrorsCountKey = "operation_errors_total" -var ImagePullTime = prometheus.NewHistogramVec( +var ImagePullTimeHist = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Subsystem: "warm_metal", - Name: ImagePullTimeKey, + Name: ImagePullTimeHistKey, Help: "The time it took to pull an image", - Buckets: []float64{0, 1, 5, 10, 15, 30, 60, 120, 180}, + Buckets: []float64{1, 5, 10, 15, 30, 60, 120, 300, 600, 900}, }, - []string{"operation_type"}, + []string{"error"}, ) - -var ImageMountTime = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ +var ImagePullTime = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ Subsystem: "warm_metal", - Name: 
ImageMountTimeKey, + Name: ImagePullTimeKey, Help: "The time it took to mount an image", - Buckets: []float64{0, 1, 5, 10, 15, 30, 60, 120, 180}, }, - []string{"operation_type"}, + []string{"image", "error"}, +) + +var ImagePullSizeBytes = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: "warm_metal", + Name: ImagePullSizeKey, + Help: "Size (in bytes) of pulled image", + }, + []string{"image"}, ) var OperationErrorsCount = prometheus.NewCounterVec( @@ -46,7 +52,8 @@ var OperationErrorsCount = prometheus.NewCounterVec( func RegisterMetrics() *prometheus.Registry { reg := prometheus.NewRegistry() reg.MustRegister(ImagePullTime) - reg.MustRegister(ImageMountTime) + reg.MustRegister(ImagePullTimeHist) + reg.MustRegister(ImagePullSizeBytes) reg.MustRegister(OperationErrorsCount) return reg @@ -59,3 +66,11 @@ func StartMetricsServer(reg *prometheus.Registry) { klog.Fatal(http.ListenAndServe(":8080", nil)) }() } + +func BoolToString(t bool) string { + if t { + return "true" + } else { + return "false" + } +} diff --git a/pkg/mountexecutor/mountexecutor.go b/pkg/mountexecutor/mountexecutor.go deleted file mode 100644 index 118fb563..00000000 --- a/pkg/mountexecutor/mountexecutor.go +++ /dev/null @@ -1,143 +0,0 @@ -package mountexecutor - -import ( - "context" - "fmt" - "sync" - "time" - - "github.com/container-storage-interface/spec/lib/go/csi" - "github.com/containerd/containerd/reference/docker" - "github.com/warm-metal/container-image-csi-driver/pkg/backend" - "github.com/warm-metal/container-image-csi-driver/pkg/metrics" - "github.com/warm-metal/container-image-csi-driver/pkg/mountstatus" - "github.com/warm-metal/container-image-csi-driver/pkg/pullstatus" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/klog/v2" -) - -const ( - mountPollTimeInterval = 100 * time.Millisecond - mountPollTimeout = 2 * time.Minute - mountCtxTimeout = 10 * time.Minute -) - -// MountExecutorOptions are options passed to mount executor -type MountExecutorOptions struct { - AsyncMount bool - Mounter backend.Mounter -} - -// MountOptions are options for a single mount request -type MountOptions struct { - // Context here is only valid for synchronous mounts - Context context.Context - NamedRef docker.Named - VolumeId string - TargetPath string - VolumeCapability *csi.VolumeCapability - ReadOnly bool - Logger klog.Logger -} - -// MountExecutor executes mount -type MountExecutor struct { - asyncMount bool - mutex *sync.Mutex - mounter backend.Mounter - asyncErrs map[docker.Named]error -} - -// NewMountExecutor initializes a new mount executor -func NewMountExecutor(o *MountExecutorOptions) *MountExecutor { - return &MountExecutor{ - asyncMount: o.AsyncMount, - mutex: &sync.Mutex{}, - mounter: o.Mounter, - } -} - -// StartMounting starts the mounting -func (m *MountExecutor) StartMounting(o *MountOptions) error { - o.Logger.Info("Mounting image", "image", o.NamedRef.Name()) - if pullstatus.Get(o.NamedRef) != pullstatus.Pulled || mountstatus.Get(o.TargetPath) == mountstatus.StillMounting { - o.Logger.Info("Could not mount image because image hasn't finshed pulling or volume is still mounting", - "image", o.NamedRef.Name(), - "pull-status", pullstatus.Get(o.NamedRef), - "mount-status", mountstatus.Get(o.TargetPath)) - return nil - } - - ro := o.ReadOnly || - o.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY || - o.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY - - if !m.asyncMount { - mountstatus.Update(o.TargetPath, 
mountstatus.StillMounting) - startTime := time.Now() - if err := m.mounter.Mount(o.Context, o.VolumeId, backend.MountTarget(o.TargetPath), o.NamedRef, ro); err != nil { - o.Logger.Error(err, "mount error") - metrics.OperationErrorsCount.WithLabelValues("StartMounting").Inc() - mountstatus.Update(o.TargetPath, mountstatus.Errored) - return err - } - elapsed := time.Since(startTime) - metrics.ImageMountTime.WithLabelValues(metrics.Sync).Observe(elapsed.Seconds()) - o.Logger.Info("Finished mounting", "image", o.NamedRef.Name(), "mount-duration", elapsed) - mountstatus.Update(o.TargetPath, mountstatus.Mounted) - return nil - } - - go func() { - m.mutex.Lock() - defer m.mutex.Unlock() - ctx, cancel := context.WithTimeout(context.Background(), mountCtxTimeout) - defer cancel() - - mountstatus.Update(o.TargetPath, mountstatus.StillMounting) - startTime := time.Now() - if err := m.mounter.Mount(ctx, o.VolumeId, backend.MountTarget(o.TargetPath), o.NamedRef, ro); err != nil { - o.Logger.Error(err, "mount error") - metrics.OperationErrorsCount.WithLabelValues("StartMounting").Inc() - mountstatus.Update(o.TargetPath, mountstatus.Errored) - m.asyncErrs[o.NamedRef] = fmt.Errorf("err: %v: %v", err, m.asyncErrs[o.NamedRef]) - return - } - elapsed := time.Since(startTime) - metrics.ImageMountTime.WithLabelValues(metrics.Async).Observe(elapsed.Seconds()) - o.Logger.Info("Finished mounting", "image", o.NamedRef.Name(), "mount-duration", elapsed) - mountstatus.Update(o.TargetPath, mountstatus.Mounted) - }() - - return nil -} - -// WaitForMount waits for the volume to get mounted -func (m *MountExecutor) WaitForMount(o *MountOptions) error { - if pullstatus.Get(o.NamedRef) != pullstatus.Pulled { - return nil - } - - if !m.asyncMount { - return nil - } - - mountCondFn := func() (done bool, err error) { - if mountstatus.Get(o.TargetPath) == mountstatus.Mounted { - return true, nil - } - if m.asyncErrs[o.NamedRef] != nil { - return false, m.asyncErrs[o.NamedRef] - } - return false, nil - } - - if err := wait.PollImmediate( - mountPollTimeInterval, - mountPollTimeout, - mountCondFn); err != nil { - return fmt.Errorf("waited too long to mount the image: %v", err) - } - - return nil -} diff --git a/pkg/mountstatus/mountstatus.go b/pkg/mountstatus/mountstatus.go deleted file mode 100644 index 0b9095fc..00000000 --- a/pkg/mountstatus/mountstatus.go +++ /dev/null @@ -1,63 +0,0 @@ -package mountstatus - -import ( - "sync" -) - -// ImagePullStatus represents mount status of an image -type ImageMountStatus int - -// https://stackoverflow.com/questions/14426366/what-is-an-idiomatic-way-of-representing-enums-in-go -const ( - // StatusNotFound means there has been no attempt to mount the image - StatusNotFound ImageMountStatus = -1 - // StillMounting means the image is still being mounted as a volume - StillMounting ImageMountStatus = iota - // Mounted means the image has been mounted as a volume - Mounted - // Errored means there was an error during image mount - Errored -) - -// ImageMountStatusRecorder records the status of image mounts -type ImageMountStatusRecorder struct { - status map[string]ImageMountStatus - mutex sync.Mutex -} - -var i ImageMountStatusRecorder - -func init() { - i = ImageMountStatusRecorder{ - status: make(map[string]ImageMountStatus), - mutex: sync.Mutex{}, - } -} - -// Update updates the mount status of an image -func Update(volumeId string, status ImageMountStatus) { - i.mutex.Lock() - defer i.mutex.Unlock() - - i.status[volumeId] = status -} - -// Delete deletes the mount status of an image 
-func Delete(volumeId string) { - i.mutex.Lock() - defer i.mutex.Unlock() - - delete(i.status, volumeId) -} - -// Get gets the mount status of an image -func Get(volumeId string) ImageMountStatus { - i.mutex.Lock() - defer i.mutex.Unlock() - - if _, ok := i.status[volumeId]; !ok { - return StatusNotFound - } - - return i.status[volumeId] -} diff --git a/pkg/pullexecutor/pullexecutor.go b/pkg/pullexecutor/pullexecutor.go deleted file mode 100644 index dbedd65f..00000000 --- a/pkg/pullexecutor/pullexecutor.go +++ /dev/null @@ -1,166 +0,0 @@ -package pullexecutor - -import ( - "context" - "fmt" - "sync" - "time" - - "github.com/containerd/containerd/reference/docker" - "github.com/pkg/errors" - "github.com/warm-metal/container-image-csi-driver/pkg/backend" - "github.com/warm-metal/container-image-csi-driver/pkg/metrics" - "github.com/warm-metal/container-image-csi-driver/pkg/pullstatus" - "github.com/warm-metal/container-image-csi-driver/pkg/remoteimage" - "github.com/warm-metal/container-image-csi-driver/pkg/secret" - "k8s.io/apimachinery/pkg/util/wait" - cri "k8s.io/cri-api/pkg/apis/runtime/v1" - "k8s.io/klog/v2" -) - -const ( - pullPollTimeInterval = 100 * time.Millisecond - pullPollTimeout = 2 * time.Minute - pullCtxTimeout = 10 * time.Minute -) - -// PullExecutorOptions are the options passed to the pull executor -type PullExecutorOptions struct { - AsyncPull bool - ImageServiceClient cri.ImageServiceClient - SecretStore secret.Store - Mounter backend.Mounter -} - -// PullOptions are the options for a single pull request -type PullOptions struct { - // Context here is only valid for synchronous mounts - Context context.Context - NamedRef docker.Named - PullAlways bool - PullSecrets map[string]string - Image string - Logger klog.Logger -} - -// PullExecutor executes the pulls -type PullExecutor struct { - asyncPull bool - imageSvcClient cri.ImageServiceClient - mutex *sync.Mutex - asyncErrs map[docker.Named]error - secretStore secret.Store - mounter backend.Mounter -} - -// NewPullExecutor initializes a new pull executor object -func NewPullExecutor(o *PullExecutorOptions) *PullExecutor { - return &PullExecutor{ - asyncPull: o.AsyncPull, - mutex: &sync.Mutex{}, - imageSvcClient: o.ImageServiceClient, - secretStore: o.SecretStore, - mounter: o.Mounter, - asyncErrs: make(map[docker.Named]error), - } -} - -// StartPulling starts pulling the image -func (m *PullExecutor) StartPulling(o *PullOptions) error { - keyring, err := m.secretStore.GetDockerKeyring(o.Context, o.PullSecrets) - if err != nil { - return errors.Errorf("unable to fetch keyring: %s", err) - } - - if !m.asyncPull { - puller := remoteimage.NewPuller(m.imageSvcClient, o.NamedRef, keyring) - shouldPull := o.PullAlways || !m.mounter.ImageExists(o.Context, o.NamedRef) - if shouldPull { - o.Logger.Info("Pulling image", "image", o.Image) - pullstatus.Update(o.NamedRef, pullstatus.StillPulling) - startTime := time.Now() - if err = puller.Pull(o.Context); err != nil { - pullstatus.Update(o.NamedRef, pullstatus.Errored) - metrics.OperationErrorsCount.WithLabelValues("StartPulling").Inc() - o.Logger.Error(err, "Unable to pull image", "image", o.NamedRef) - return errors.Errorf("unable to pull image %q: %s", o.NamedRef, err) - } - elapsed := time.Since(startTime) - metrics.ImagePullTime.WithLabelValues(metrics.Sync).Observe(elapsed.Seconds()) - size := puller.ImageSize(o.Context) - o.Logger.Info("Finished pulling image", "image", o.Image, "pull-duration", elapsed, "image-size", fmt.Sprintf("%.2f MiB", float64(size)/(1024.0*1024.0))) - } 
- pullstatus.Update(o.NamedRef, pullstatus.Pulled) - return nil - } - - if pullstatus.Get(o.NamedRef) == pullstatus.Pulled || - pullstatus.Get(o.NamedRef) == pullstatus.StillPulling { - return nil - } - - go func() { - if pullstatus.Get(o.NamedRef) == pullstatus.StatusNotFound { - m.mutex.Lock() - defer m.mutex.Unlock() - c, cancel := context.WithTimeout(context.Background(), pullCtxTimeout) - defer cancel() - - if pullstatus.Get(o.NamedRef) == pullstatus.StillPulling || - pullstatus.Get(o.NamedRef) == pullstatus.Pulled { - return - } - - puller := remoteimage.NewPuller(m.imageSvcClient, o.NamedRef, keyring) - shouldPull := o.PullAlways || !m.mounter.ImageExists(o.Context, o.NamedRef) - if shouldPull { - o.Logger.Info("Pulling image asynchronously", "image", o.Image) - pullstatus.Update(o.NamedRef, pullstatus.StillPulling) - startTime := time.Now() - - if err = puller.Pull(c); err != nil { - pullstatus.Update(o.NamedRef, pullstatus.Errored) - metrics.OperationErrorsCount.WithLabelValues("StartPulling").Inc() - o.Logger.Error(err, "Unable to pull image", "image", o.Image) - m.asyncErrs[o.NamedRef] = fmt.Errorf("unable to pull image %q: %s", o.Image, err) - return - } - elapsed := time.Since(startTime) - metrics.ImagePullTime.WithLabelValues(metrics.Async).Observe(elapsed.Seconds()) - size := puller.ImageSize(o.Context) - o.Logger.Info("Finished pulling image", "image", o.Image, "pull-duration", elapsed, "image-size", fmt.Sprintf("%.2f MiB", float64(size)/(1024.0*1024.0))) - - } - pullstatus.Update(o.NamedRef, pullstatus.Pulled) - } - }() - - return nil -} - -// WaitForPull waits until the image pull succeeds or errors or timeout is exceeded -func (m *PullExecutor) WaitForPull(o *PullOptions) error { - if !m.asyncPull { - return nil - } - - condFn := func() (done bool, err error) { - if pullstatus.Get(o.NamedRef) == pullstatus.Pulled { - return true, nil - } - - if m.asyncErrs[o.NamedRef] != nil { - return false, m.asyncErrs[o.NamedRef] - } - return false, nil - } - - if err := wait.PollImmediate( - pullPollTimeInterval, - pullPollTimeout, - condFn); err != nil { - return errors.Errorf("waited too long to download the image: %v", err) - } - - return nil -} diff --git a/pkg/pullstatus/pullstatus.go b/pkg/pullstatus/pullstatus.go deleted file mode 100644 index 1a9a6729..00000000 --- a/pkg/pullstatus/pullstatus.go +++ /dev/null @@ -1,65 +0,0 @@ -package pullstatus - -import ( - "sync" - - "github.com/containerd/containerd/reference/docker" -) - -// ImagePullStatus represents pull status of an image -type ImagePullStatus int - -// https://stackoverflow.com/questions/14426366/what-is-an-idiomatic-way-of-representing-enums-in-go -const ( - // StatusNotFound means there has been no attempt to pull the image - StatusNotFound ImagePullStatus = -1 - // StillPulling means the image is still being pulled - StillPulling ImagePullStatus = iota - // Pulled means the image has been pulled - Pulled - // Errored means there was an error during image pull - Errored -) - -// ImagePullStatusRecorder records the status of image pulls -type ImagePullStatusRecorder struct { - status map[docker.Named]ImagePullStatus - mutex sync.Mutex -} - -var i ImagePullStatusRecorder - -func init() { - i = ImagePullStatusRecorder{ - status: make(map[docker.Named]ImagePullStatus), - mutex: sync.Mutex{}, - } -} - -// Update updates the pull status of an image -func Update(imageRef docker.Named, status ImagePullStatus) { - i.mutex.Lock() - defer i.mutex.Unlock() - - i.status[imageRef] = status -} - -// Delete deletes the pull status 
of an image -func Delete(imageRef docker.Named) { - i.mutex.Lock() - defer i.mutex.Unlock() - - delete(i.status, imageRef) -} - -// Get gets the pull status of an image -func Get(imageRef docker.Named) ImagePullStatus { - i.mutex.Lock() - defer i.mutex.Unlock() - - if _, ok := i.status[imageRef]; !ok { - return StatusNotFound - } - - return i.status[imageRef] -} diff --git a/pkg/remoteimage/pull.go b/pkg/remoteimage/pull.go index 7ca29d5a..282d57b5 100644 --- a/pkg/remoteimage/pull.go +++ b/pkg/remoteimage/pull.go @@ -2,16 +2,22 @@ package remoteimage import ( "context" + "fmt" + "time" "github.com/containerd/containerd/reference/docker" + "github.com/warm-metal/container-image-csi-driver/pkg/metrics" utilerrors "k8s.io/apimachinery/pkg/util/errors" cri "k8s.io/cri-api/pkg/apis/runtime/v1" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/credentialprovider" ) type Puller interface { Pull(context.Context) error - ImageSize(context.Context) int + ImageWithTag() string + ImageWithoutTag() string + ImageSize(context.Context) (int, error) } func NewPuller(imageSvc cri.ImageServiceClient, image docker.Named, @@ -29,25 +35,85 @@ type puller struct { keyring credentialprovider.DockerKeyring } +func (p puller) ImageWithTag() string { + return p.image.String() +} + +func (p puller) ImageWithoutTag() string { + return p.image.Name() +} + // Returns the compressed size of the image that was pulled in bytes // see https://github.com/containerd/containerd/issues/9261 -func (p puller) ImageSize(ctx context.Context) int { - imageSpec := &cri.ImageSpec{Image: p.image.String()} - imageStatusResponse, _ := p.imageSvc.ImageStatus(ctx, &cri.ImageStatusRequest{ +func (p puller) ImageSize(ctx context.Context) (size int, err error) { + defer func() { + if err != nil { + klog.Errorf(err.Error()) + metrics.OperationErrorsCount.WithLabelValues("size-error").Inc() + } + }() + imageSpec := &cri.ImageSpec{Image: p.ImageWithTag()} + if imageStatusResponse, err := p.imageSvc.ImageStatus(ctx, &cri.ImageStatusRequest{ Image: imageSpec, - }) - return int(imageStatusResponse.Image.Size_) + }); err != nil { + size = 0 + err = fmt.Errorf("remoteimage.ImageSize(): call returned an error: %s", err.Error()) + return size, err + } else if imageStatusResponse == nil { + size = 0 + err = fmt.Errorf("remoteimage.ImageSize(): imageStatusResponse is nil") + return size, err + } else if imageStatusResponse.Image == nil { + size = 0 + err = fmt.Errorf("remoteimage.ImageSize(): imageStatusResponse.Image is nil") + return size, err + } else { + size = imageStatusResponse.Image.Size() + err = nil + return size, err + } } func (p puller) Pull(ctx context.Context) (err error) { - repo := p.image.Name() - imageSpec := &cri.ImageSpec{Image: p.image.String()} + startTime := time.Now() + defer func() { // must capture final value of "err" + elapsed := time.Since(startTime).Seconds() + // pull time metrics and logs + klog.Infof("remoteimage.Pull(): pulled %s in %d milliseconds", p.ImageWithTag(), int(1000*elapsed)) + metrics.ImagePullTimeHist.WithLabelValues(metrics.BoolToString(err != nil)).Observe(elapsed) + metrics.ImagePullTime.WithLabelValues(p.ImageWithTag(), metrics.BoolToString(err != nil)).Set(elapsed) + if err != nil { + metrics.OperationErrorsCount.WithLabelValues("pull-error").Inc() + } + go func() { + //TODO: this is a hack to ensure data is cleared in a reasonable time frame (after scrape) and does not build up. 
+ time.Sleep(1 * time.Minute) + metrics.ImagePullTime.DeleteLabelValues(p.ImageWithTag(), metrics.BoolToString(err != nil)) + }() + // pull size metrics and logs + if err == nil { // only size if pull was successful + if size, err2 := p.ImageSize(ctx); err2 != nil { + // log entries and error counts emitted inside ImageSize() method + } else { // success + klog.Infof("remoteimage.Pull(): pulled %s with size of %d bytes", p.ImageWithTag(), size) + metrics.ImagePullSizeBytes.WithLabelValues(p.ImageWithTag()).Set(float64(size)) + go func() { + //TODO: this is a hack to ensure data is cleared in a reasonable time frame (after scrape) and does not build up. + time.Sleep(1 * time.Minute) + metrics.ImagePullSizeBytes.DeleteLabelValues(p.ImageWithTag()) + }() + } + } + }() + repo := p.ImageWithoutTag() + imageSpec := &cri.ImageSpec{Image: p.ImageWithTag()} creds, withCredentials := p.keyring.Lookup(repo) if !withCredentials { _, err = p.imageSvc.PullImage(ctx, &cri.PullImageRequest{ Image: imageSpec, }) + klog.V(2).Infof("remoteimage.Pull(no creds): pulling %s completed with err=%v", p.ImageWithTag(), err) return } @@ -68,11 +134,14 @@ func (p puller) Pull(ctx context.Context) (err error) { }) if err == nil { + klog.V(2).Infof("remoteimage.Pull(with creds): pulling %s completed with err==nil", p.ImageWithTag()) return } pullErrs = append(pullErrs, err) } - return utilerrors.NewAggregate(pullErrs) + err = utilerrors.NewAggregate(pullErrs) + klog.V(2).Infof("remoteimage.Pull(): completed with errors, len(pullErrs)=%d, aggErr=%s", len(pullErrs), err.Error()) + return } diff --git a/pkg/remoteimageasync/patterns_test.go b/pkg/remoteimageasync/patterns_test.go new file mode 100644 index 00000000..15d6af6f --- /dev/null +++ b/pkg/remoteimageasync/patterns_test.go @@ -0,0 +1,86 @@ +package remoteimageasync + +import ( + "fmt" + "strings" + "testing" + + "github.com/containerd/containerd/reference/docker" + "github.com/stretchr/testify/assert" + "github.com/warm-metal/container-image-csi-driver/pkg/remoteimage" +) + +// demonstrates session channel structure's pass-by-reference is appropriate +func TestChannelStructContent(t *testing.T) { + input1 := make(chan PullSession, 1) + val1 := PullSession{ + err: nil, + } + assert.Nil(t, val1.err) + input1 <- val1 + tmp1 := <-input1 + tmp1.err = fmt.Errorf("test1") + assert.NotNil(t, tmp1.err) + assert.Nil(t, val1.err, "pass by value does not update value") + + input2 := make(chan *PullSession, 1) + val2 := PullSession{ + err: nil, + } + assert.Nil(t, val2.err) + input2 <- &val2 + tmp2 := <-input2 + tmp2.err = fmt.Errorf("test2") + assert.NotNil(t, tmp2.err) + assert.NotNil(t, val2.err, "pass by reference does update value") +} + +// demonstrates logic used in remoteimageasync.StartPull() +func TestChannelClose(t *testing.T) { + input1 := make(chan interface{}, 5) + result := 0 + + select { + case input1 <- 0: + result = 1 + default: + result = -1 + } + assert.Equal(t, 1, result, "write should succeed") + + assert.Panics(t, func() { + close(input1) + select { + case input1 <- 0: + result = 2 + default: + result = -2 + } + }, "write should panic") + + var err error = nil + assert.NotPanics(t, func() { + defer func() { + if rec := recover(); rec != nil { + err = fmt.Errorf("recovered from %v", rec) + } + }() + select { + case input1 <- 0: + result = 3 + default: + result = -3 + } + }, "write should not panic") + assert.NotNil(t, err, "error should have been returned") + assert.Contains(t, err.Error(), "closed", "error should indicate channel closed") +} + +func 
TestNamedImageExtraction(t *testing.T) { + parsed, err := docker.ParseDockerRef(nonExistentImage) + assert.Nil(t, err, "parsing image name should succeed") + puller := remoteimage.NewPuller(nil, parsed, nil) + assert.Equal(t, nonExistentImage, puller.ImageWithTag(), "extracted value should match exactly %v", puller) + repo := strings.Split(nonExistentImage, ":")[0] + assert.Equal(t, repo, puller.ImageWithoutTag(), "extracted value should match exactly %v", puller) +} diff --git a/pkg/remoteimageasync/puller.go b/pkg/remoteimageasync/puller.go new file mode 100644 index 00000000..5e23d619 --- /dev/null +++ b/pkg/remoteimageasync/puller.go @@ -0,0 +1,60 @@ +package remoteimageasync + +import ( + "context" + "fmt" + "time" + + "github.com/warm-metal/container-image-csi-driver/pkg/metrics" + "k8s.io/klog/v2" +) + +// sessionChan and completedChan both closed here +func RunPullerLoop( + ctx context.Context, + sessionChan chan *PullSession, + completedFunc func(*PullSession), +) { + go func() { + <-ctx.Done() + close(sessionChan) // only close this once + }() + go func() { + for { + ses, ok := <-sessionChan // ctx not observed for shut down, this sleep breaks when sessionChan is closed + if !ok { // sessionChan closed, shut down loop + return + } + go func() { + klog.V(2).Infof("%s.RunPullerLoop(): asked to pull image %s with timeout %v\n", + prefix, ses.ImageWithTag(), ses.timeout) + ctxAsyncPullTimeoutOrShutdown, cancelDontCare := context.WithTimeout(ctx, ses.timeout) // combine session timeout and shut down signal into one + defer cancelDontCare() // IF we exit, this no longer matters. calling to satisfy linter. + pullStart := time.Now() + pullErr := ses.puller.Pull(ctxAsyncPullTimeoutOrShutdown) // the waiting happens here, not in the select + // update fields on session before declaring done + select { // no waiting here, cases check for reason we exited puller.Pull() + case <-ctx.Done(): // application shutting down + ses.isTimedOut = false + ses.err = fmt.Errorf("%s.RunPullerLoop(): shutting down", prefix) + klog.V(2).Infof(ses.err.Error()) + metrics.OperationErrorsCount.WithLabelValues("pull-async-shutdown").Inc() + case <-ctxAsyncPullTimeoutOrShutdown.Done(): // async pull timeout or shutdown + ses.isTimedOut = true + ses.err = fmt.Errorf("%s.RunPullerLoop(): async pull exceeded timeout of %v for image %s", prefix, ses.timeout, ses.ImageWithTag()) + klog.V(2).Infof(ses.err.Error()) + metrics.OperationErrorsCount.WithLabelValues("pull-async-timeout").Inc() + default: // completion: success or error + ses.isTimedOut = false + ses.err = pullErr + klog.V(2).Infof("%s.RunPullerLoop(): pull completed in %v for image %s with error=%v\n", prefix, time.Since(pullStart), ses.ImageWithTag(), ses.err) + if ses.err != nil { + metrics.OperationErrorsCount.WithLabelValues("pull-async-error").Inc() + } + } + close(ses.done) // signal done, all waiters should wake + completedFunc(ses) + }() + } + }() +} diff --git a/pkg/remoteimageasync/synchronizer.go b/pkg/remoteimageasync/synchronizer.go new file mode 100644 index 00000000..e76dd25c --- /dev/null +++ b/pkg/remoteimageasync/synchronizer.go @@ -0,0 +1,100 @@ +package remoteimageasync + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/warm-metal/container-image-csi-driver/pkg/remoteimage" + "k8s.io/klog/v2" +) + +// sessionChanDepth : 100 - must give lots of buffer to ensure no deadlock or dropped requests +func StartAsyncPuller(ctx context.Context, sessionChanDepth int) AsyncPuller { + klog.Infof("%s.StartAsyncPuller(): starting async 
puller", prefix) + sessionChan := make(chan *PullSession, sessionChanDepth) + async := getSynchronizer( + ctx, + sessionChan, + ) + completedFunc := func(ses *PullSession) { // remove session from session map (since no longer active for continuation) + async.mutex.Lock() + defer async.mutex.Unlock() + klog.V(2).Infof("%s.StartAsyncPuller(): clearing session for %s", prefix, ses.ImageWithTag()) + delete(async.sessionMap, ses.ImageWithTag()) // no-op if already deleted + } + RunPullerLoop(ctx, sessionChan, completedFunc) + klog.Infof("%s.StartAsyncPuller(): async puller is operational", prefix) + return async +} + +// channels are exposed for testing +func getSynchronizer( + ctx context.Context, + sessionChan chan *PullSession, +) synchronizer { + if cap(sessionChan) < 50 { + klog.Fatalf("%s.getSynchronizer(): session channel must have capacity to buffer events, minimum of 50 is required", prefix) + } + return synchronizer{ + sessionMap: make(map[string]*PullSession), + mutex: &sync.Mutex{}, + sessions: sessionChan, + ctx: ctx, + } +} + +func (s synchronizer) StartPull(image string, puller remoteimage.Puller, asyncPullTimeout time.Duration) (ses *PullSession, err error) { + klog.V(2).Infof("%s.StartPull(): start pull: asked to pull image %s", prefix, image) + s.mutex.Lock() // lock mutex, no blocking sends/receives inside mutex + defer s.mutex.Unlock() + ses, ok := s.sessionMap[image] // try get session + if !ok { // if no session, create session + ses = &PullSession{ + puller: puller, + timeout: asyncPullTimeout, + done: make(chan interface{}), + isTimedOut: false, + err: nil, + } + + defer func() { + if rec := recover(); rec != nil { // handle session write panic due to closed sessionChan + // override named return values + ses = nil + err = fmt.Errorf("%s.StartPull(): cannot create pull session for %s at this time, reason: %v", prefix, ses.ImageWithTag(), rec) + klog.V(2).Info(err.Error()) + } + }() + select { + case s.sessions <- ses: // start session, check for deadlock... possibility of panic but only during app shutdown where Puller has already ceased to operate, handle with defer/recover + klog.V(2).Infof("%s.StartPull(): new session created for %s with timeout %v", prefix, ses.ImageWithTag(), ses.timeout) + s.sessionMap[image] = ses // add session to map to allow continuation... 
only do this because was passed to puller via sessions channel + return ses, nil + default: // catch deadlock or throttling (they may look the same) + err := fmt.Errorf("%s.StartPull(): cannot create pull session for %s at this time, throttling or deadlock condition exists, retry if throttling", prefix, ses.ImageWithTag()) + klog.V(2).Info(err.Error()) + return nil, err + } + } else { + klog.V(2).Infof("%s.StartPull(): found open session for %s", prefix, ses.ImageWithTag()) + // return session and unlock + return ses, nil + } +} + +func (s synchronizer) WaitForPull(session *PullSession, callerTimeout context.Context) error { + klog.V(2).Infof("%s.WaitForPull(): starting to wait for image %s", prefix, session.ImageWithTag()) + defer klog.V(2).Infof("%s.WaitForPull(): exiting wait for image %s", prefix, session.ImageWithTag()) + select { + case <-session.done: // success or error (including session timeout and shutting down) + klog.V(2).Infof("%s.WaitForPull(): session completed with success or error for image %s, error=%v", prefix, session.ImageWithTag(), session.err) + return session.err + case <-callerTimeout.Done(): // caller timeout + err := fmt.Errorf("%s.WaitForPull(): this wait for image %s has timed out due to caller context cancellation, pull likely continues in the background", + prefix, session.ImageWithTag()) + klog.V(2).Info(err.Error()) + return err + } +} diff --git a/pkg/remoteimageasync/synchronizer_test.go b/pkg/remoteimageasync/synchronizer_test.go new file mode 100644 index 00000000..2014df6d --- /dev/null +++ b/pkg/remoteimageasync/synchronizer_test.go @@ -0,0 +1,231 @@ +package remoteimageasync + +import ( + "context" + "fmt" + "math/rand" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +// this demonstrates that session errors are consistent across go routines +func TestAsyncPullErrorReturn(t *testing.T) { + ctx, dontCare := context.WithTimeout(context.TODO(), 5*time.Second) + defer dontCare() + puller := StartAsyncPuller(ctx, 100) + + err := pullImage(puller, "nginx:exists", 1, 5, 5) + assert.Nil(t, err, "no error should be returned for successful pull") + + err = pullImage(puller, nonExistentImage, 1, 5, 5) + assert.NotNil(t, err, "error should be returned for non-existent image") + + <-ctx.Done() +} + +// demonstrates pullerMock is functioning properly +// verifies parallelism +// checks correct image pulls completed withing set time (5s) +func TestPullDuration(t *testing.T) { + ctx, dontCare := context.WithTimeout(context.TODO(), 5*time.Second) // shut down execution + defer dontCare() + puller := StartAsyncPuller(ctx, 100) + var ops atomic.Int32 + + durations := []int{1, 2, 3, 4, 6, 7, 8} + + for _, dur := range durations { + go func(dur int) { + err := pullImage(puller, fmt.Sprintf("nginx:%v", dur), dur, 10, 10) + if err == nil { // rejects pull results that are returned due to shutting down puller loop, otherwise a race condition + ops.Add(1) + } + }(dur) + } + + <-ctx.Done() // stop waiting when context times out (shut down) + assert.Equal(t, 4, int(ops.Load()), "only 4 of %v should complete", len(durations)) +} + +// checks for call serialization +func TestParallelPull(t *testing.T) { + ctx, dontCare := context.WithTimeout(context.TODO(), 3*time.Second) + defer dontCare() + puller := StartAsyncPuller(ctx, 100) + var ops atomic.Int32 + + imgs := []int{2, 2, 2, 2, 2, 2, 2} + + for _, i := range imgs { + go func(i int) { + err := pullImage(puller, fmt.Sprintf("nginx:%v", i), i, 10, 10) + if err == nil { + ops.Add(1) + } + }(i) 
+ } + + <-ctx.Done() + assert.Equal(t, len(imgs), int(ops.Load()), "all %v should complete", len(imgs)) +} + +// tests timeouts and eventual success of long image pull +func TestSerialResumedSessions(t *testing.T) { + ctx, dontCare := context.WithTimeout(context.TODO(), 6*time.Second) + defer dontCare() + puller := StartAsyncPuller(ctx, 100) + var success atomic.Int32 + var notSuccess atomic.Int32 + + // 3 states exist for each pull: running, success, error + pull := func(image string, pullSec, asyncTimeoutSec, callerTimeoutSec int) { + err := pullImage(puller, image, pullSec, asyncTimeoutSec, callerTimeoutSec) + if err == nil { + success.Add(1) + } else { + notSuccess.Add(1) + } + } + + // these are serial, not parallel. simulates kubelet retrying call to NodePublishVolume(). + pull("nginx:A", 5, 6, 1) // caller times out after 1s but pull continues asynchronously + pull("nginx:A", 5, 6, 1) // continues session but caller times out after 1s + pull("nginx:A", 5, 6, 1) // continues session but caller times out after 1s + pull("nginx:A", 5, 6, 1) // continues session but caller times out after 1s + assert.Equal(t, 0, int(success.Load()), "none should have finished yet") + assert.Equal(t, 4, int(notSuccess.Load()), "all should have errored so far") // needed because 3 states exist + + pull("nginx:A", 5, 6, 2) // succeed after 1s because 5s (pull time) has elapsed since session started + assert.Equal(t, 1, int(success.Load()), "1 should have finished") + assert.Equal(t, 4, int(notSuccess.Load()), "no new errors after previous") + + <-ctx.Done() +} + +// simulates multiple pods trying to mount same image +// this would result in parallel NodePublishVolume() calls to pull and mount same image +// demonstrates timeout and async pull continuation under that scenario +func TestParallelResumedSessions(t *testing.T) { + ctx, dontCare := context.WithTimeout(context.TODO(), 6*time.Second) + defer dontCare() + puller := StartAsyncPuller(ctx, 100) + var success atomic.Int32 + var notSuccess atomic.Int32 + + pull := func(image string, pullSec, asyncTimeoutSec, callerTimeoutSec int) { + err := pullImage(puller, image, pullSec, asyncTimeoutSec, callerTimeoutSec) + if err == nil { + success.Add(1) + } else { + notSuccess.Add(1) + } + } + + pull("nginx:A", 5, 6, 1) // caller times out after 1s, pull continues async + assert.Equal(t, 0, int(success.Load()), "none should have finished yet") + assert.Equal(t, 1, int(notSuccess.Load()), "all should have errored so far") // caller timeout error returned + + time.Sleep(1 * time.Second) + // time is now 2 sec into 5 sec pull + + // make parallel pull requests which would time out if not resuming session + go func() { pull("nginx:A", 5, 6, 4) }() // caller allows 4s but completes in 3s + go func() { pull("nginx:A", 5, 6, 4) }() // caller allows 4s but completes in 3s + go func() { pull("nginx:A", 5, 6, 4) }() // caller allows 4s but completes in 3s + assert.Equal(t, 0, int(success.Load()), "none should have finished yet") + assert.Equal(t, 1, int(notSuccess.Load()), "all should have errored so far") // 1 timed out, 3 in-flight blocked waiting + + time.Sleep(3100 * time.Millisecond) // all should have succeeded 100ms ago + + assert.Equal(t, 3, int(success.Load()), "3 resumed calls should have finished") + assert.Equal(t, 1, int(notSuccess.Load()), "no new errors") + + <-ctx.Done() +} + +// pullDurationSec: typically 5-60 seconds, containerd behavior (time actually required to pull image) +// asyncPullTimeoutSec: ~10m, the new logic allows async continuation of a pull 
(if enabled) +// callerTimeoutSec: kubelet hard coded to 2m +func pullImage(puller AsyncPuller, image string, pullDurationSec, asyncPullTimeoutSec, callerTimeoutSec int) error { + return pullImageRand(puller, image, pullDurationSec, pullDurationSec, asyncPullTimeoutSec, callerTimeoutSec) +} + +func pullImageRand(puller AsyncPuller, image string, pullDurationSecLow, pullDurationSecHigh, asyncPullTimeoutSec, callerTimeoutSec int) error { + pull := getPullerMockRand(image, pullDurationSecLow*1000, pullDurationSecHigh*1000) + session, err := puller.StartPull(image, pull, time.Duration(asyncPullTimeoutSec)*time.Second) + if err != nil { + return err + } + ctx, dontCare := context.WithTimeout(context.TODO(), time.Duration(callerTimeoutSec)*time.Second) + defer dontCare() + return puller.WaitForPull(session, ctx) +} + +type pullerMock struct { + image string + msDurationLow int + msDurationHigh int + size int // negative size indicates error should be returned +} + +func getPullerMock(image string, ms_duration int) pullerMock { + return getPullerMockRand(image, ms_duration, ms_duration) +} + +func getPullerMockRand(image string, ms_low, ms_high int) pullerMock { + return pullerMock{ + image: image, + msDurationLow: ms_low, + msDurationHigh: ms_high, + size: 0, + } +} + +// negative size indicates error should be returned +func (p *pullerMock) SetSize(size int) { + p.size = size +} + +func (p pullerMock) Pull(ctx context.Context) (err error) { + dur := time.Duration(p.msDurationLow) * time.Millisecond + if p.msDurationLow != p.msDurationHigh { + rand.Seed(time.Now().UnixNano()) // without seed, same sequence always returned + dur = time.Duration(p.msDurationLow+rand.Intn(p.msDurationHigh-p.msDurationLow)) * time.Millisecond + } + + fmt.Printf("pullerMock: starting to pull image %s\n", p.image) + if p.image == nonExistentImage { + err = fmt.Errorf("pullerMock: non-existent image specified, returning this error\n") + fmt.Println(err.Error()) + return err + } + select { + case <-time.After(dur): + fmt.Printf("pullerMock: finshed pulling image %s\n", p.image) + return nil + case <-ctx.Done(): + fmt.Printf("pullerMock: context cancelled\n") + return nil + } +} + +func (p pullerMock) ImageWithTag() string { + return p.image +} + +func (p pullerMock) ImageWithoutTag() string { + panic("Not implemented") +} + +func (p pullerMock) ImageSize(ctx context.Context) (int, error) { + if p.size < 0 { + return 0, fmt.Errorf("error occurred when checking image size") + } + return p.size, nil +} + +// this image is known to not exist and is used by integration tests for that purpose +const nonExistentImage = "docker.io/warmmetal/container-image-csi-driver-test:simple-fs-doesnt-exist" diff --git a/pkg/remoteimageasync/types.go b/pkg/remoteimageasync/types.go new file mode 100644 index 00000000..19c20658 --- /dev/null +++ b/pkg/remoteimageasync/types.go @@ -0,0 +1,38 @@ +package remoteimageasync + +import ( + "context" + "sync" + "time" + + "github.com/warm-metal/container-image-csi-driver/pkg/remoteimage" +) + +const prefix = "remoteimageasync" + +type PullSession struct { + puller remoteimage.Puller + timeout time.Duration // this is the session timeout, not the caller timeout + done chan interface{} // chan will block until result + isTimedOut bool + err error +} + +func (p PullSession) ImageWithTag() string { + return p.puller.ImageWithTag() +} + +type synchronizer struct { + sessionMap map[string]*PullSession // all interactions must be mutex'd + mutex *sync.Mutex // this exclusively protects the sessionMap + 
sessions chan *PullSession // pull activity occurs in puller Go routine when using async mode + ctx context.Context // top level application context +} + +// allows mocking/dependency injection +type AsyncPuller interface { + // returns session that is ready to wait on, or error + StartPull(image string, puller remoteimage.Puller, asyncPullTimeout time.Duration) (*PullSession, error) + // waits for session to time out or succeed + WaitForPull(session *PullSession, callerTimeout context.Context) error +} diff --git a/test/integration/metrics-manifests/rendered-test.yaml b/test/integration/metrics-manifests/rendered-test.yaml deleted file mode 100644 index 03d3d446..00000000 --- a/test/integration/metrics-manifests/rendered-test.yaml +++ /dev/null @@ -1,25 +0,0 @@ -apiVersion: batch/v1 -kind: Job -metadata: - name: test-metrics -spec: - template: - metadata: - name: test-metrics - spec: - containers: - - name: test-metrics - # official curl iamge taken from https://github.com/curl/curl-container - image: quay.io/curl/curl:latest - command: - - /bin/sh - - -c - - | - (curl $IP:8080/metrics | grep warm_metal_pull_duration_seconds) && \ - (curl $IP:8080/metrics | grep warm_metal_mount_duration_seconds) && \ - (curl $IP:8080/metrics | grep warm_metal_operation_errors_total) - env: - - name: IP - value: 172.18.0.2 - restartPolicy: Never - backoffLimit: 0 diff --git a/test/integration/metrics-manifests/test.yaml b/test/integration/metrics-manifests/test.yaml index 75e90b01..33ba2124 100644 --- a/test/integration/metrics-manifests/test.yaml +++ b/test/integration/metrics-manifests/test.yaml @@ -16,7 +16,7 @@ spec: - -c - | (curl $IP:8080/metrics | grep warm_metal_pull_duration_seconds) && \ - (curl $IP:8080/metrics | grep warm_metal_mount_duration_seconds) && \ + (curl $IP:8080/metrics | grep warm_metal_pull_duration_seconds_hist) && \ (curl $IP:8080/metrics | grep warm_metal_operation_errors_total) env: - name: IP
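For reference, a minimal usage sketch (not part of the diff) of how the async-pull options introduced above could be enabled when installing the chart. The value names enableAsyncPull and asyncPullTimeout are taken from charts/warm-metal-csi-driver/values.yaml in this change; the release name and namespace are illustrative.

    # Illustrative install enabling async image pulls (mirrors the CI workflow's helm install step).
    # asyncPullTimeout defaults to "10m" in values.yaml; per cmd/plugin/node_server.go, timeouts
    # below 30s (or an unset flag) fall back to synchronous mode.
    helm install wm-csi charts/warm-metal-csi-driver -n kube-system \
      --set enableAsyncPull=true \
      --set asyncPullTimeout=10m \
      --wait

When enableAsyncPull is set, the node plugin template passes --async-pull-timeout to the driver; a timeout of at least 30s starts the async puller, while smaller or omitted values keep the previous synchronous behavior and defer to the caller's (typically kubelet's) timeout.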