Skip to content

Commit

Permalink
Support CRD for stage
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Jul 3, 2023
1 parent c0e6786 commit d7ee2e1
Show file tree
Hide file tree
Showing 21 changed files with 330 additions and 151 deletions.
35 changes: 8 additions & 27 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ jobs:
BUILDER=${{ matrix.builder }} make build-cluster-image
test-kwok:
strategy:
fail-fast: false
matrix:
case:
- kwok
- kwok-with-cni
continue-on-error: false
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
Expand All @@ -92,33 +99,7 @@ jobs:
- name: Test Workable
shell: bash
run: |
./hack/e2e-test.sh kwok/kwok
test-kwok-with-cni:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: "1.20"

- name: Install Kind
shell: bash
run: |
./hack/requirements.sh kind
kind version
- name: Install kubectl
shell: bash
run: |
./hack/requirements.sh kubectl
kubectl version || :
- name: Test Workable
shell: bash
run: |
./hack/e2e-test.sh kwok-with-cni/kwok-with-cni
./hack/e2e-test.sh ${{ matrix.case }}/${{ matrix.case }}
test-kwokctl:
# https://docs.github.com/en/actions/using-jobs/using-a-matrix-for-your-jobs
Expand Down
1 change: 1 addition & 0 deletions kustomize/kwok-with-cni/deployment-patch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ spec:
- --node-ip=$(POD_IP)
- --node-port=10247
- --node-lease-duration-seconds=40
- --enable-crd=Stage,Attach,Exec,PortForward,Logs,ClusterAttach,ClusterExec,ClusterPortForward,ClusterLogs
- --experimental-enable-cni=true
volumeMounts:
- name: etc-cni
Expand Down
1 change: 1 addition & 0 deletions kustomize/kwok/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ spec:
- --node-port=10247
- --cidr=10.0.0.1/24
- --node-lease-duration-seconds=40
- --enable-crd=Stage,Attach,Exec,PortForward,Logs,ClusterAttach,ClusterExec,ClusterPortForward,ClusterLogs
env:
- name: POD_IP
valueFrom:
Expand Down
61 changes: 61 additions & 0 deletions pkg/config/resources/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package resources

import (
"sync"
)

type cacheGetter[O any] struct {
getter Getter[O]

currentVer string
data O

mut sync.RWMutex
}

func withCache[O any](getter Getter[O]) Getter[O] {
return &cacheGetter[O]{getter: getter}
}

func (g *cacheGetter[O]) Get() O {
g.mut.RLock()
latestVer := g.getter.Version()
if g.currentVer == latestVer {
data := g.data
g.mut.RUnlock()
return data
}
g.mut.RUnlock()

g.mut.Lock()
defer g.mut.Unlock()
if g.currentVer == latestVer {
data := g.data
return data
}

data := g.getter.Get()
g.data = data
g.currentVer = latestVer
return data
}

func (g *cacheGetter[O]) Version() string {
return g.getter.Version()
}
41 changes: 13 additions & 28 deletions pkg/config/resources/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package resources

import (
"context"
"sync"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -39,10 +38,18 @@ type ConvertFunc[O any, T runtime.Object, S ~[]T] func(objs S) O

// NewDynamicGetter returns a new Getter that returns the latest list of resources.
func NewDynamicGetter[O any, T runtime.Object, L runtime.Object](syncer Syncer[T, L], convertFunc ConvertFunc[O, T, []T]) DynamicGetter[O] {
return &dynamicGetter[O, T, L]{
getter := &dynamicGetter[O, T, L]{
syncer: syncer,
convertFunc: convertFunc,
}

return struct {
Getter[O]
Starter
}{
Getter: withCache[O](getter),
Starter: getter,
}
}

type dynamicGetter[O any, T runtime.Object, L runtime.Object] struct {
Expand All @@ -52,11 +59,6 @@ type dynamicGetter[O any, T runtime.Object, L runtime.Object] struct {

store cache.Store
controller cache.Controller

currentVer string
data O

mut sync.RWMutex
}

func (c *dynamicGetter[O, T, L]) Start(ctx context.Context) error {
Expand All @@ -82,33 +84,16 @@ func (c *dynamicGetter[O, T, L]) Start(ctx context.Context) error {
}

func (c *dynamicGetter[O, T, L]) Get() O {
latestVer := c.controller.LastSyncResourceVersion()

c.mut.RLock()
if latestVer == c.currentVer {
data := c.data
c.mut.RUnlock()
return data
}
c.mut.RUnlock()
return c.updateAndReturn(latestVer)
}

func (c *dynamicGetter[O, T, L]) updateAndReturn(latestVer string) O {
c.mut.Lock()
defer c.mut.Unlock()
if latestVer == c.currentVer {
return c.data
}

list := c.store.List()
currentList := make([]T, 0, len(list))
for _, obj := range list {
currentList = append(currentList, obj.(T))
}

data := c.convertFunc(currentList)
c.data = data
c.currentVer = latestVer
return data
}

func (c *dynamicGetter[O, T, L]) Version() string {
return c.controller.LastSyncResourceVersion()
}
35 changes: 35 additions & 0 deletions pkg/config/resources/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package resources

type filterGetter[O any, T any] struct {
getter Getter[T]
filterFunc func(T) O
}

// NewFilter returns a new Getter that returns the given list.
func NewFilter[O any, T any](getter Getter[T], filterFunc func(T) O) Getter[O] {
return withCache[O](&filterGetter[O, T]{getter: getter, filterFunc: filterFunc})
}

func (f *filterGetter[O, T]) Get() O {
return f.filterFunc(f.getter.Get())
}

func (f *filterGetter[O, T]) Version() string {
return f.getter.Version()
}
5 changes: 4 additions & 1 deletion pkg/config/resources/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ limitations under the License.

package resources

import "context"
import (
"context"
)

// Getter is an interface for getting resources.
type Getter[O any] interface {
Get() O
Version() string
}

// DynamicGetter is an interface for getting resources.
Expand Down
4 changes: 4 additions & 0 deletions pkg/config/resources/static.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,7 @@ func NewStaticGetter[T any](data T) Getter[T] {
func (s *staticGetter[T]) Get() T {
return s.data
}

func (s *staticGetter[T]) Version() string {
return ""
}
39 changes: 25 additions & 14 deletions pkg/kwok/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/utils/clock"

"sigs.k8s.io/kwok/pkg/apis/internalversion"
"sigs.k8s.io/kwok/pkg/apis/v1alpha1"
"sigs.k8s.io/kwok/pkg/config"
"sigs.k8s.io/kwok/pkg/kwok/controllers"
"sigs.k8s.io/kwok/pkg/kwok/server"
Expand Down Expand Up @@ -84,6 +85,7 @@ func NewCommand(ctx context.Context) *cobra.Command {
cmd.Flags().StringVar(&flags.Master, "master", flags.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig).")
cmd.Flags().StringVar(&flags.Options.ServerAddress, "server-address", flags.Options.ServerAddress, "Address to expose the server on")
cmd.Flags().UintVar(&flags.Options.NodeLeaseDurationSeconds, "node-lease-duration-seconds", flags.Options.NodeLeaseDurationSeconds, "Duration of node lease seconds")
cmd.Flags().StringArrayVar(&flags.Options.EnableCRDs, "enable-crd", flags.Options.EnableCRDs, "List of CRDs to enable")

cmd.Flags().BoolVar(&flags.Options.EnableCNI, "experimental-enable-cni", flags.Options.EnableCNI, "Experimental support for getting pod ip from CNI, for CNI-related components, Only works with Linux")
if config.GOOS != "linux" {
Expand Down Expand Up @@ -145,26 +147,34 @@ func runE(ctx context.Context, flags *flagpole) error {
}

stagesData := config.FilterWithTypeFromContext[*internalversion.Stage](ctx)
var nodeStages []*internalversion.Stage
var podStages []*internalversion.Stage

nodeStages := filterStages(stagesData, "v1", "Node")
if len(nodeStages) == 0 {
nodeStages, err = controllers.NewStagesFromYaml([]byte(stages.DefaultNodeStages))
if err != nil {
return err
if slices.Contains(flags.Options.EnableCRDs, v1alpha1.StageKind) {
if len(stagesData) != 0 {
return fmt.Errorf("stage already exists, cannot watch CRD")
}
if flags.Options.NodeLeaseDurationSeconds == 0 {
nodeHeartbeatStages, err := controllers.NewStagesFromYaml([]byte(stages.DefaultNodeHeartbeatStages))
} else {
nodeStages = filterStages(stagesData, "v1", "Node")
if len(nodeStages) == 0 {
nodeStages, err = controllers.NewStagesFromYaml([]byte(stages.DefaultNodeStages))
if err != nil {
return err
}
nodeStages = append(nodeStages, nodeHeartbeatStages...)
if flags.Options.NodeLeaseDurationSeconds == 0 {
nodeHeartbeatStages, err := controllers.NewStagesFromYaml([]byte(stages.DefaultNodeHeartbeatStages))
if err != nil {
return err
}
nodeStages = append(nodeStages, nodeHeartbeatStages...)
}
}
}
podStages := filterStages(stagesData, "v1", "Pod")
if len(podStages) == 0 {
podStages, err = controllers.NewStagesFromYaml([]byte(stages.DefaultPodStages))
if err != nil {
return err
podStages = filterStages(stagesData, "v1", "Pod")
if len(podStages) == 0 {
podStages, err = controllers.NewStagesFromYaml([]byte(stages.DefaultPodStages))
if err != nil {
return err
}
}
}

Expand All @@ -179,6 +189,7 @@ func runE(ctx context.Context, flags *flagpole) error {
ctr, err := controllers.NewController(controllers.Config{
Clock: clock.RealClock{},
TypedClient: typedClient,
TypedKwokClient: typedKwokClient,
EnableCNI: flags.Options.EnableCNI,
ManageAllNodes: flags.Options.ManageAllNodes,
ManageNodesWithAnnotationSelector: flags.Options.ManageNodesWithAnnotationSelector,
Expand Down
Loading

0 comments on commit d7ee2e1

Please sign in to comment.