Skip to content

Commit

Permalink
Merge pull request #66 from cwdsuzhou/tensile-kube
Browse files Browse the repository at this point in the history
Change some funcs
  • Loading branch information
cwdsuzhou committed Dec 29, 2020
2 parents df3a580 + e66d06d commit 5ed39d0
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 172 deletions.
47 changes: 6 additions & 41 deletions cmd/provider/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"golang.org/x/time/rate"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"

Expand Down Expand Up @@ -89,8 +88,7 @@ func main() {
cfg.ConfigPath = o.KubeConfigPath
provider, err := k8sprovider.NewVirtualK8S(cfg, &cc, ignoreLabels, enableServiceAccount, o)
if err == nil {
go RunController(ctx, provider.GetMaster(),
provider.GetClient(), provider.GetNameSpaceLister(), cfg.NodeName, numberOfWorkers)
go RunController(ctx, provider, cfg.NodeName, numberOfWorkers)
}
return provider, err
}),
Expand All @@ -114,10 +112,10 @@ func main() {
}

// RunController starts controllers for objects needed to be synced
func RunController(ctx context.Context, master,
client kubernetes.Interface, nsLister corelisters.NamespaceLister, hostIP string,
func RunController(ctx context.Context, p *k8sprovider.VirtualK8S, hostIP string,
workers int) *controllers.ServiceController {

master := p.GetMaster()
client := p.GetClient()
masterInformer := kubeinformers.NewSharedInformerFactory(master, 0)
if masterInformer == nil {
return nil
Expand All @@ -136,10 +134,10 @@ func RunController(ctx context.Context, master,
}
switch c {
case "PVControllers":
pvCtrl := buildPVController(master, client, masterInformer, clientInformer, hostIP)
pvCtrl := controllers.NewPVController(master, client, masterInformer, clientInformer, hostIP)
runningControllers = append(runningControllers, pvCtrl)
case "ServiceControllers":
serviceCtrl := buildServiceController(master, client, masterInformer, clientInformer, nsLister)
serviceCtrl := controllers.NewServiceController(master, client, masterInformer, clientInformer, p.GetNameSpaceLister())
runningControllers = append(runningControllers, serviceCtrl)
default:
klog.Warningf("Skip: %v", c)
Expand All @@ -154,39 +152,6 @@ func RunController(ctx context.Context, master,
return nil
}

func buildServiceController(master, client kubernetes.Interface, masterInformer,
clientInformer kubeinformers.SharedInformerFactory,
nsLister corelisters.NamespaceLister) controllers.Controller {
// master
serviceInformer := masterInformer.Core().V1().Services()
endpointsInformer := masterInformer.Core().V1().Endpoints()
// client
clientServiceInformer := clientInformer.Core().V1().Services()
clientEndpointsInformer := clientInformer.Core().V1().Endpoints()

serviceRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 30*time.Second)
endpointsRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 30*time.Second)
return controllers.NewServiceController(master, client, serviceInformer, endpointsInformer,
clientServiceInformer, clientEndpointsInformer, nsLister, serviceRateLimiter, endpointsRateLimiter)
}

func buildPVController(master, client kubernetes.Interface, masterInformer,
clientInformer kubeinformers.SharedInformerFactory, hostIP string) controllers.Controller {

pvcInformer := masterInformer.Core().V1().PersistentVolumeClaims()
pvInformer := masterInformer.Core().V1().PersistentVolumes()

clientPVCInformer := clientInformer.Core().V1().PersistentVolumeClaims()
clientPVInformer := clientInformer.Core().V1().PersistentVolumes()

pvcRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 30*time.Second)
pvRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 30*time.Second)

return controllers.NewPVController(master, client, pvcInformer, pvInformer,
clientPVCInformer,
clientPVInformer, pvcRateLimiter, pvRateLimiter, hostIP)
}

func buildCommonControllers(client kubernetes.Interface, masterInformer,
clientInformer kubeinformers.SharedInformerFactory) controllers.Controller {

Expand Down
14 changes: 10 additions & 4 deletions pkg/controllers/pv_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
mergetypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
Expand Down Expand Up @@ -67,13 +67,19 @@ type PVController struct {

// NewPVController returns a new *PVController
func NewPVController(master kubernetes.Interface, client kubernetes.Interface,
pvcInformer coreinformers.PersistentVolumeClaimInformer, pvInformer coreinformers.PersistentVolumeInformer,
clientPVCInformer coreinformers.PersistentVolumeClaimInformer, clientPVInformer coreinformers.PersistentVolumeInformer,
pvcRateLimiter, pvRateLimiter workqueue.RateLimiter, hostIP string) Controller {
masterInformer, clientInformer informers.SharedInformerFactory, hostIP string) Controller {
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: master.CoreV1().Events(v1.NamespaceAll)})
var eventRecorder record.EventRecorder
eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "virtual-kubelet"})
pvcInformer := masterInformer.Core().V1().PersistentVolumeClaims()
pvInformer := masterInformer.Core().V1().PersistentVolumes()

clientPVCInformer := clientInformer.Core().V1().PersistentVolumeClaims()
clientPVInformer := clientInformer.Core().V1().PersistentVolumes()

pvcRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 30*time.Second)
pvRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 30*time.Second)
ctrl := &PVController{
master: master,
client: client,
Expand Down
16 changes: 11 additions & 5 deletions pkg/controllers/service_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
mergetypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
Expand Down Expand Up @@ -62,14 +62,20 @@ type ServiceController struct {

// NewServiceController returns a new *ServiceController
func NewServiceController(master kubernetes.Interface, client kubernetes.Interface,
serviceInformer coreinformers.ServiceInformer, endpointsInformer coreinformers.EndpointsInformer,
clientServiceInformer coreinformers.ServiceInformer, clientEndpointsInformer coreinformers.EndpointsInformer,
nsLister corelisters.NamespaceLister,
serviceRateLimiter, endpointsRateLimiter workqueue.RateLimiter) Controller {
masterInformer, clientInformer informers.SharedInformerFactory,
nsLister corelisters.NamespaceLister) Controller {
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: master.CoreV1().Events(v1.NamespaceAll)})
var eventRecorder record.EventRecorder
eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "virtual-kubelet"})
serviceRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 30*time.Second)
endpointsRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 30*time.Second)
// master
serviceInformer := masterInformer.Core().V1().Services()
endpointsInformer := masterInformer.Core().V1().Endpoints()
// client
clientServiceInformer := clientInformer.Core().V1().Services()
clientEndpointsInformer := clientInformer.Core().V1().Endpoints()
ctrl := &ServiceController{
master: master,
client: client,
Expand Down
14 changes: 1 addition & 13 deletions pkg/controllers/service_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/controller"
)

Expand Down Expand Up @@ -424,19 +423,8 @@ func newServiceController() *svcTestBase {
clientInformer := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
masterInformer := informers.NewSharedInformerFactory(master, controller.NoResyncPeriodFunc())

serviceInformer := masterInformer.Core().V1().Services()
endPointsInformer := masterInformer.Core().V1().Endpoints()

nsLister := masterInformer.Core().V1().Namespaces().Lister()

clientServiceInformer := clientInformer.Core().V1().Services()
clientEndPointsInformer := clientInformer.Core().V1().Endpoints()

serviceRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 30*time.Second)
endPointsRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 30*time.Second)

controller := NewServiceController(master, client, serviceInformer, endPointsInformer, clientServiceInformer,
clientEndPointsInformer, nsLister, serviceRateLimiter, endPointsRateLimiter)
controller := NewServiceController(master, client, masterInformer, clientInformer, nsLister)
c := controller.(*ServiceController)
return &svcTestBase{
c: c,
Expand Down
1 change: 0 additions & 1 deletion pkg/descheduler/descheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
podEvictor := evictions.NewPodEvictor(
rs.Client,
evictionPolicyGroupVersion,
rs.DryRun,
rs.MaxNoOfPodsToEvictPerNode,
nodes, unschedulableCache,
)
Expand Down
3 changes: 1 addition & 2 deletions pkg/descheduler/evictions/evictions.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ type PodEvictor struct {
func NewPodEvictor(
client clientset.Interface,
policyGroupVersion string,
dryRun bool,
maxPodsToEvict int,
nodes []*v1.Node, unschedulableCache *util.UnschedulableCache) *PodEvictor {
var nodePodCount = make(nodePodEvictedCount)
Expand All @@ -90,7 +89,7 @@ func NewPodEvictor(
return &PodEvictor{
client: client,
policyGroupVersion: policyGroupVersion,
dryRun: dryRun,
dryRun: false,
maxPodsToEvict: maxPodsToEvict,
nodepodCount: nodePodCount,
nodeNum: virtualCount,
Expand Down
Loading

0 comments on commit 5ed39d0

Please sign in to comment.