Skip to content

Commit

Permalink
K8 array resource manager (flyteorg#120)
Browse files Browse the repository at this point in the history
* Support for remote endpoint
* Unit tests for resource manager and k8s array.
* Few minor fixes and updates.
* Remove owner refs for remote execution.
* Removed project/namespace limit for number of pods.
* abort then finalize
* rm unnecessary code & update state transition
* Adding unit tests, remote endpoint and fixes


Co-authored-by: Miguel Toledo <mtoledo@lyft.com>
  • Loading branch information
anandswaminathan and migueltol22 authored Oct 13, 2020
1 parent 0d28888 commit f6148f4
Show file tree
Hide file tree
Showing 15 changed files with 857 additions and 156 deletions.
2 changes: 1 addition & 1 deletion boilerplate/lyft/golang_support_tools/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
package tools

import (
_ "github.com/alvaroloes/enumer"
_ "github.com/golangci/golangci-lint/cmd/golangci-lint"
_ "github.com/lyft/flytestdlib/cli/pflags"
_ "github.com/vektra/mockery/cmd/mockery"
_ "github.com/alvaroloes/enumer"
)
4 changes: 0 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ require (
github.com/go-test/deep v1.0.5
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.3.5
github.com/googleapis/gnostic v0.4.1 // indirect
github.com/hashicorp/golang-lru v0.5.4
github.com/kubeflow/pytorch-operator v0.6.0
github.com/kubeflow/tf-operator v0.5.3
Expand All @@ -27,19 +26,16 @@ require (
github.com/spf13/cobra v0.0.6
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.6.1
go.opencensus.io v0.22.3 // indirect
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e
golang.org/x/tools v0.0.0-20200304193943-95d2e580d8eb
google.golang.org/grpc v1.28.0
gopkg.in/yaml.v2 v2.2.8
gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c // indirect
gotest.tools v2.2.0+incompatible
k8s.io/api v0.17.3
k8s.io/apimachinery v0.17.3
k8s.io/client-go v11.0.1-0.20190918222721-c0e3722d5cf0+incompatible
k8s.io/klog v1.0.0
sigs.k8s.io/controller-runtime v0.5.1
sigs.k8s.io/yaml v1.2.0 // indirect
)

// Pin the version of client-go to something that's compatible with katrogan's fork of api and apimachinery
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/ernesto-jimenez/gogen v0.0.0-20180125220232-d7d4131e6607 h1:cTavhURetDkezJCvxFggiyLeP40Mrk/TtVg2+ycw1Es=
github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5/go.mod h1:a2zkGnVExMxdzMo3M0Hi/3sEU+cWnZpSni0O6/Yb/P0=
github.com/ernesto-jimenez/gogen v0.0.0-20180125220232-d7d4131e6607/go.mod h1:Cg4fM0vhYWOZdgM7RIOSTRNIc8/VT7CXClC3Ni86lu4=
github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
Expand All @@ -164,6 +165,7 @@ github.com/evanphx/json-patch v4.5.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLi
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/fatih/structtag v1.2.0 h1:/OdNE99OxoI/PqaW/SuSK9uxxT3f/tcSZgon/ssNSx4=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.8-0.20191012010759-4bf2d1fec783/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
Expand Down Expand Up @@ -382,6 +384,10 @@ github.com/lyft/flyteidl v0.18.9/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/
github.com/lyft/flyteplugins v0.5.1/go.mod h1:8zhqFG9BzbHNQGEXzGYltTJLD+KTmQZkanxXgeFI25c=
github.com/lyft/flytepropeller v0.4.2/go.mod h1:TIiWv/ZP1KOI0mqeUbiMqSn2XuY8O8kn8fQc5tWcaLA=
github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.2 h1:bY6Y+Fg6Jdc7zY4GAYuR7t2hjWwynIdmRvtLcRNaGnw=
github.com/lyft/flytestdlib v0.3.2/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.3 h1:MkWXPkwQinh6MR3Yf5siZhmRSt9r4YmsF+5kvVVVedE=
github.com/lyft/flytestdlib v0.3.3/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.9 h1:NaKp9xkeWWwhVvqTOcR/FqlASy1N2gu/kN7PVe4S7YI=
github.com/lyft/flytestdlib v0.3.9/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/spark-on-k8s-operator v0.1.3 h1:rmke8lR2Oy8mvKXRhloKuEu7fgGuXepDxiBNiorVUFI=
Expand Down
3 changes: 2 additions & 1 deletion go/tasks/aws/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ package aws
import (
"time"

pluginsConfig "github.com/lyft/flyteplugins/go/tasks/config"
"github.com/lyft/flytestdlib/config"

pluginsConfig "github.com/lyft/flyteplugins/go/tasks/config"
)

//go:generate pflags Config --default-var defaultConfig
Expand Down
9 changes: 6 additions & 3 deletions go/tasks/pluginmachinery/core/mocks/resource_negotiator.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions go/tasks/pluginmachinery/core/phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ func (p Phase) IsSuccess() bool {
return p == PhaseSuccess
}

func (p Phase) IsWaitingForResources() bool {
return p == PhaseWaitingForResources
}

type TaskInfo struct {
// log information for the task execution
Logs []*core.TaskLog
Expand Down
13 changes: 10 additions & 3 deletions go/tasks/plugins/array/core/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ func SummaryToPhase(ctx context.Context, minSuccesses int64, summary arraystatus
totalSuccesses := int64(0)
totalFailures := int64(0)
totalRunning := int64(0)
totalWaitingForResources := int64(0)
for phase, count := range summary {
totalCount += count
if phase.IsTerminal() {
Expand All @@ -238,6 +239,8 @@ func SummaryToPhase(ctx context.Context, minSuccesses int64, summary arraystatus
// TODO: preferable to auto-combine to array tasks for now.
totalFailures += count
}
} else if phase.IsWaitingForResources() {
totalWaitingForResources += count
} else {
totalRunning += count
}
Expand All @@ -249,12 +252,16 @@ func SummaryToPhase(ctx context.Context, minSuccesses int64, summary arraystatus
}

// No chance to reach the required success numbers.
if totalRunning+totalSuccesses < minSuccesses {
logger.Infof(ctx, "Array failed early because totalRunning[%v] + totalSuccesses[%v] < minSuccesses[%v]",
totalRunning, totalSuccesses, minSuccesses)
if totalRunning+totalSuccesses+totalWaitingForResources < minSuccesses {
logger.Infof(ctx, "Array failed early because total failures > minSuccesses[%v]. Snapshot totalRunning[%v] + totalSuccesses[%v] + totalWaitingForResource[%v]",
totalRunning, totalSuccesses, totalWaitingForResources, minSuccesses)
return PhaseWriteToDiscoveryThenFail
}

if totalWaitingForResources > 0 {
logger.Infof(ctx, "Array is still running and waiting for resources totalWaitingForResources[%v]", totalWaitingForResources)
return PhaseWaitingForResources
}
if totalSuccesses >= minSuccesses && totalRunning == 0 {
logger.Infof(ctx, "Array succeeded because totalSuccesses[%v] >= minSuccesses[%v]", totalSuccesses, minSuccesses)
return PhaseWriteToDiscovery
Expand Down
83 changes: 80 additions & 3 deletions go/tasks/plugins/array/k8s/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@
package k8s

import (
"fmt"
"io/ioutil"

"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
restclient "k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"

pluginsConfig "github.com/lyft/flyteplugins/go/tasks/config"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/workqueue"
"github.com/lyft/flytestdlib/config"
)

//go:generate pflags Config --default-var=defaultConfig
Expand All @@ -31,14 +37,80 @@ var (
},
}

configSection = config.MustRegisterSection(configSectionKey, defaultConfig)
configSection = pluginsConfig.MustRegisterSubSection(configSectionKey, defaultConfig)
)

type ResourceConfig struct {
PrimaryLabel string `json:"primaryLabel" pflag:",PrimaryLabel of a given service cluster"`
Limit int `json:"limit" pflag:",Resource quota (in the number of outstanding requests) for the cluster"`
}

type ClusterConfig struct {
Name string `json:"name" pflag:",Friendly name of the remote cluster"`
Endpoint string `json:"endpoint" pflag:", Remote K8s cluster endpoint"`
Auth Auth `json:"auth" pflag:"-, Auth setting for the cluster"`
Enabled bool `json:"enabled" pflag:", Boolean flag to enable or disable"`
}

type Auth struct {
Type string `json:"type" pflag:", Authentication type"`
TokenPath string `json:"tokenPath" pflag:", Token path"`
CertPath string `json:"certPath" pflag:", Certificate path"`
}

func (auth Auth) GetCA() ([]byte, error) {
cert, err := ioutil.ReadFile(auth.CertPath)
if err != nil {
return nil, errors.Wrap(err, "failed to read k8s CA cert from configured path")
}
return cert, nil
}

func (auth Auth) GetToken() (string, error) {
token, err := ioutil.ReadFile(auth.TokenPath)
if err != nil {
return "", errors.Wrap(err, "failed to read k8s bearer token from configured path")
}
return string(token), nil
}

// TODO: Move logic to flytestdlib
// Reads secret values from paths specified in the config to initialize a Kubernetes rest client Config.
func RemoteClusterConfig(host string, auth Auth) (*restclient.Config, error) {
tokenString, err := auth.GetToken()
if err != nil {
return nil, errors.New(fmt.Sprintf("Failed to get auth token: %+v", err))
}

caCert, err := auth.GetCA()
if err != nil {
return nil, errors.New(fmt.Sprintf("Failed to get auth CA: %+v", err))
}

tlsClientConfig := restclient.TLSClientConfig{}
tlsClientConfig.CAData = caCert
return &restclient.Config{
Host: host,
TLSClientConfig: tlsClientConfig,
BearerToken: tokenString,
}, nil
}

func GetK8sClient(config ClusterConfig) (client.Client, error) {
kubeConf, err := RemoteClusterConfig(config.Endpoint, config.Auth)
if err != nil {
return nil, err
}
return client.New(kubeConf, client.Options{})
}

// Defines custom config for K8s Array plugin
type Config struct {
DefaultScheduler string `json:"scheduler" pflag:",Decides the scheduler to use when launching array-pods."`
MaxErrorStringLength int `json:"maxErrLength" pflag:",Determines the maximum length of the error string returned for the array."`
MaxErrorStringLength int `json:"maxErrorLength" pflag:",Determines the maximum length of the error string returned for the array."`
MaxArrayJobSize int64 `json:"maxArrayJobSize" pflag:",Maximum size of array job."`
ResourceConfig ResourceConfig `json:"resourceConfig" pflag:"-,ResourceConfiguration to limit number of resources used by k8s-array."`
RemoteClusterConfig ClusterConfig `json:"remoteClusterConfig" pflag:"-,Configuration of remote K8s cluster for array jobs"`
NodeSelector map[string]string `json:"node-selector" pflag:"-,Defines a set of node selector labels to add to the pod."`
Tolerations []v1.Toleration `json:"tolerations" pflag:"-,Tolerations to be applied for k8s-array pods"`
OutputAssembler workqueue.Config
Expand All @@ -48,3 +120,8 @@ type Config struct {
func GetConfig() *Config {
return configSection.GetConfig().(*Config)
}

func IsResourceConfigSet(resourceConfig ResourceConfig) bool {
emptyResouceConfig := ResourceConfig{}
return resourceConfig != emptyResouceConfig
}
8 changes: 8 additions & 0 deletions go/tasks/plugins/array/k8s/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit f6148f4

Please sign in to comment.