Skip to content

Commit

Permalink
DRA: don't query claims from API server
Browse files Browse the repository at this point in the history
When a pod is force-deleted UnprepareResources fails to get a claim
from an API server.
PrepareResources should cache claim info required by the
UnprepareResources so that UnprepareResources would get it from
the cache instead of querying API server.
  • Loading branch information
bart0sh committed Jul 18, 2023
1 parent b3fc4cf commit f6431c6
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 34 deletions.
17 changes: 10 additions & 7 deletions pkg/kubelet/cm/dra/claiminfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"sync"

resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/kubelet/cm/dra/state"
Expand Down Expand Up @@ -80,14 +81,15 @@ type claimInfoCache struct {
claimInfo map[string]*ClaimInfo
}

func newClaimInfo(driverName, className string, claimUID types.UID, claimName, namespace string, podUIDs sets.Set[string]) *ClaimInfo {
func newClaimInfo(driverName, className string, claimUID types.UID, claimName, namespace string, podUIDs sets.Set[string], resourceHandles []resourcev1alpha2.ResourceHandle) *ClaimInfo {
claimInfoState := state.ClaimInfoState{
DriverName: driverName,
ClassName: className,
ClaimUID: claimUID,
ClaimName: claimName,
Namespace: namespace,
PodUIDs: podUIDs,
DriverName: driverName,
ClassName: className,
ClaimUID: claimUID,
ClaimName: claimName,
Namespace: namespace,
PodUIDs: podUIDs,
ResourceHandles: resourceHandles,
}
claimInfo := ClaimInfo{
ClaimInfoState: claimInfoState,
Expand Down Expand Up @@ -120,6 +122,7 @@ func newClaimInfoCache(stateDir, checkpointName string) (*claimInfoCache, error)
entry.ClaimName,
entry.Namespace,
entry.PodUIDs,
entry.ResourceHandles,
)
for pluginName, cdiDevices := range entry.CDIDevices {
err := info.addCDIDevices(pluginName, cdiDevices)
Expand Down
29 changes: 6 additions & 23 deletions pkg/kubelet/cm/dra/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
resourceClaim.Name,
resourceClaim.Namespace,
sets.New(string(pod.UID)),
resourceHandles,
)

// Loop through all plugins and prepare for calling NodePrepareResources.
Expand Down Expand Up @@ -342,26 +343,8 @@ func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
continue
}

// Query claim object from the API server
resourceClaim, err := m.kubeClient.ResourceV1alpha2().ResourceClaims(pod.Namespace).Get(
context.TODO(),
*claimName,
metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to fetch ResourceClaim %s referenced by pod %s: %+v", *claimName, pod.Name, err)
}

// Grab the allocation.resourceHandles. If there are no
// allocation.resourceHandles, create a single resourceHandle with no
// content. This will trigger processing of this claim by a single
// kubelet plugin whose name matches resourceClaim.Status.DriverName.
resourceHandles := resourceClaim.Status.Allocation.ResourceHandles
if len(resourceHandles) == 0 {
resourceHandles = make([]resourcev1alpha2.ResourceHandle, 1)
}

// Loop through all plugins and prepare for calling NodeUnprepareResources.
for _, resourceHandle := range resourceHandles {
for _, resourceHandle := range claimInfo.ResourceHandles {
// If no DriverName is provided in the resourceHandle, we
// use the DriverName from the status
pluginName := resourceHandle.DriverName
Expand All @@ -370,14 +353,14 @@ func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
}

claim := &drapb.Claim{
Namespace: resourceClaim.Namespace,
Uid: string(resourceClaim.UID),
Name: resourceClaim.Name,
Namespace: claimInfo.Namespace,
Uid: string(claimInfo.ClaimUID),
Name: claimInfo.ClaimName,
ResourceHandle: resourceHandle.Data,
}
batches[pluginName] = append(batches[pluginName], claim)
}
claimInfos[resourceClaim.UID] = claimInfo
claimInfos[claimInfo.ClaimUID] = claimInfo
}

// Call NodeUnprepareResources for all claims in each batch.
Expand Down
54 changes: 54 additions & 0 deletions pkg/kubelet/cm/dra/state/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@ package state

import (
"encoding/json"
"fmt"
"hash/fnv"
"strings"

"k8s.io/apimachinery/pkg/util/dump"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
)

var _ checkpointmanager.Checkpoint = &DRAManagerCheckpoint{}
Expand All @@ -34,9 +39,20 @@ type DRAManagerCheckpoint struct {
Checksum checksum.Checksum `json:"checksum"`
}

// DraManagerCheckpoint struct is an old implementation of the DraManagerCheckpoint
type DRAManagerCheckpointWithoutResourceHandles struct {
Version string `json:"version"`
Entries ClaimInfoStateListWithoutResourceHandles `json:"entries,omitempty"`
Checksum checksum.Checksum `json:"checksum"`
}

// List of claim info to store in checkpoint
type ClaimInfoStateList []ClaimInfoState

// List of claim info to store in checkpoint
// TODO: remove in Beta
type ClaimInfoStateListWithoutResourceHandles []ClaimInfoStateWithoutResourceHandles

// NewDRAManagerCheckpoint returns an instance of Checkpoint
func NewDRAManagerCheckpoint() *DRAManagerCheckpoint {
return &DRAManagerCheckpoint{
Expand All @@ -63,6 +79,44 @@ func (dc *DRAManagerCheckpoint) VerifyChecksum() error {
ck := dc.Checksum
dc.Checksum = 0
err := ck.Verify(dc)
if err == errors.ErrCorruptCheckpoint {
// Verify with old structs without ResourceHandles field
// TODO: remove in Beta
err = verifyChecksumWithoutResourceHandles(dc, ck)
}
dc.Checksum = ck
return err
}

// verifyChecksumWithoutResourceHandles is a helper function that verifies checksum of the
// checkpoint in the old format, without ResourceHandles field.
// TODO: remove in Beta.
func verifyChecksumWithoutResourceHandles(dc *DRAManagerCheckpoint, checkSum checksum.Checksum) error {
entries := ClaimInfoStateListWithoutResourceHandles{}
for _, entry := range dc.Entries {
entries = append(entries, ClaimInfoStateWithoutResourceHandles{
DriverName: entry.DriverName,
ClassName: entry.ClassName,
ClaimUID: entry.ClaimUID,
ClaimName: entry.ClaimName,
Namespace: entry.Namespace,
PodUIDs: entry.PodUIDs,
CDIDevices: entry.CDIDevices,
})
}
oldcheckpoint := &DRAManagerCheckpointWithoutResourceHandles{
Version: checkpointVersion,
Entries: entries,
Checksum: 0,
}
// Calculate checksum for old checkpoint
object := dump.ForHash(oldcheckpoint)
object = strings.Replace(object, "DRAManagerCheckpointWithoutResourceHandles", "DRAManagerCheckpoint", 1)
object = strings.Replace(object, "ClaimInfoStateListWithoutResourceHandles", "ClaimInfoStateList", 1)
hash := fnv.New32a()
fmt.Fprintf(hash, "%v", object)
if checkSum != checksum.Checksum(hash.Sum32()) {
return errors.ErrCorruptCheckpoint
}
return nil
}
30 changes: 30 additions & 0 deletions pkg/kubelet/cm/dra/state/state_checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"sync"

resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
Expand Down Expand Up @@ -54,6 +55,35 @@ type ClaimInfoState struct {
// PodUIDs is a set of pod UIDs that reference a resource
PodUIDs sets.Set[string]

// ResourceHandles is a list of opaque resource data for processing by a specific kubelet plugin
ResourceHandles []resourcev1alpha2.ResourceHandle

// CDIDevices is a map of DriverName --> CDI devices returned by the
// GRPC API call NodePrepareResource
CDIDevices map[string][]string
}

// ClaimInfoStateWithoutResourceHandles is an old implementation of the ClaimInfoState
// TODO: remove in Beta
type ClaimInfoStateWithoutResourceHandles struct {
// Name of the DRA driver
DriverName string

// ClassName is a resource class of the claim
ClassName string

// ClaimUID is an UID of the resource claim
ClaimUID types.UID

// ClaimName is a name of the resource claim
ClaimName string

// Namespace is a claim namespace
Namespace string

// PodUIDs is a set of pod UIDs that reference a resource
PodUIDs sets.Set[string]

// CDIDevices is a map of DriverName --> CDI devices returned by the
// GRPC API call NodePrepareResource
CDIDevices map[string][]string
Expand Down
65 changes: 61 additions & 4 deletions pkg/kubelet/cm/dra/state/state_checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ package state

import (
"os"
"path"
"strings"
"testing"

"github.com/stretchr/testify/assert"

resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
testutil "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state/testing"
Expand Down Expand Up @@ -50,7 +52,7 @@ func TestCheckpointGetOrCreate(t *testing.T) {
},
{
"Restore checkpoint - single claim",
`{"version":"v1","entries":[{"DriverName":"test-driver.cdi.k8s.io","ClassName":"class-name","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"CDIDevices":{"test-driver.cdi.k8s.io":["example.com/example=cdi-example"]}}],"checksum":153446146}`,
`{"version":"v1","entries":[{"DriverName":"test-driver.cdi.k8s.io","ClassName":"class-name","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"ResourceHandles":[{"driverName":"test-driver.cdi.k8s.io","data":"{\"a\": \"b\"}"}],"CDIDevices":{"test-driver.cdi.k8s.io":["example.com/example=cdi-example"]}}],"checksum":4194867564}`,
"",
[]ClaimInfoState{
{
Expand All @@ -60,6 +62,12 @@ func TestCheckpointGetOrCreate(t *testing.T) {
ClaimName: "example",
Namespace: "default",
PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"),
ResourceHandles: []resourcev1alpha2.ResourceHandle{
{
DriverName: "test-driver.cdi.k8s.io",
Data: `{"a": "b"}`,
},
},
CDIDevices: map[string][]string{
"test-driver.cdi.k8s.io": {"example.com/example=cdi-example"},
},
Expand All @@ -68,7 +76,7 @@ func TestCheckpointGetOrCreate(t *testing.T) {
},
{
"Restore checkpoint - single claim - multiple devices",
`{"version":"v1","entries":[{"DriverName":"meta-test-driver.cdi.k8s.io","ClassName":"class-name","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"CDIDevices":{"test-driver-1.cdi.k8s.io":["example-1.com/example-1=cdi-example-1"],"test-driver-2.cdi.k8s.io":["example-2.com/example-2=cdi-example-2"]}}],"checksum":1363630443}`,
`{"version":"v1","entries":[{"DriverName":"meta-test-driver.cdi.k8s.io","ClassName":"class-name","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"ResourceHandles":[{"driverName":"test-driver-1.cdi.k8s.io","data":"{\"a\": \"b\"}"},{"driverName":"test-driver-2.cdi.k8s.io","data":"{\"c\": \"d\"}"}],"CDIDevices":{"test-driver-1.cdi.k8s.io":["example-1.com/example-1=cdi-example-1"],"test-driver-2.cdi.k8s.io":["example-2.com/example-2=cdi-example-2"]}}],"checksum":360176657}`,
"",
[]ClaimInfoState{
{
Expand All @@ -78,6 +86,16 @@ func TestCheckpointGetOrCreate(t *testing.T) {
ClaimName: "example",
Namespace: "default",
PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"),
ResourceHandles: []resourcev1alpha2.ResourceHandle{
{
DriverName: "test-driver-1.cdi.k8s.io",
Data: `{"a": "b"}`,
},
{
DriverName: "test-driver-2.cdi.k8s.io",
Data: `{"c": "d"}`,
},
},
CDIDevices: map[string][]string{
"test-driver-1.cdi.k8s.io": {"example-1.com/example-1=cdi-example-1"},
"test-driver-2.cdi.k8s.io": {"example-2.com/example-2=cdi-example-2"},
Expand All @@ -87,7 +105,7 @@ func TestCheckpointGetOrCreate(t *testing.T) {
},
{
"Restore checkpoint - multiple claims",
`{"version":"v1","entries":[{"DriverName":"test-driver.cdi.k8s.io","ClassName":"class-name-1","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example-1","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"CDIDevices":{"test-driver.cdi.k8s.io":["example.com/example=cdi-example-1"]}},{"DriverName":"test-driver.cdi.k8s.io","ClaimUID":"4cf8db2d-06c0-7d70-1a51-e59b25b2c16c","ClassName":"class-name-2","ClaimName":"example-2","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"CDIDevices":{"test-driver.cdi.k8s.io":["example.com/example=cdi-example-2"]}}],"checksum":1978566460}`,
`{"version":"v1","entries":[{"DriverName":"test-driver.cdi.k8s.io","ClassName":"class-name-1","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example-1","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"ResourceHandles":[{"driverName":"test-driver.cdi.k8s.io","data":"{\"a\": \"b\"}"}],"CDIDevices":{"test-driver.cdi.k8s.io":["example.com/example=cdi-example-1"]}},{"DriverName":"test-driver.cdi.k8s.io","ClassName":"class-name-2","ClaimUID":"4cf8db2d-06c0-7d70-1a51-e59b25b2c16c","ClaimName":"example-2","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"ResourceHandles":[{"driverName":"test-driver.cdi.k8s.io","data":"{\"c\": \"d\"}"}],"CDIDevices":{"test-driver.cdi.k8s.io":["example.com/example=cdi-example-2"]}}],"checksum":103176902}`,
"",
[]ClaimInfoState{
{
Expand All @@ -97,6 +115,12 @@ func TestCheckpointGetOrCreate(t *testing.T) {
ClaimName: "example-1",
Namespace: "default",
PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"),
ResourceHandles: []resourcev1alpha2.ResourceHandle{
{
DriverName: "test-driver.cdi.k8s.io",
Data: `{"a": "b"}`,
},
},
CDIDevices: map[string][]string{
"test-driver.cdi.k8s.io": {"example.com/example=cdi-example-1"},
},
Expand All @@ -108,6 +132,12 @@ func TestCheckpointGetOrCreate(t *testing.T) {
ClaimName: "example-2",
Namespace: "default",
PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"),
ResourceHandles: []resourcev1alpha2.ResourceHandle{
{
DriverName: "test-driver.cdi.k8s.io",
Data: `{"c": "d"}`,
},
},
CDIDevices: map[string][]string{
"test-driver.cdi.k8s.io": {"example.com/example=cdi-example-2"},
},
Expand Down Expand Up @@ -153,6 +183,7 @@ func TestCheckpointGetOrCreate(t *testing.T) {
var state ClaimInfoStateList

checkpointState, err := NewCheckpointState(testingDir, testingCheckpoint)

if err == nil {
state, err = checkpointState.GetOrCreate()
}
Expand All @@ -176,12 +207,18 @@ func TestCheckpointStateStore(t *testing.T) {
ClaimName: "example",
Namespace: "default",
PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"),
ResourceHandles: []resourcev1alpha2.ResourceHandle{
{
DriverName: "test-driver.cdi.k8s.io",
Data: `{"a": "b"}`,
},
},
CDIDevices: map[string][]string{
"test-driver.cdi.k8s.io": {"example.com/example=cdi-example"},
},
}

expectedCheckpoint := `{"version":"v1","entries":[{"DriverName":"test-driver.cdi.k8s.io","ClassName":"class-name","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"CDIDevices":{"test-driver.cdi.k8s.io":["example.com/example=cdi-example"]}}],"checksum":153446146}`
expectedCheckpoint := `{"version":"v1","entries":[{"DriverName":"test-driver.cdi.k8s.io","ClassName":"class-name","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"ResourceHandles":[{"driverName":"test-driver.cdi.k8s.io","data":"{\"a\": \"b\"}"}],"CDIDevices":{"test-driver.cdi.k8s.io":["example.com/example=cdi-example"]}}],"checksum":4194867564}`

// Should return an error, stateDir cannot be an empty string
if _, err := NewCheckpointState("", testingCheckpoint); err == nil {
Expand Down Expand Up @@ -214,3 +251,23 @@ func TestCheckpointStateStore(t *testing.T) {
t.Fatal("expected error but got nil")
}
}

func TestOldCheckpointRestore(t *testing.T) {
testingDir := t.TempDir()
cpm, err := checkpointmanager.NewCheckpointManager(testingDir)
assert.NoError(t, err, "could not create testing checkpoint manager")

oldCheckpointData := `{"version":"v1","entries":[{"DriverName":"test-driver.cdi.k8s.io","ClassName":"class-name","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"CDIDevices":{"test-driver.cdi.k8s.io":["example.com/example=cdi-example"]}}],"checksum":153446146}`
err = os.WriteFile(path.Join(testingDir, testingCheckpoint), []byte(oldCheckpointData), 0644)
assert.NoError(t, err, "could not store checkpoint data")

checkpoint := NewDRAManagerCheckpoint()
err = cpm.GetCheckpoint(testingCheckpoint, checkpoint)
assert.NoError(t, err, "could not restore checkpoint")

checkpointData, err := checkpoint.MarshalCheckpoint()
assert.NoError(t, err, "could not Marshal Checkpoint")

expectedData := `{"version":"v1","entries":[{"DriverName":"test-driver.cdi.k8s.io","ClassName":"class-name","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"ResourceHandles":null,"CDIDevices":{"test-driver.cdi.k8s.io":["example.com/example=cdi-example"]}}],"checksum":453625682}`
assert.Equal(t, expectedData, string(checkpointData), "expected ClaimInfoState does not equal to restored one")
}

0 comments on commit f6431c6

Please sign in to comment.