Skip to content

Commit

Permalink
support JSON format logs in file-metrics-collector
Browse files Browse the repository at this point in the history
  • Loading branch information
tenzen-y committed Jan 1, 2022
1 parent 2a0b12e commit 1fb26bb
Show file tree
Hide file tree
Showing 27 changed files with 770 additions and 114 deletions.
179 changes: 117 additions & 62 deletions cmd/metricscollector/v1beta1/file-metricscollector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ package main

import (
"context"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -102,6 +104,7 @@ var (
earlyStopServiceAddr = flag.String("s-earlystop", "", "Katib Early Stopping service endpoint")
trialName = flag.String("t", "", "Trial Name")
metricsFilePath = flag.String("path", "", "Metrics File Path")
metricsFileFormat = flag.String("format", "", "Metrics File Format")
metricNames = flag.String("m", "", "Metric names")
objectiveType = flag.String("o-type", "", "Objective type")
metricFilters = flag.String("f", "", "Metric filters")
Expand Down Expand Up @@ -170,7 +173,10 @@ func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string) {
}

// Get list of regural expressions from filters.
metricRegList := filemc.GetFilterRegexpList(filters)
var metricRegList []*regexp.Regexp
if *metricsFileFormat == commonv1beta1.TextFormat.String() {
metricRegList = filemc.GetFilterRegexpList(filters)
}

// Start watch log lines.
t, _ := tail.TailFile(mFile, tail.Config{Follow: true})
Expand All @@ -179,78 +185,78 @@ func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string) {
// Print log line
klog.Info(logText)

// Check if log line contains metric from stop rules.
isRuleLine := false
for _, rule := range stopRules {
if strings.Contains(logText, rule.Name) {
isRuleLine = true
break
}
}
// If log line doesn't contain appropriate metric, continue track file.
if !isRuleLine {
continue
}

// If log line contains appropriate metric, find all submatches from metric filters.
for _, metricReg := range metricRegList {
matchStrings := metricReg.FindAllStringSubmatch(logText, -1)
for _, subMatchList := range matchStrings {
if len(subMatchList) < 3 {
continue
}
// Submatch must have metric name and float value
metricName := strings.TrimSpace(subMatchList[1])
metricValue, err := strconv.ParseFloat(strings.TrimSpace(subMatchList[2]), 64)
if err != nil {
klog.Fatalf("Unable to parse value %v to float for metric %v", metricValue, metricName)
switch *metricsFileFormat {
case commonv1beta1.TextFormat.String():
// Check if log line contains metric from stop rules.
isRuleLine := false
for _, rule := range stopRules {
if strings.Contains(logText, rule.Name) {
isRuleLine = true
break
}
}
// If log line doesn't contain appropriate metric, continue track file.
if !isRuleLine {
continue
}

// stopRules contains array of EarlyStoppingRules that has not been reached yet.
// After rule is reached we delete appropriate element from the array.
for idx, rule := range stopRules {
if metricName != rule.Name {
// If log line contains appropriate metric, find all submatches from metric filters.
for _, metricReg := range metricRegList {
matchStrings := metricReg.FindAllStringSubmatch(logText, -1)
for _, subMatchList := range matchStrings {
if len(subMatchList) < 3 {
continue
}

// Calculate optimalObjValue.
if metricName == objMetric {
if optimalObjValue == nil {
optimalObjValue = &metricValue
} else if objType == commonv1beta1.ObjectiveTypeMaximize && metricValue > *optimalObjValue {
optimalObjValue = &metricValue
} else if objType == commonv1beta1.ObjectiveTypeMinimize && metricValue < *optimalObjValue {
optimalObjValue = &metricValue
}
// Assign best optimal value to metric value.
metricValue = *optimalObjValue
// Submatch must have metric name and float value
metricName := strings.TrimSpace(subMatchList[1])
metricValue, err := strconv.ParseFloat(strings.TrimSpace(subMatchList[2]), 64)
if err != nil {
klog.Fatalf("Unable to parse value %v to float for metric %v", metricValue, metricName)
}

// Reduce steps if appropriate metric is reported.
// Once rest steps are empty we apply early stopping rule.
if _, ok := metricStartStep[metricName]; ok {
metricStartStep[metricName]--
if metricStartStep[metricName] != 0 {
// stopRules contains array of EarlyStoppingRules that has not been reached yet.
// After rule is reached we delete appropriate element from the array.
for idx, rule := range stopRules {
if metricName != rule.Name {
continue
}
stopRules = updateStopRules(objMetric, stopRules, optimalObjValue, metricValue, objType, metricStartStep, rule, idx)
}
}
}
case commonv1beta1.JsonFormat.String():
var logJsonObj map[string]interface{}
if err = json.Unmarshal([]byte(logText), &logJsonObj); err != nil {
klog.Fatalf("Failed to unmarshal logs in JSON format, log: %s, error: %v", logText, err)
}
// Check if log line contains metric from stop rules.
isRuleLine := false
for _, rule := range stopRules {
if _, exist := logJsonObj[rule.Name]; exist {
isRuleLine = true
break
}
}
// If log line doesn't contain appropriate metric, continue track file.
if !isRuleLine {
continue
}

ruleValue, err := strconv.ParseFloat(rule.Value, 64)
if err != nil {
klog.Fatalf("Unable to parse value %v to float for rule metric %v", rule.Value, rule.Name)
}

// Metric value can be equal, less or greater than stop rule.
// Deleting suitable stop rule from the array.
if rule.Comparison == commonv1beta1.ComparisonTypeEqual && metricValue == ruleValue {
stopRules = deleteStopRule(stopRules, idx)
} else if rule.Comparison == commonv1beta1.ComparisonTypeLess && metricValue < ruleValue {
stopRules = deleteStopRule(stopRules, idx)
} else if rule.Comparison == commonv1beta1.ComparisonTypeGreater && metricValue > ruleValue {
stopRules = deleteStopRule(stopRules, idx)
}
// stopRules contains array of EarlyStoppingRules that has not been reached yet.
// After rule is reached we delete appropriate element from the array.
for idx, rule := range stopRules {
value, exist := logJsonObj[rule.Name].(string)
if !exist {
continue
}
metricValue, err := strconv.ParseFloat(strings.TrimSpace(value), 64)
if err != nil {
klog.Fatalf("Unable to parse value %v to float for metric %v", metricValue, rule.Name)
}
stopRules = updateStopRules(objMetric, stopRules, optimalObjValue, metricValue, objType, metricStartStep, rule, idx)
}
default:
klog.Fatalf("format must be set %s or %s", commonv1beta1.TextFormat.String(), commonv1beta1.JsonFormat.String())
}

// If stopRules array is empty, Trial is early stopped.
Expand Down Expand Up @@ -326,6 +332,55 @@ func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string) {
}
}

func updateStopRules(
objMetric string,
stopRules []commonv1beta1.EarlyStoppingRule,
optimalObjValue *float64,
metricValue float64,
objType commonv1beta1.ObjectiveType,
metricStartStep map[string]int,
rule commonv1beta1.EarlyStoppingRule,
ruleIdx int,
) []commonv1beta1.EarlyStoppingRule {
// Calculate optimalObjValue.
if rule.Name == objMetric {
if optimalObjValue == nil {
optimalObjValue = &metricValue
} else if objType == commonv1beta1.ObjectiveTypeMaximize && metricValue > *optimalObjValue {
optimalObjValue = &metricValue
} else if objType == commonv1beta1.ObjectiveTypeMinimize && metricValue < *optimalObjValue {
optimalObjValue = &metricValue
}
// Assign best optimal value to metric value.
metricValue = *optimalObjValue
}

// Reduce steps if appropriate metric is reported.
// Once rest steps are empty we apply early stopping rule.
if _, ok := metricStartStep[rule.Name]; ok {
metricStartStep[rule.Name]--
if metricStartStep[rule.Name] != 0 {
return stopRules
}
}

ruleValue, err := strconv.ParseFloat(rule.Value, 64)
if err != nil {
klog.Fatalf("Unable to parse value %v to float for rule metric %v", rule.Value, rule.Name)
}

// Metric value can be equal, less or greater than stop rule.
// Deleting suitable stop rule from the array.
if rule.Comparison == commonv1beta1.ComparisonTypeEqual && metricValue == ruleValue {
return deleteStopRule(stopRules, ruleIdx)
} else if rule.Comparison == commonv1beta1.ComparisonTypeLess && metricValue < ruleValue {
return deleteStopRule(stopRules, ruleIdx)
} else if rule.Comparison == commonv1beta1.ComparisonTypeGreater && metricValue > ruleValue {
return deleteStopRule(stopRules, ruleIdx)
}
return stopRules
}

func deleteStopRule(stopRules []commonv1beta1.EarlyStoppingRule, idx int) []commonv1beta1.EarlyStoppingRule {
if idx >= len(stopRules) {
klog.Fatalf("Index %v out of range stopRules: %v", idx, stopRules)
Expand Down Expand Up @@ -383,7 +438,7 @@ func reportMetrics(filters []string) {
if len(*metricNames) != 0 {
metricList = strings.Split(*metricNames, ";")
}
olog, err := filemc.CollectObservationLog(*metricsFilePath, metricList, filters)
olog, err := filemc.CollectObservationLog(*metricsFilePath, metricList, filters, *metricsFileFormat)
if err != nil {
klog.Fatalf("Failed to collect logs: %v", err)
}
Expand Down
72 changes: 72 additions & 0 deletions examples/v1beta1/early-stopping/median-stop-with-json-format.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# This is example with median stopping early stopping rule with logs in JSON format.
# It has bad feasible space for learning rate to show more early stopped Trials.
apiVersion: kubeflow.org/v1beta1
kind: Experiment
metadata:
namespace: kubeflow
name: median-stop-with-json-format
spec:
objective:
type: maximize
goal: 0.99
objectiveMetricName: accuracy
additionalMetricNames:
- loss
metricsCollectorSpec:
source:
fileSystemPath:
path: "/katib/mnist.json"
kind: File
fileFormat: JSON
collector:
kind: File
algorithm:
algorithmName: random
earlyStopping:
algorithmName: medianstop
algorithmSettings:
- name: min_trials_required
value: "1"
- name: start_step
value: "2"
parallelTrialCount: 2
maxTrialCount: 15
maxFailedTrialCount: 3
parameters:
- name: lr
parameterType: double
feasibleSpace:
min: "0.01"
max: "0.5"
- name: num-epochs
parameterType: int
feasibleSpace:
min: "3"
max: "4"
trialTemplate:
retain: true
primaryContainerName: training-container
trialParameters:
- name: learningRate
description: Learning rate for the training model
reference: lr
- name: numberEpochs
description: Number of epochs to train the model
reference: num-epochs
trialSpec:
apiVersion: batch/v1
kind: Job
spec:
template:
spec:
containers:
- name: training-container
image: docker.io/kubeflowkatib/pytorch-mnist:latest
command:
- "python3"
- "/opt/pytorch-mnist/mnist.py"
- "--epochs=${trialParameters.numberEpochs}"
- "--log-path=/katib/mnist.json"
- "--lr=${trialParameters.learningRate}"
- "--logger=hypertune"
restartPolicy: Never
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
apiVersion: kubeflow.org/v1beta1
kind: Experiment
metadata:
namespace: kubeflow
name: file-metrics-collector-with-json-format
spec:
objective:
type: maximize
goal: 0.99
objectiveMetricName: accuracy
additionalMetricNames:
- loss
metricsCollectorSpec:
source:
fileSystemPath:
path: "/katib/mnist.json"
kind: File
fileFormat: JSON
collector:
kind: File
algorithm:
algorithmName: random
parallelTrialCount: 3
maxTrialCount: 12
maxFailedTrialCount: 3
parameters:
- name: lr
parameterType: double
feasibleSpace:
min: "0.01"
max: "0.03"
- name: momentum
parameterType: double
feasibleSpace:
min: "0.3"
max: "0.7"
trialTemplate:
primaryContainerName: training-container
trialParameters:
- name: learningRate
description: Learning rate for the training model
reference: lr
- name: momentum
description: Momentum for the training model
reference: momentum
trialSpec:
apiVersion: batch/v1
kind: Job
spec:
template:
spec:
containers:
- name: training-container
image: docker.io/kubeflowkatib/pytorch-mnist:latest
command:
- "python3"
- "/opt/pytorch-mnist/mnist.py"
- "--epochs=2"
- "--log-path=/katib/mnist.json"
- "--lr=${trialParameters.learningRate}"
- "--momentum=${trialParameters.momentum}"
- "--logger=hypertune"
restartPolicy: Never
1 change: 1 addition & 0 deletions examples/v1beta1/trial-images/pytorch-mnist/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ WORKDIR /opt/pytorch-mnist

# Add folder for the logs.
RUN mkdir /katib
RUN pip install --no-cache-dir -r requirements.txt

RUN chgrp -R 0 /opt/pytorch-mnist \
&& chmod -R g+rwX /opt/pytorch-mnist \
Expand Down
6 changes: 4 additions & 2 deletions examples/v1beta1/trial-images/pytorch-mnist/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,7 @@ to the file or printing to the StdOut. It uses convolutional neural network to
train the model.

Katib uses this training container in some Experiments, for instance in the
[file Metrics Collector example](../../metrics-collector/file-metrics-collector.yaml#L55-L64)
or in the [PyTorchJob example](../../kubeflow-training-operator/pytorchjob-mnist.yaml#L47-L54).
[file Metrics Collector example](../../metrics-collector/file-metrics-collector.yaml#L55-L64),
the [file Metrics Collector with logs in JSON format example](../../metrics-collector/file-metrics-collector-with-json-format.yaml#L52-L62),
the [median stopping early stopping rule with logs in JSON format example](../../early-stopping/median-stop-with-json-format.yaml#L62-L71)
and the [PyTorchJob example](../../kubeflow-training-operator/pytorchjob-mnist.yaml#L47-L54).
Loading

0 comments on commit 1fb26bb

Please sign in to comment.