Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: implement a global lock for cephfs encryption #4688

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 70 additions & 3 deletions internal/cephfs/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"os"
"path"
"strings"
"time"

cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
"github.com/ceph/ceph-csi/internal/cephfs/mounter"
Expand All @@ -33,6 +34,9 @@ import (
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/fscrypt"
"github.com/ceph/ceph-csi/internal/util/log"
"github.com/ceph/ceph-csi/internal/util/radosmutex"
"github.com/ceph/ceph-csi/internal/util/radosmutex/retryoptions"
"github.com/ceph/ceph-csi/internal/util/reftracker/radoswrapper"

"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
Expand All @@ -51,6 +55,10 @@ type NodeServer struct {
healthChecker hc.Manager
}

func volumeRadosMutexName(volumeID string) string {
return "rados-mutex-" + volumeID
}

func getCredentialsForVolume(
volOptions *store.VolumeOptions,
secrets map[string]string,
Expand Down Expand Up @@ -127,13 +135,72 @@ func maybeUnlockFileEncryption(
stagingTargetPath string,
volID fsutil.VolumeID,
) error {
if volOptions.IsEncrypted() {
log.DebugLog(ctx, "cephfs: unlocking fscrypt on volume %q path %s", volID, stagingTargetPath)

retryoptions := retryoptions.RetryOptions{
MaxAttempts: 20,
SleepDuration: 2000 * time.Microsecond,
}

lockName := volumeRadosMutexName(string(volID))

if volOptions.IsEncrypted() == false {
return nil
}

log.ErrorLog(ctx, "Creating lock for the following volume ID %s", lockName)

ioctx, err := volOptions.GetConnection().GetIoctx(volOptions.MetadataPool)
if err != nil {
log.ErrorLog(ctx, "failed to create RADOS ioctx: %s", err)

return err
}
defer ioctx.Destroy()

ioctx.SetNamespace(fsutil.RadosNamespace)
ioctxW := radoswrapper.NewIOContext(ioctx)

created, err := radosmutex.CreateOrAquireLock(
ctx,
ioctxW,
lockName,
"This is some pod here",
retryoptions,
)
if err != nil {
log.ErrorLog(ctx, "failed to aquire lock %s: %v", lockName, err)

return err
}

if created {
defer func() {
log.DebugLog(ctx, "Releasing following lock %s", lockName)

var deleted bool
deleted, err = radosmutex.ReleaseLock(
ctx,
ioctxW,
lockName,
"This is some pod here",
)

if err != nil {
log.ErrorLog(ctx, "failed to release following lock, this will lead to orphan lock %s: %v",
lockName, err)
}
if !deleted {
log.ErrorLog(ctx, "failed to release following lock, this will lead to orphan lock %s",
lockName)
}

}()

log.DebugLog(ctx, "cephfs: unlocking fscrypt on volume %q path %s", volID, stagingTargetPath)
return fscrypt.Unlock(ctx, volOptions.Encryption, stagingTargetPath, string(volID))
}

return nil
return fmt.Errorf("There is already one file system with name %s", string(volID))
}

// maybeInitializeFileEncryption initializes KMS and node specifics, if volContext enables encryption.
Expand Down
84 changes: 84 additions & 0 deletions internal/util/radosmutex/errors/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
Copyright 2022 The Ceph-CSI Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package errors

import (
goerrors "errors"
"fmt"

"github.com/ceph/go-ceph/rados"
"golang.org/x/sys/unix"
)

// ErrObjectOutOfDate is an error returned by RADOS read/write ops whose
// rados_*_op_assert_version failed.
var ErrObjectOutOfDate = goerrors.New("object is out of date since the last time it was read, try again later")

// UnexpectedReadSize formats an error message for a failure due to bad read
// size.
func UnexpectedReadSize(expectedBytes, actualBytes int) error {
return fmt.Errorf("unexpected size read: expected %d bytes, got %d",
expectedBytes, actualBytes)
}

// UnknownObjectVersion formats an error message for a failure due to unknown
// reftracker object version.
func UnknownObjectVersion(unknownVersion uint32) error {
return fmt.Errorf("unknown reftracker version %d", unknownVersion)
}

// FailedObjectRead formats an error message for a failed RADOS read op.
func FailedObjectRead(cause error) error {
if cause != nil {
return fmt.Errorf("failed to read object: %w", TryRADOSAborted(cause))
}

return nil
}

// FailedObjectRead formats an error message for a failed RADOS read op.
func FailedObjectWrite(cause error) error {
if cause != nil {
return fmt.Errorf("failed to write object: %w", TryRADOSAborted(cause))
}

return nil
}

// TryRADOSAborted tries to extract rados_*_op_assert_version from opErr.
func TryRADOSAborted(opErr error) error {
if opErr == nil {
return nil
}

var radosOpErr rados.OperationError
if !goerrors.As(opErr, &radosOpErr) {
return opErr
}

errnoErr, ok := radosOpErr.OpError.(interface{ ErrorCode() int })
if !ok {
return opErr
}

errno := errnoErr.ErrorCode()
if errno == -int(unix.EOVERFLOW) || errno == -int(unix.ERANGE) {
return ErrObjectOutOfDate
}

return nil
}
97 changes: 97 additions & 0 deletions internal/util/radosmutex/lock/lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package lock

import (
"bytes"
"encoding/binary"
"fmt"
"time"

"github.com/ceph/ceph-csi/internal/util/radosmutex/lockstate"
)

// Lock represents a lock with an owner, state, and expiry time.
type Lock struct {
LockOwner string
LockState lockstate.LockState
LockExpiry time.Time
}

const (
// LockOwnerMaxSize defines the maximum size of the lock owner string in bytes.
LockOwnerMaxSize = 256
// TimeStampSize defines the size of the timestamp in bytes.
TimeStampSize = 8
)

// ToBytes serializes the Lock structure into a byte slice.
func (l *Lock) ToBytes() ([]byte, error) {
ownerBytes := []byte(l.LockOwner)
if len(ownerBytes) > LockOwnerMaxSize {
return nil, fmt.Errorf("lock owner exceeds max size of %d bytes", LockOwnerMaxSize)
}

buffer := new(bytes.Buffer)

// Write the length of the lock owner string
if err := binary.Write(buffer, binary.LittleEndian, int16(len(ownerBytes))); err != nil {
return nil, err
}

// Write the lock owner string
if _, err := buffer.Write(ownerBytes); err != nil {
return nil, err
}

// Write the lock state
if _, err := buffer.Write(lockstate.ToBytes(l.LockState)); err != nil {
return nil, err
}

// Write the lock expiry timestamp
expiryBytes := make([]byte, TimeStampSize)
binary.LittleEndian.PutUint64(expiryBytes, uint64(l.LockExpiry.Unix()))
if _, err := buffer.Write(expiryBytes); err != nil {
return nil, err
}

return buffer.Bytes(), nil
}

// FromBytes deserializes the byte slice into a Lock structure.
func (l *Lock) FromBytes(data []byte) error {
buffer := bytes.NewReader(data)

// Read the length of the lock owner string
var ownerLength int16
if err := binary.Read(buffer, binary.LittleEndian, &ownerLength); err != nil {
return err
}

// Read the lock owner string
ownerBytes := make([]byte, ownerLength)
if _, err := buffer.Read(ownerBytes); err != nil {
return err
}
l.LockOwner = string(ownerBytes)

// Read the lock state
stateBytes := make([]byte, lockstate.LockStateSize)
if _, err := buffer.Read(stateBytes); err != nil {
return err
}
lockState, err := lockstate.FromBytes(stateBytes)
if err != nil {
return err
}
l.LockState = lockState

// Read the lock expiry timestamp
expiryBytes := make([]byte, TimeStampSize)
if _, err := buffer.Read(expiryBytes); err != nil {
return err
}
expiryUnix := int64(binary.LittleEndian.Uint64(expiryBytes))
l.LockExpiry = time.Unix(expiryUnix, 0)

return nil
}
96 changes: 96 additions & 0 deletions internal/util/radosmutex/lock/lock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package lock

import (
"strings"
"testing"
"time"

"github.com/ceph/ceph-csi/internal/util/radosmutex/lockstate"
)

func TestLockSerializationDeserialization(t *testing.T) {
// Setup
lockInstance := Lock{
LockOwner: "testUser",
LockState: lockstate.Unlocked,
LockExpiry: time.Now().Add(10 * time.Minute),
}

// Test Successful Serialization and Deserialization
serialized, err := lockInstance.ToBytes()
if err != nil {
t.Fatalf("Failed to serialize lock: %v", err)
}

deserializedLock := Lock{}
err = deserializedLock.FromBytes(serialized)
if err != nil {
t.Fatalf("Failed to deserialize lock: %v", err)
}

t.Logf("Original LockOwner: %s", lockInstance.LockOwner)
t.Logf("Deserialized LockOwner: %s", deserializedLock.LockOwner)
t.Logf("Original LockState: %v", lockInstance.LockState)
t.Logf("Deserialized LockState: %v", deserializedLock.LockState)
t.Logf("Original LockExpiry: %v", lockInstance.LockExpiry)
t.Logf("Deserialized LockExpiry: %v", deserializedLock.LockExpiry)

// Check if the original and deserialized locks match

if lockInstance.LockOwner != deserializedLock.LockOwner {
t.Errorf("Serialized and deserialized lock instances do not match. LockOwner: Expected '%s', Got '%s'", lockInstance.LockOwner, deserializedLock.LockOwner)
}

if lockInstance.LockState != deserializedLock.LockState {
t.Errorf("Serialized and deserialized lock instances do not match. LockState: Expected '%v', Got '%v'", lockInstance.LockState, deserializedLock.LockState)
}

if lockInstance.LockExpiry.Unix() != deserializedLock.LockExpiry.Unix() {
t.Errorf("Serialized and deserialized lock instances do not match. LockExpiry: Expected '%v', Got '%v'", lockInstance.LockExpiry, deserializedLock.LockExpiry)
}
}

func TestLockSerializationWithLongOwnerFails(t *testing.T) {
lockInstance := Lock{
LockOwner: strings.Repeat("x", 300), // Creates a string longer than LockOwnerMaxSize
LockState: lockstate.Unlocked,
LockExpiry: time.Now().Add(10 * time.Minute),
}

_, err := lockInstance.ToBytes()
if err == nil || err.Error() != "lock owner exceeds max size of 256 bytes" {
t.Errorf("Expected error due to long owner name, got %v", err)
}
}

func TestLockSerializationWithEmptyFields(t *testing.T) {
// Setup
lockInstance := Lock{
LockOwner: "",
LockState: lockstate.Unlocked,
LockExpiry: time.Time{},
}

serialized, err := lockInstance.ToBytes()
if err != nil {
t.Fatalf("Failed to serialize lock: %v", err)
}

deserializedLock := Lock{}
err = deserializedLock.FromBytes(serialized)
if err != nil {
t.Fatalf("Failed to deserialize lock: %v", err)
}

if lockInstance.LockOwner != deserializedLock.LockOwner {
t.Errorf("Serialized and deserialized lock instances do not match. LockOwner: Expected '%s', Got '%s'", lockInstance.LockOwner, deserializedLock.LockOwner)
}

if lockInstance.LockState != deserializedLock.LockState {
t.Errorf("Serialized and deserialized lock instances do not match. LockState: Expected '%v', Got '%v'", lockInstance.LockState, deserializedLock.LockState)
}

if lockInstance.LockExpiry.Unix() != deserializedLock.LockExpiry.Unix() {
t.Errorf("Serialized and deserialized lock instances do not match. LockExpiry: Expected '%v', Got '%v'", lockInstance.LockExpiry, deserializedLock.LockExpiry)
}
}
Loading
Loading