Feature/waitallprocesses config #1394

Merged
Changes from 3 commits
10 changes: 8 additions & 2 deletions cmd/metricscollector/v1beta1/file-metricscollector/main.go
@@ -107,7 +107,7 @@ var (
metricFilters = flag.String("f", "", "Metric filters")
pollInterval = flag.Duration("p", common.DefaultPollInterval, "Poll interval between running processes check")
timeout = flag.Duration("timeout", common.DefaultTimeout, "Timeout before invoke error during running processes check")
- waitAll = flag.Bool("w", common.DefaultWaitAll, "Whether wait for all other main process of container exiting")
+ waitAllProcesses = flag.String("w", common.DefaultWaitAllProcesses, "Whether wait for all other main process of container exiting")
Member

Do we need to keep this flag as a String, or can we use Bool?
I assume this flag can only be true or false.

Contributor Author

@robbertvdg Nov 18, 2020

I had problems parsing booleans when a user has not specified the field in the katib-config. Do you know of a good way to do it?
I could leave it as a string in the katib-config and parse it to a bool in GetMetricsCollectorConfigData. Would that be better?

Member

@robbertvdg The zero value of a bool in Go is false, so when the user doesn't specify a value in the config, we send "false" to the metrics collector.
Maybe we can define MetricsCollectorConfig like this:

type MetricsCollectorConfig struct {
	Image            string                      `json:"image"`
	ImagePullPolicy  corev1.PullPolicy           `json:"imagePullPolicy"`
	Resource         corev1.ResourceRequirements `json:"resources"`
	WaitAllProcesses *bool                      `json:"waitAllProcesses"`
}

In that case, you can set the -w flag only if WaitAllProcesses != nil. Otherwise, the flag is omitted and the metrics collector falls back to its default value.

In the Katib config, I was able to define a bool value like this:

 "StdOut": {
        "image": "docker.io/andreyvelichkevich/file-metrics-collector",
        "imagePullPolicy": "Always",
        "waitAllProcesses": false
      },
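
A minimal sketch of why the pointer type helps here, assuming a trimmed, hypothetical `collectorConfig` struct rather than the real `MetricsCollectorConfig`: with `*bool`, a field that is absent from the JSON decodes to `nil`, while an explicit `false` decodes to a non-nil pointer, so the two cases can be told apart.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// collectorConfig is a hypothetical, trimmed stand-in for MetricsCollectorConfig.
type collectorConfig struct {
	Image            string `json:"image"`
	WaitAllProcesses *bool  `json:"waitAllProcesses"`
}

// describe reports whether waitAllProcesses was present in the JSON and,
// if it was, which value it carried.
func describe(raw string) (set bool, value bool, err error) {
	var c collectorConfig
	if err = json.Unmarshal([]byte(raw), &c); err != nil {
		return false, false, err
	}
	if c.WaitAllProcesses == nil {
		// Field missing from the config: the caller should use its default.
		return false, false, nil
	}
	return true, *c.WaitAllProcesses, nil
}

func main() {
	inputs := []string{
		`{"image": "collector"}`,
		`{"image": "collector", "waitAllProcesses": false}`,
	}
	for _, raw := range inputs {
		set, value, err := describe(raw)
		fmt.Println(set, value, err)
	}
}
```

With a plain `bool` field both inputs would decode to `false`, which is the ambiguity raised earlier in this thread.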

Contributor Author

The problem lies with v1.Container.Args, which is a []string. Every container argument therefore has to be a string, which I think is not compatible with flag.Bool() (I tested it, and it doesn't work).
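
One likely explanation for the behaviour described above, sketched with the standard `flag` package: boolean flags only accept a value in the `-w=false` form, so a separate `"false"` argument after `-w` is treated as a positional argument and the flag itself is set to true. The `parseW` helper and the `demo` flag set name are hypothetical and exist only for this illustration.

```go
package main

import (
	"flag"
	"fmt"
)

// parseW parses args with a boolean -w flag that defaults to true and
// returns the flag value plus any leftover positional arguments.
func parseW(args []string) (bool, []string, error) {
	fs := flag.NewFlagSet("demo", flag.ContinueOnError)
	w := fs.Bool("w", true, "wait for all processes")
	err := fs.Parse(args)
	return *w, fs.Args(), err
}

func main() {
	// Two separate arguments, as they would appear in a container args list.
	w, rest, err := parseW([]string{"-w", "false"})
	fmt.Println(w, rest, err)

	// Single argument with an explicit value.
	w, rest, err = parseW([]string{"-w=false"})
	fmt.Println(w, rest, err)
}
```

In the first call the flag stays true and `"false"` is left over as a positional argument; only the second call yields false. Passing a single `-w=false` element would be an alternative to switching the flag to a string, though this PR takes the string route.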

Member

@robbertvdg Got it.
In that case, can we keep WaitAllProcesses as *bool in MetricsCollectorConfig and convert it to a string here:

args = append(args, "-w", metricsCollectorConfigData.WaitAllProcesses)

Then use the String type in the Metrics Collector, as you did, and convert it to Bool afterwards.

That way, we can avoid the situation where a user specifies wrong values in the Katib config.

What do you think @robbertvdg @gaocegege ?
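
The proposal above could look roughly like this. It is a sketch only: `appendWaitFlag` is a hypothetical helper, not a function from Katib, and it assumes the config field is a `*bool`.

```go
package main

import (
	"fmt"
	"strconv"
)

// appendWaitFlag adds "-w <value>" to args only when the config field was set.
// A nil pointer means the user did not specify waitAllProcesses, so the
// metrics collector is left to apply its own default.
func appendWaitFlag(args []string, waitAllProcesses *bool) []string {
	if waitAllProcesses == nil {
		return args
	}
	return append(args, "-w", strconv.FormatBool(*waitAllProcesses))
}

func main() {
	disabled := false
	fmt.Println(appendWaitFlag([]string{"-t", "trial"}, nil))
	fmt.Println(appendWaitFlag([]string{"-t", "trial"}, &disabled))
}
```

Because the value passes through `strconv.FormatBool`, the argument handed to the collector can only ever be `"true"` or `"false"`.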

Contributor Author

@andreyvelich those were my thoughts as well, will update PR soon.

stopRules stopRulesFlag
isEarlyStopped = false
)
@@ -353,10 +353,16 @@ func main() {
go printMetricsFile(*metricsFilePath)
}

+ waitAll, err := strconv.ParseBool(*waitAllProcesses)
Member

If we keep it as a Bool, we can skip the conversion.

+ if err != nil {
+ klog.Errorf("Cannot parse %s to bool, defaulting to waitAllProcesses=%s", *waitAllProcesses, common.DefaultWaitAllProcesses)
+ waitAll, _ = strconv.ParseBool(common.DefaultWaitAllProcesses)
+ }

wopts := common.WaitPidsOpts{
PollInterval: *pollInterval,
Timeout: *timeout,
- WaitAll: *waitAll,
+ WaitAll: waitAll,
CompletedMarkedDirPath: filepath.Dir(*metricsFilePath),
}
if err := common.WaitMainProcesses(wopts); err != nil {
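
For reference, the parse-with-fallback step in this hunk can be exercised in isolation. The sketch below is standalone: `parseWaitAll` and the local `defaultWaitAllProcesses` constant are hypothetical names that mirror the logic of the diff and are not part of the Katib code.

```go
package main

import (
	"fmt"
	"strconv"
)

// defaultWaitAllProcesses mirrors the string default introduced in this PR.
const defaultWaitAllProcesses = "true"

// parseWaitAll returns the parsed value and whether the default had to be used
// because raw was not a valid boolean string.
func parseWaitAll(raw string) (value bool, usedDefault bool) {
	parsed, err := strconv.ParseBool(raw)
	if err != nil {
		fallback, _ := strconv.ParseBool(defaultWaitAllProcesses)
		return fallback, true
	}
	return parsed, false
}

func main() {
	for _, raw := range []string{"false", "False", "0", "no", ""} {
		value, usedDefault := parseWaitAll(raw)
		fmt.Printf("%q -> value=%t usedDefault=%t\n", raw, value, usedDefault)
	}
}
```

`strconv.ParseBool` accepts `1`, `t`, `T`, `TRUE`, `true`, `True` and the matching false forms, so `"0"` parses as false here. The Python collector in this PR instead treats every value other than a case-insensitive `"false"` as true, so the two collectors may disagree on inputs such as `"0"`.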
5 changes: 3 additions & 2 deletions cmd/metricscollector/v1beta1/tfevent-metricscollector/main.py
@@ -24,7 +24,7 @@ def parse_options():
parser.add_argument("-f", "--metric_filters", type=str, default="")
parser.add_argument("-p", "--poll_interval", type=int, default=const.DEFAULT_POLL_INTERVAL)
parser.add_argument("-timeout", "--timeout", type=int, default=const.DEFAULT_TIMEOUT)
- parser.add_argument("-w", "--wait_all", type=bool, default=const.DEFAULT_WAIT_ALL)
+ parser.add_argument("-w", "--wait_all_processes", type=str, default=const.DEFAULT_WAIT_ALL)

opt = parser.parse_args()
return opt
@@ -38,6 +38,7 @@ def parse_options():
logger.addHandler(handler)
logger.propagate = False
opt = parse_options()
+ wait_all_processes = opt.wait_all_processes.lower() != "false"
db_manager_server = opt.db_manager_server_addr.split(':')
if len(db_manager_server) != 2:
raise Exception("Invalid Katib DB manager service address: %s" %
@@ -46,7 +47,7 @@ def parse_options():
WaitMainProcesses(
pool_interval=opt.poll_interval,
timout=opt.timeout,
- wait_all=opt.wait_all,
+ wait_all=wait_all_processes,
completed_marked_dir=opt.metrics_file_dir)

mc = MetricsCollector(opt.metric_names.split(';'))
2 changes: 1 addition & 1 deletion pkg/metricscollector/v1beta1/common/const.go
@@ -27,7 +27,7 @@ const (
// To run without timeout set value to 0
DefaultTimeout = 0
// DefaultWaitAll is the default value whether wait for all other main process of container exiting
- DefaultWaitAll = true
+ DefaultWaitAllProcesses = "true"
// TrainingCompleted is the job finished marker in $$$$.pid file when main training process is completed
TrainingCompleted = "completed"

2 changes: 1 addition & 1 deletion pkg/metricscollector/v1beta1/common/const.py
@@ -3,7 +3,7 @@
# Default value for timeout before invoke error during running processes check
DEFAULT_TIMEOUT = 0
# Default value whether wait for all other main process of container exiting
- DEFAULT_WAIT_ALL = True
+ DEFAULT_WAIT_ALL = "True"
# Default value for directory where TF event metrics are reported
DEFAULT_METRICS_FILE_DIR = "/log"
# Job finished marker in $$$$.pid file when main process is completed
7 changes: 4 additions & 3 deletions pkg/util/v1beta1/katibconfig/config.go
@@ -28,9 +28,10 @@ type SuggestionConfig struct {

// MetricsCollectorConfig is the JSON metrics collector structure in Katib config.
type MetricsCollectorConfig struct {
- Image string `json:"image"`
- ImagePullPolicy corev1.PullPolicy `json:"imagePullPolicy"`
- Resource corev1.ResourceRequirements `json:"resources"`
+ Image string `json:"image"`
Member

Should we add omitempty for the fields?

Member

I think we can add it to all fields except Image.

+ ImagePullPolicy corev1.PullPolicy `json:"imagePullPolicy"`
+ Resource corev1.ResourceRequirements `json:"resources"`
+ WaitAllProcesses string `json:"waitAllProcesses"`
Member

Same as above: can we keep it as a bool?

}

// EarlyStoppingConfig is the JSON early stopping structure in Katib config.
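
On the omitempty question raised in the review of this struct, here is a small sketch of what the tag changes when a config is encoded. Both struct types are hypothetical and trimmed, with a plain string standing in for `corev1.PullPolicy`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// withoutOmitEmpty encodes every field, even when it holds its zero value.
type withoutOmitEmpty struct {
	Image            string `json:"image"`
	ImagePullPolicy  string `json:"imagePullPolicy"`
	WaitAllProcesses *bool  `json:"waitAllProcesses"`
}

// withOmitEmpty drops empty strings and nil pointers from the output.
type withOmitEmpty struct {
	Image            string `json:"image"`
	ImagePullPolicy  string `json:"imagePullPolicy,omitempty"`
	WaitAllProcesses *bool  `json:"waitAllProcesses,omitempty"`
}

// encode marshals v to a JSON string, or returns the error text on failure.
func encode(v interface{}) string {
	out, err := json.Marshal(v)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(out)
}

func main() {
	fmt.Println(encode(withoutOmitEmpty{Image: "collector"}))
	fmt.Println(encode(withOmitEmpty{Image: "collector"}))
}
```

The tag only affects encoding, not decoding. It also has no effect on struct-valued fields, so a field like `Resource` would still be emitted with it.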
9 changes: 6 additions & 3 deletions pkg/webhook/v1beta1/pod/inject_webhook.go
@@ -204,15 +204,15 @@ func (s *sidecarInjector) getMetricsCollectorContainer(trial *trialsv1beta1.Tria
newRule := rule.Name + ";" + rule.Value + ";" + string(rule.Comparison) + ";" + strconv.Itoa(rule.StartStep)
earlyStoppingRules = append(earlyStoppingRules, newRule)
}
+ metricsCollectorConfigData, err := katibconfig.GetMetricsCollectorConfigData(mc.Collector.Kind, s.client)

- args, err := s.getMetricsCollectorArgs(trial, metricNames, mc, earlyStoppingRules)
+ args, err := s.getMetricsCollectorArgs(trial, metricNames, mc, metricsCollectorConfigData, earlyStoppingRules)
if err != nil {
return nil, err
}

sidecarContainerName := getSidecarContainerName(trial.Spec.MetricsCollector.Collector.Kind)

- metricsCollectorConfigData, err := katibconfig.GetMetricsCollectorConfigData(mc.Collector.Kind, s.client)
if err != nil {
return nil, err
}
@@ -287,14 +287,17 @@ func (s *sidecarInjector) getKatibJob(object *unstructured.Unstructured, namespa
return jobKind, jobName, nil
}

- func (s *sidecarInjector) getMetricsCollectorArgs(trial *trialsv1beta1.Trial, metricNames string, mc common.MetricsCollectorSpec, esRules []string) ([]string, error) {
+ func (s *sidecarInjector) getMetricsCollectorArgs(trial *trialsv1beta1.Trial, metricNames string, mc common.MetricsCollectorSpec, metricsCollectorConfigData katibconfig.MetricsCollectorConfig, esRules []string) ([]string, error) {
args := []string{"-t", trial.Name, "-m", metricNames, "-o-type", string(trial.Spec.Objective.Type), "-s-db", katibmanagerv1beta1.GetDBManagerAddr()}
if mountPath, _ := getMountPath(mc); mountPath != "" {
args = append(args, "-path", mountPath)
}
if mc.Source != nil && mc.Source.Filter != nil && len(mc.Source.Filter.MetricsFormat) > 0 {
args = append(args, "-f", strings.Join(mc.Source.Filter.MetricsFormat, ";"))
}
+ if metricsCollectorConfigData.WaitAllProcesses != "" {
+ args = append(args, "-w", metricsCollectorConfigData.WaitAllProcesses)
+ }
// Add stop rules and service endpoint for Early Stopping
if len(esRules) > 0 {
for _, rule := range esRules {
15 changes: 14 additions & 1 deletion pkg/webhook/v1beta1/pod/inject_webhook_test.go
@@ -3,6 +3,7 @@ package pod
import (
"context"
"fmt"
+ "github.com/kubeflow/katib/pkg/util/v1beta1/katibconfig"
"path/filepath"
"reflect"
"sync"
@@ -293,6 +294,7 @@ func TestGetMetricsCollectorArgs(t *testing.T) {
MetricNames string
MCSpec common.MetricsCollectorSpec
EarlyStoppingRules []string
+ KatibConfig katibconfig.MetricsCollectorConfig
ExpectedArgs []string
Name string
Err bool
@@ -305,12 +307,16 @@ func TestGetMetricsCollectorArgs(t *testing.T) {
Kind: common.StdOutCollector,
},
},
+ KatibConfig: katibconfig.MetricsCollectorConfig{
+ WaitAllProcesses: "false",
+ },
ExpectedArgs: []string{
"-t", testTrialName,
"-m", testMetricName,
"-o-type", string(testObjective),
"-s-db", katibDBAddress,
"-path", common.DefaultFilePath,
+ "-w", "false",
},
Name: "StdOut MC",
},
@@ -333,6 +339,7 @@ func TestGetMetricsCollectorArgs(t *testing.T) {
},
},
},
+ KatibConfig: katibconfig.MetricsCollectorConfig{},
ExpectedArgs: []string{
"-t", testTrialName,
"-m", testMetricName,
@@ -356,6 +363,7 @@ func TestGetMetricsCollectorArgs(t *testing.T) {
},
},
},
+ KatibConfig: katibconfig.MetricsCollectorConfig{},
ExpectedArgs: []string{
"-t", testTrialName,
"-m", testMetricName,
@@ -373,6 +381,7 @@ func TestGetMetricsCollectorArgs(t *testing.T) {
Kind: common.CustomCollector,
},
},
+ KatibConfig: katibconfig.MetricsCollectorConfig{},
ExpectedArgs: []string{
"-t", testTrialName,
"-m", testMetricName,
@@ -394,6 +403,7 @@ func TestGetMetricsCollectorArgs(t *testing.T) {
},
},
},
+ KatibConfig: katibconfig.MetricsCollectorConfig{},
ExpectedArgs: []string{
"-t", testTrialName,
"-m", testMetricName,
@@ -411,6 +421,7 @@ func TestGetMetricsCollectorArgs(t *testing.T) {
Kind: common.PrometheusMetricCollector,
},
},
+ KatibConfig: katibconfig.MetricsCollectorConfig{},
ExpectedArgs: []string{
"-t", testTrialName,
"-m", testMetricName,
@@ -428,6 +439,7 @@ func TestGetMetricsCollectorArgs(t *testing.T) {
},
},
EarlyStoppingRules: earlyStoppingRules,
+ KatibConfig: katibconfig.MetricsCollectorConfig{},
ExpectedArgs: []string{
"-t", testTrialName,
"-m", testMetricName,
@@ -452,6 +464,7 @@ func TestGetMetricsCollectorArgs(t *testing.T) {
},
},
EarlyStoppingRules: earlyStoppingRules,
+ KatibConfig: katibconfig.MetricsCollectorConfig{},
Name: "Trial with invalid Experiment label name. Suggestion is not created",
Err: true,
},
@@ -465,7 +478,7 @@ func TestGetMetricsCollectorArgs(t *testing.T) {
}, timeout).ShouldNot(gomega.HaveOccurred())

for _, tc := range testCases {
- args, err := si.getMetricsCollectorArgs(tc.Trial, tc.MetricNames, tc.MCSpec, tc.EarlyStoppingRules)
+ args, err := si.getMetricsCollectorArgs(tc.Trial, tc.MetricNames, tc.MCSpec, tc.KatibConfig, tc.EarlyStoppingRules)

if !tc.Err && err != nil {
t.Errorf("Case: %v failed. Expected nil, got %v", tc.Name, err)