Skip to content

Commit

Permalink
[v1alpha2] Add PDB of TFReplicaSet for gang scheduling by kube-arbitr…
Browse files Browse the repository at this point in the history
…ator (#717)

Signed-off-by: Pengyu Chen <chenpengyu@caicloud.io>
  • Loading branch information
codeflitting authored and k8s-ci-robot committed Jul 9, 2018
1 parent c3214b2 commit 7ebe995
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 16 deletions.
12 changes: 7 additions & 5 deletions cmd/tf-operator.v2/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ import (

// ServerOption is the main context object for the controller manager.
type ServerOption struct {
Kubeconfig string
MasterURL string
Threadiness int
PrintVersion bool
JSONLogFormat bool
Kubeconfig string
MasterURL string
Threadiness int
PrintVersion bool
JSONLogFormat bool
EnableGangScheduling bool
}

// NewServerOption creates a new CMServer with a default config.
Expand All @@ -46,4 +47,5 @@ func (s *ServerOption) AddFlags(fs *flag.FlagSet) {

fs.BoolVar(&s.JSONLogFormat, "json-log-format", false,
"Set true to use json style log format. Set false to use plaintext style log format")
fs.BoolVar(&s.EnableGangScheduling, "enable-gang-scheduling", false, "Set true to enable gang scheduling by kube-arbitrator.")
}
2 changes: 1 addition & 1 deletion cmd/tf-operator.v2/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func Run(opt *options.ServerOption) error {
unstructuredInformer := controller.NewUnstructuredTFJobInformer(kcfg)

// Create tf controller.
tc := controller.NewTFJobController(unstructuredInformer, kubeClientSet, tfJobClientSet, kubeInformerFactory, tfJobInformerFactory)
tc := controller.NewTFJobController(unstructuredInformer, kubeClientSet, tfJobClientSet, kubeInformerFactory, tfJobInformerFactory, *opt)

// Start informer goroutines.
go kubeInformerFactory.Start(stopCh)
Expand Down
67 changes: 66 additions & 1 deletion pkg/controller.v2/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@
package controller

import (
"errors"
"fmt"
"time"

log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
"k8s.io/api/policy/v1beta1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers"
Expand All @@ -34,13 +38,15 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/controller"

"github.com/kubeflow/tf-operator/cmd/tf-operator.v2/app/options"
tfv1alpha2 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1alpha2"
tfjobclientset "github.com/kubeflow/tf-operator/pkg/client/clientset/versioned"
tfjobscheme "github.com/kubeflow/tf-operator/pkg/client/clientset/versioned/scheme"
tfjobinformers "github.com/kubeflow/tf-operator/pkg/client/informers/externalversions"
tfjobinformersv1alpha2 "github.com/kubeflow/tf-operator/pkg/client/informers/externalversions/kubeflow/v1alpha2"
tfjoblisters "github.com/kubeflow/tf-operator/pkg/client/listers/kubeflow/v1alpha2"
"github.com/kubeflow/tf-operator/pkg/control"
"github.com/kubeflow/tf-operator/pkg/generator"
)

const (
Expand All @@ -63,6 +69,7 @@ var (
// DefaultTFJobControllerConfiguration is the suggested tf-operator configuration for production.
DefaultTFJobControllerConfiguration = TFJobControllerConfiguration{
ReconcilerSyncLoopPeriod: metav1.Duration{Duration: 15 * time.Second},
enableGangScheduling: false,
}
)

Expand All @@ -76,6 +83,9 @@ type TFJobControllerConfiguration struct {
// and up to 5 minutes to reduce idle loop.
// e.g. 15s, 30s, 60s, 120s...
ReconcilerSyncLoopPeriod metav1.Duration

// Enable gang scheduling by kube-arbitrator
enableGangScheduling bool
}

// TFJobController is the type for TFJob Controller, which manages
Expand Down Expand Up @@ -162,7 +172,8 @@ func NewTFJobController(
kubeInformerFactory kubeinformers.SharedInformerFactory,
// This field is not used now but we keep it since it will be used
// after we support CRD validation.
tfJobInformerFactory tfjobinformers.SharedInformerFactory) *TFJobController {
tfJobInformerFactory tfjobinformers.SharedInformerFactory,
option options.ServerOption) *TFJobController {

tfjobscheme.AddToScheme(scheme.Scheme)

Expand All @@ -184,6 +195,10 @@ func NewTFJobController(

// Create new TFJobController.
tc := &TFJobController{
config: TFJobControllerConfiguration{
ReconcilerSyncLoopPeriod: metav1.Duration{Duration: 15 * time.Second},
enableGangScheduling: option.EnableGangScheduling,
},
podControl: realPodControl,
serviceControl: realServiceControl,
kubeClientSet: kubeClientSet,
Expand Down Expand Up @@ -362,6 +377,13 @@ func (tc *TFJobController) syncTFJob(key string) (bool, error) {
tfjob := sharedTFJob.DeepCopy()
tfjobNeedsSync := tc.satisfiedExpectations(tfjob)

if tc.config.enableGangScheduling {
_, err := tc.syncPdb(tfjob)
if err != nil {
log.Warnf("Sync pdb %v: %v", tfjob.Name, err)
}
}

// Set default for the new tfjob.
scheme.Scheme.Default(tfjob)

Expand All @@ -377,6 +399,49 @@ func (tc *TFJobController) syncTFJob(key string) (bool, error) {
return true, err
}

// SyncPdb will create a PDB for gang scheduling by kube-arbitrator.
func (tc *TFJobController) syncPdb(tfjob *tfv1alpha2.TFJob) (*v1beta1.PodDisruptionBudget, error) {
// Sum of tfjob's replicas
tfjobReplicas := int32(0)
for _, r := range tfjob.Spec.TFReplicaSpecs {
tfjobReplicas += *r.Replicas
}

// Non-distributed training is not required gang scheduling
if tfjobReplicas < 2 {
return nil, nil
}

// Check the pdb exist or not
pdb, err := tc.kubeClientSet.PolicyV1beta1().PodDisruptionBudgets(tfjob.Namespace).Get(tfjob.Name, metav1.GetOptions{})
if err == nil || !k8serrors.IsNotFound(err) {
if err == nil {
err = errors.New(string(metav1.StatusReasonAlreadyExists))
}
return pdb, err
}

// Create pdb for gang scheduling by kube-arbitrator
minAvailable := intstr.FromInt(int(tfjobReplicas))
createPdb := &v1beta1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: tfjob.Name,
OwnerReferences: []metav1.OwnerReference{
*generator.GenOwnerReference(tfjob),
},
},
Spec: v1beta1.PodDisruptionBudgetSpec{
MinAvailable: &minAvailable,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"tf_job_name": tfjob.Name,
},
},
},
}
return tc.kubeClientSet.PolicyV1beta1().PodDisruptionBudgets(tfjob.Namespace).Create(createPdb)
}

// reconcileTFJobs checks and updates replicas for each given TFReplicaSpec.
// It will requeue the tfjob in case of an error while creating/deleting pods/services.
func (tc *TFJobController) reconcileTFJobs(tfjob *tfv1alpha2.TFJob) error {
Expand Down
5 changes: 3 additions & 2 deletions pkg/controller.v2/controller_pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/kubernetes/pkg/controller"

"github.com/kubeflow/tf-operator/cmd/tf-operator.v2/app/options"
tfv1alpha2 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1alpha2"
tfjobclientset "github.com/kubeflow/tf-operator/pkg/client/clientset/versioned"
"github.com/kubeflow/tf-operator/pkg/generator"
Expand All @@ -45,7 +46,7 @@ func TestAddPod(t *testing.T) {
},
}
tfJobClientSet := tfjobclientset.NewForConfigOrDie(config)
ctr, _, _ := newTFJobController(config, kubeClientSet, tfJobClientSet, controller.NoResyncPeriodFunc)
ctr, _, _ := newTFJobController(config, kubeClientSet, tfJobClientSet, controller.NoResyncPeriodFunc, options.ServerOption{})
ctr.tfJobInformerSynced = testutil.AlwaysReady
ctr.podInformerSynced = testutil.AlwaysReady
ctr.serviceInformerSynced = testutil.AlwaysReady
Expand Down Expand Up @@ -202,7 +203,7 @@ func TestExitCode(t *testing.T) {
},
}
tfJobClientSet := tfjobclientset.NewForConfigOrDie(config)
ctr, kubeInformerFactory, _ := newTFJobController(config, kubeClientSet, tfJobClientSet, controller.NoResyncPeriodFunc)
ctr, kubeInformerFactory, _ := newTFJobController(config, kubeClientSet, tfJobClientSet, controller.NoResyncPeriodFunc, options.ServerOption{})
fakePodControl := &controller.FakePodControl{}
ctr.podControl = fakePodControl
ctr.tfJobInformerSynced = testutil.AlwaysReady
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller.v2/controller_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/kubernetes/pkg/controller"

"github.com/kubeflow/tf-operator/cmd/tf-operator.v2/app/options"
tfv1alpha2 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1alpha2"
tfjobclientset "github.com/kubeflow/tf-operator/pkg/client/clientset/versioned"
"github.com/kubeflow/tf-operator/pkg/generator"
Expand All @@ -45,7 +46,7 @@ func TestAddService(t *testing.T) {
},
}
tfJobClientSet := tfjobclientset.NewForConfigOrDie(config)
ctr, _, _ := newTFJobController(config, kubeClientSet, tfJobClientSet, controller.NoResyncPeriodFunc)
ctr, _, _ := newTFJobController(config, kubeClientSet, tfJobClientSet, controller.NoResyncPeriodFunc, options.ServerOption{})
ctr.tfJobInformerSynced = testutil.AlwaysReady
ctr.podInformerSynced = testutil.AlwaysReady
ctr.serviceInformerSynced = testutil.AlwaysReady
Expand Down
88 changes: 85 additions & 3 deletions pkg/controller.v2/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package controller

import (
"reflect"
"testing"
"time"

Expand All @@ -25,12 +26,18 @@ import (
"k8s.io/client-go/rest"
"k8s.io/kubernetes/pkg/controller"

"github.com/golang/protobuf/proto"
"github.com/kubeflow/tf-operator/cmd/tf-operator.v2/app/options"
tfv1alpha2 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1alpha2"
tfjobclientset "github.com/kubeflow/tf-operator/pkg/client/clientset/versioned"
tfjobinformers "github.com/kubeflow/tf-operator/pkg/client/informers/externalversions"
"github.com/kubeflow/tf-operator/pkg/control"
"github.com/kubeflow/tf-operator/pkg/generator"
"github.com/kubeflow/tf-operator/pkg/util/testutil"
"k8s.io/api/policy/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes/fake"
)

var (
Expand All @@ -43,6 +50,7 @@ func newTFJobController(
kubeClientSet kubeclientset.Interface,
tfJobClientSet tfjobclientset.Interface,
resyncPeriod controller.ResyncPeriodFunc,
option options.ServerOption,
) (
*TFJobController,
kubeinformers.SharedInformerFactory, tfjobinformers.SharedInformerFactory,
Expand All @@ -52,7 +60,7 @@ func newTFJobController(

tfJobInformer := NewUnstructuredTFJobInformer(config)

ctr := NewTFJobController(tfJobInformer, kubeClientSet, tfJobClientSet, kubeInformerFactory, tfJobInformerFactory)
ctr := NewTFJobController(tfJobInformer, kubeClientSet, tfJobClientSet, kubeInformerFactory, tfJobInformerFactory, option)
ctr.podControl = &controller.FakePodControl{}
ctr.serviceControl = &control.FakeServiceControl{}
return ctr, kubeInformerFactory, tfJobInformerFactory
Expand Down Expand Up @@ -213,8 +221,9 @@ func TestNormalPath(t *testing.T) {
GroupVersion: &tfv1alpha2.SchemeGroupVersion,
},
}
option := options.ServerOption{}
tfJobClientSet := tfjobclientset.NewForConfigOrDie(config)
ctr, kubeInformerFactory, _ := newTFJobController(config, kubeClientSet, tfJobClientSet, controller.NoResyncPeriodFunc)
ctr, kubeInformerFactory, _ := newTFJobController(config, kubeClientSet, tfJobClientSet, controller.NoResyncPeriodFunc, option)
ctr.tfJobInformerSynced = testutil.AlwaysReady
ctr.podInformerSynced = testutil.AlwaysReady
ctr.serviceInformerSynced = testutil.AlwaysReady
Expand Down Expand Up @@ -344,7 +353,7 @@ func TestRun(t *testing.T) {
},
}
tfJobClientSet := tfjobclientset.NewForConfigOrDie(config)
ctr, _, _ := newTFJobController(config, kubeClientSet, tfJobClientSet, controller.NoResyncPeriodFunc)
ctr, _, _ := newTFJobController(config, kubeClientSet, tfJobClientSet, controller.NoResyncPeriodFunc, options.ServerOption{})
ctr.tfJobInformerSynced = testutil.AlwaysReady
ctr.podInformerSynced = testutil.AlwaysReady
ctr.serviceInformerSynced = testutil.AlwaysReady
Expand All @@ -362,3 +371,76 @@ func TestRun(t *testing.T) {
t.Errorf("Failed to run: %v", err)
}
}

func TestSyncPdb(t *testing.T) {
config := &rest.Config{
Host: "",
ContentConfig: rest.ContentConfig{
GroupVersion: &tfv1alpha2.SchemeGroupVersion,
},
}
tfJobClientSet := tfjobclientset.NewForConfigOrDie(config)
kubeClientSet := fake.NewSimpleClientset()
option := options.ServerOption{
EnableGangScheduling: true,
}
ctr, _, _ := newTFJobController(config, kubeClientSet, tfJobClientSet, controller.NoResyncPeriodFunc, option)

type testCase struct {
tfJob *tfv1alpha2.TFJob
expectPdb *v1beta1.PodDisruptionBudget
}

minAvailable2 := intstr.FromInt(2)
testCases := []testCase{
{
tfJob: &tfv1alpha2.TFJob{
ObjectMeta: metav1.ObjectMeta{
Name: "test-sync-pdb",
},
Spec: tfv1alpha2.TFJobSpec{
TFReplicaSpecs: map[tfv1alpha2.TFReplicaType]*tfv1alpha2.TFReplicaSpec{
tfv1alpha2.TFReplicaTypeWorker: &tfv1alpha2.TFReplicaSpec{
Replicas: proto.Int32(1),
},
},
},
},
expectPdb: nil,
},
{
tfJob: &tfv1alpha2.TFJob{
ObjectMeta: metav1.ObjectMeta{
Name: "test-sync-pdb",
},
Spec: tfv1alpha2.TFJobSpec{
TFReplicaSpecs: map[tfv1alpha2.TFReplicaType]*tfv1alpha2.TFReplicaSpec{
tfv1alpha2.TFReplicaTypeWorker: &tfv1alpha2.TFReplicaSpec{
Replicas: proto.Int32(2),
},
},
},
},
expectPdb: &v1beta1.PodDisruptionBudget{
Spec: v1beta1.PodDisruptionBudgetSpec{
MinAvailable: &minAvailable2,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"tf_job_name": "test-sync-pdb",
},
},
},
},
},
}
for _, c := range testCases {
pdb, _ := ctr.syncPdb(c.tfJob)
if pdb == nil && c.expectPdb != nil {
t.Errorf("Got nil, want %v", c.expectPdb.Spec)
}

if pdb != nil && !reflect.DeepEqual(c.expectPdb.Spec, pdb.Spec) {
t.Errorf("Got %+v, want %+v", pdb.Spec, c.expectPdb.Spec)
}
}
}
7 changes: 4 additions & 3 deletions pkg/controller.v2/controller_tfjob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/controller"

"github.com/kubeflow/tf-operator/cmd/tf-operator.v2/app/options"
tfv1alpha2 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1alpha2"
tfjobclientset "github.com/kubeflow/tf-operator/pkg/client/clientset/versioned"
"github.com/kubeflow/tf-operator/pkg/control"
Expand All @@ -46,7 +47,7 @@ func TestAddTFJob(t *testing.T) {
},
}
tfJobClientSet := tfjobclientset.NewForConfigOrDie(config)
ctr, _, _ := newTFJobController(config, kubeClientSet, tfJobClientSet, controller.NoResyncPeriodFunc)
ctr, _, _ := newTFJobController(config, kubeClientSet, tfJobClientSet, controller.NoResyncPeriodFunc, options.ServerOption{})
ctr.tfJobInformerSynced = testutil.AlwaysReady
ctr.podInformerSynced = testutil.AlwaysReady
ctr.serviceInformerSynced = testutil.AlwaysReady
Expand Down Expand Up @@ -102,7 +103,7 @@ func TestCopyLabelsAndAnnotation(t *testing.T) {
},
}
tfJobClientSet := tfjobclientset.NewForConfigOrDie(config)
ctr, _, _ := newTFJobController(config, kubeClientSet, tfJobClientSet, controller.NoResyncPeriodFunc)
ctr, _, _ := newTFJobController(config, kubeClientSet, tfJobClientSet, controller.NoResyncPeriodFunc, options.ServerOption{})
fakePodControl := &controller.FakePodControl{}
ctr.podControl = fakePodControl
ctr.tfJobInformerSynced = testutil.AlwaysReady
Expand Down Expand Up @@ -281,7 +282,7 @@ func TestDeletePodsAndServices(t *testing.T) {
},
}
tfJobClientSet := tfjobclientset.NewForConfigOrDie(config)
ctr, kubeInformerFactory, _ := newTFJobController(config, kubeClientSet, tfJobClientSet, controller.NoResyncPeriodFunc)
ctr, kubeInformerFactory, _ := newTFJobController(config, kubeClientSet, tfJobClientSet, controller.NoResyncPeriodFunc, options.ServerOption{})
fakePodControl := &controller.FakePodControl{}
ctr.podControl = fakePodControl
fakeServiceControl := &control.FakeServiceControl{}
Expand Down

0 comments on commit 7ebe995

Please sign in to comment.