Skip to content

Commit

Permalink
Block osmount incase unmount is still in progress (#3388) (#3410)
Browse files Browse the repository at this point in the history
  • Loading branch information
0sewa0 authored Jul 4, 2024
1 parent 9bb3aa1 commit 715810c
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 73 deletions.
29 changes: 18 additions & 11 deletions pkg/controllers/csi/driver/volumes/host/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package hostvolumes

import (
"context"
goerrors "errors"
"fmt"
"os"

Expand Down Expand Up @@ -62,37 +63,43 @@ func (publisher *HostVolumePublisher) PublishVolume(ctx context.Context, volumeC
}

if osMount != nil && osMount.DeletedAt.Valid {
osMount.VolumeMeta = metadata.VolumeMeta{ID: volumeCfg.VolumeID, PodName: volumeCfg.PodName}
osMount.VolumeMetaID = volumeCfg.VolumeID
osMount.TenantConfigUID = tenantConfig.UID // necessary so that in some edge-cases(incorrect/messy uninstall) the previous(already soft-deleted) TenantConfig can be removed
osMount.VolumeMeta = metadata.VolumeMeta{
ID: volumeCfg.VolumeID,
PodName: volumeCfg.PodName,
}
osMount.TenantConfig = *tenantConfig

if err := publisher.mountOneAgent(osMount, volumeCfg); err != nil {
return nil, status.Error(codes.Internal, "failed to mount OSMount: "+err.Error())
}

osMount, err = publisher.db.RestoreOSMount(osMount)
_, err = publisher.db.RestoreOSMount(osMount)
if err != nil {
return nil, status.Error(codes.Internal, "failed to restore OSMount: "+err.Error())
}

return &csi.NodePublishVolumeResponse{}, nil
}

if osMount == nil && errors.Is(err, gorm.ErrRecordNotFound) {
osMount := metadata.OSMount{
VolumeMeta: metadata.VolumeMeta{ID: volumeCfg.VolumeID, PodName: volumeCfg.PodName},
VolumeMetaID: volumeCfg.VolumeID,
TenantUUID: tenantConfig.TenantUUID,
Location: publisher.path.OsAgentDir(tenantConfig.TenantUUID),
MountAttempts: 0,
TenantConfigUID: tenantConfig.UID,
VolumeMeta: metadata.VolumeMeta{ID: volumeCfg.VolumeID, PodName: volumeCfg.PodName},
VolumeMetaID: volumeCfg.VolumeID,
TenantUUID: tenantConfig.TenantUUID,
Location: publisher.path.OsAgentDir(tenantConfig.TenantUUID),
MountAttempts: 0,
TenantConfig: *tenantConfig,
}

if err := publisher.mountOneAgent(&osMount, volumeCfg); err != nil {
return nil, status.Error(codes.Internal, "failed to mount OSMount: "+err.Error())
}

if err := publisher.db.CreateOSMount(&osMount); err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("failed to insert osmount to database. info: %v err: %s", osMount, err.Error()))
return nil, status.Error(codes.Internal, fmt.Sprintf("failed to insert OSMount to database. info: %v err: %s", osMount, err.Error()))
}
} else {
return &csi.NodePublishVolumeResponse{}, goerrors.New("previous OSMount is yet to be unmounted, there can be only 1 OSMount per tenant per node, blocking until unmount") // don't want to have the stacktrace here, it just pollutes the logs
}

return &csi.NodePublishVolumeResponse{}, nil
Expand Down
32 changes: 26 additions & 6 deletions pkg/controllers/csi/driver/volumes/host/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,37 +21,57 @@ const (
)

func TestPublishVolume(t *testing.T) {
t.Run(`ready dynakube`, func(t *testing.T) {
ctx := context.Background()

t.Run("ready dynakube", func(t *testing.T) {
mounter := mount.NewFakeMounter([]mount.MountPoint{})
publisher := newPublisherForTesting(mounter)

mockDynakube(t, &publisher)

response, err := publisher.PublishVolume(context.Background(), createTestVolumeConfig())
response, err := publisher.PublishVolume(ctx, createTestVolumeConfig())

require.NoError(t, err)
assert.NotNil(t, response)
assert.NotEmpty(t, mounter.MountPoints)
assertReferencesForPublishedVolume(t, &publisher, mounter)
})

t.Run(`not ready dynakube`, func(t *testing.T) {
t.Run("not ready dynakube", func(t *testing.T) {
mounter := mount.NewFakeMounter([]mount.MountPoint{})
publisher := newPublisherForTesting(mounter)

mockDynakubeWithoutVersion(t, &publisher)

response, err := publisher.PublishVolume(context.Background(), createTestVolumeConfig())
response, err := publisher.PublishVolume(ctx, createTestVolumeConfig())

require.NoError(t, err)
assert.NotNil(t, response)
assert.NotEmpty(t, mounter.MountPoints)
assertReferencesForPublishedVolume(t, &publisher, mounter)
})

t.Run("publish volume when previous OSMount not yet unmounted (upgrade scenario) => error", func(t *testing.T) {
mounter := mount.NewFakeMounter([]mount.MountPoint{})
publisher := newPublisherForTesting(mounter)

mockDynakube(t, &publisher)

response, err := publisher.PublishVolume(ctx, createTestVolumeConfig())

require.NoError(t, err)
assert.NotNil(t, response)
assert.NotEmpty(t, mounter.MountPoints)
assertReferencesForPublishedVolume(t, &publisher, mounter)

response, err = publisher.PublishVolume(ctx, createTestVolumeConfig())
require.Error(t, err)
assert.NotNil(t, response)
})
}

func TestUnpublishVolume(t *testing.T) {
t.Run(`valid metadata`, func(t *testing.T) {
t.Run("valid metadata", func(t *testing.T) {
mounter := mount.NewFakeMounter([]mount.MountPoint{
{Path: testTargetPath},
})
Expand All @@ -66,7 +86,7 @@ func TestUnpublishVolume(t *testing.T) {
assertReferencesForUnpublishedVolume(t, &publisher)
})

t.Run(`invalid metadata`, func(t *testing.T) {
t.Run("invalid metadata", func(t *testing.T) {
mounter := mount.NewFakeMounter([]mount.MountPoint{
{Path: testTargetPath},
})
Expand Down
17 changes: 16 additions & 1 deletion pkg/controllers/csi/gc/binaries.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,23 @@ func (gc *CSIGarbageCollector) runBinaryGarbageCollection() {
continue
}

isNotMounted, err := gc.isNotMounted(gc.mounter, codeModule.Location)
if err != nil {
log.Info("failed to determine if AppMount is still mounted", "location", codeModule.Location, "version", codeModule.Version, "err", err.Error())

continue
}

if !isNotMounted {
log.Info("AppMount is still mounted", "location", codeModule.Location, "version", codeModule.Version)

continue
}

log.Info("cleaning up orphaned codemodule binary", "version", codeModule.Version, "location", codeModule.Location)
removeUnusedVersion(fs, codeModule.Location)

err := gc.db.PurgeCodeModule(&metadata.CodeModule{Version: codeModule.Version})
err = gc.db.PurgeCodeModule(&metadata.CodeModule{Version: codeModule.Version})
if err != nil {
log.Error(err, "failed to delete codemodule database entry")

Expand All @@ -48,6 +61,8 @@ func removeUnusedVersion(fs *afero.Afero, binaryPath string) {
foldersRemovedMetric.Inc()
reclaimedMemoryMetric.Add(float64(size))
}

log.Info("removed outdate CodeModule binary", "location", binaryPath)
}

func dirSize(fs *afero.Afero, path string) (int64, error) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/controllers/csi/gc/binaries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/spf13/afero"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/utils/mount"
)

const (
Expand Down Expand Up @@ -102,12 +103,15 @@ func NewMockGarbageCollector() *CSIGarbageCollector {
path: metadata.PathResolver{RootDir: testRootDir},
time: timeprovider.New(),
maxUnmountedVolumeAge: defaultMaxUnmountedCsiVolumeAge,
mounter: mount.NewFakeMounter([]mount.MountPoint{}),
isNotMounted: mockIsNotMounted(map[string]error{}),
}
}

func (gc *CSIGarbageCollector) mockUnusedVersions(versions ...string) {
_ = gc.fs.Mkdir(testBinaryDir, 0770)

gc.isNotMounted = mockIsNotMounted(map[string]error{})
for _, version := range versions {
gc.db.(metadata.Access).CreateCodeModule(&metadata.CodeModule{Version: version, Location: filepath.Join(testBinaryDir, version)})
_, _ = gc.fs.Create(filepath.Join(testBinaryDir, version))
Expand Down
73 changes: 51 additions & 22 deletions pkg/controllers/csi/gc/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,28 @@ import (
"github.com/Dynatrace/dynatrace-operator/pkg/controllers/csi/metadata"
"github.com/Dynatrace/dynatrace-operator/pkg/util/timeprovider"
"github.com/spf13/afero"
"k8s.io/utils/mount"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// CSIGarbageCollector removes unused and outdated agent versions
type CSIGarbageCollector struct {
apiReader client.Reader
fs afero.Fs
db metadata.Cleaner
time *timeprovider.Provider
apiReader client.Reader
fs afero.Fs
db metadata.Cleaner
mounter mount.Interface
time *timeprovider.Provider
isNotMounted mountChecker

path metadata.PathResolver

maxUnmountedVolumeAge time.Duration
}

// necessary for mocking, as the MounterMock will use the os package
type mountChecker func(mounter mount.Interface, file string) (bool, error)

var _ reconcile.Reconciler = (*CSIGarbageCollector)(nil)

const (
Expand All @@ -40,6 +46,8 @@ func NewCSIGarbageCollector(apiReader client.Reader, opts dtcsi.CSIOptions, db m
db: db,
path: metadata.PathResolver{RootDir: opts.RootDir},
time: timeprovider.New(),
mounter: mount.New(""),
isNotMounted: mount.IsNotMountPoint,
maxUnmountedVolumeAge: determineMaxUnmountedVolumeAge(os.Getenv(maxUnmountedCsiVolumeAgeEnv)),
}
}
Expand Down Expand Up @@ -67,28 +75,11 @@ func (gc *CSIGarbageCollector) Reconcile(ctx context.Context, request reconcile.

gc.runUnmountedVolumeGarbageCollection(tenantConfig.TenantUUID)

osMounts, err := gc.db.ListDeletedOSMounts()
err := gc.runOSMountGarbageCollection(tenantConfig)
if err != nil {
continue
}

for _, osm := range osMounts {
if !gc.time.Now().Time.After(osm.DeletedAt.Time.Add(safeRemovalThreshold)) {
log.Info("skipping recently removed os-mount", "location", osm.Location)

continue
}

if osm.TenantConfigUID == tenantConfig.UID {
dir, _ := afero.ReadDir(gc.fs, osm.Location)
for _, d := range dir {
gc.fs.RemoveAll(path.Join([]string{osm.Location, d.Name()}...))
}

gc.db.PurgeOSMount(&osm)
}
}

err = gc.db.PurgeTenantConfig(&tenantConfig)
if err != nil {
log.Info("failed to remove the soft deleted tenant-config entry, will try again", "name", tenantConfig.Name)
Expand All @@ -103,3 +94,41 @@ func (gc *CSIGarbageCollector) Reconcile(ctx context.Context, request reconcile.

return reconcile.Result{RequeueAfter: dtcsi.LongRequeueDuration}, nil
}

func (gc *CSIGarbageCollector) runOSMountGarbageCollection(tenantConfig metadata.TenantConfig) error {
osMounts, err := gc.db.ListDeletedOSMounts()
if err != nil {
return err
}

for _, osm := range osMounts {
if !gc.time.Now().Time.After(osm.DeletedAt.Time.Add(safeRemovalThreshold)) {
log.Info("skipping recently removed os-mount", "location", osm.Location)

continue
}

if osm.TenantConfig.UID == tenantConfig.UID {
isNotMounted, err := gc.isNotMounted(gc.mounter, osm.Location)
if err != nil {
log.Info("failed to determine if OSMount is still mounted", "location", osm.Location, "tenantConfig", osm.TenantConfig.Name, "err", err.Error())

continue
}

if !isNotMounted {
log.Info("OSMount is still mounted", "location", osm.Location, "tenantConfig", osm.TenantConfig.Name)

continue
}

dir, _ := afero.ReadDir(gc.fs, osm.Location)
for _, d := range dir {
gc.fs.RemoveAll(path.Join([]string{osm.Location, d.Name()}...))
log.Info("removed outdate contents from OSMount folder", "location", osm.Location)
}
}
}

return nil
}
28 changes: 25 additions & 3 deletions pkg/controllers/csi/gc/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/mount"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

Expand All @@ -32,13 +33,34 @@ func TestReconcile(t *testing.T) {
},
}
gc := CSIGarbageCollector{
apiReader: fake.NewClient(&dynakube),
fs: afero.NewMemMapFs(),
db: metadata.FakeMemoryDB(),
apiReader: fake.NewClient(&dynakube),
fs: afero.NewMemMapFs(),
db: metadata.FakeMemoryDB(),
mounter: mount.NewFakeMounter([]mount.MountPoint{}),
isNotMounted: mockIsNotMounted(map[string]error{}),
}
result, err := gc.Reconcile(context.Background(), reconcile.Request{NamespacedName: types.NamespacedName{Name: dynakube.Name}})

require.NoError(t, err)
assert.Equal(t, reconcile.Result{RequeueAfter: dtcsi.LongRequeueDuration}, result)
})
}

// mockIsNotMounted is rather confusing because of the double negation.
// you can pass in a map of filepaths, each path will be considered as mounted if corresponding error value is nil. (so returns false)
// if the filepath was not provided in the map, then the path is considered as not mounted. (so returns true)
// if an error was provided for a filepath in the map, then that path will cause the return of that error.
func mockIsNotMounted(files map[string]error) mountChecker {
return func(mounter mount.Interface, file string) (bool, error) {
err, ok := files[file]
if !ok {
return true, nil // unknown path => not mounted, no mocked error
}

if err == nil {
return false, nil // known path => mounted, no mocked error
}

return false, err // mocked error for path
}
}
Loading

0 comments on commit 715810c

Please sign in to comment.