diff --git a/README.md b/README.md index 7dcb00d..410c8bb 100644 --- a/README.md +++ b/README.md @@ -63,10 +63,9 @@ The be excluded from the Bebida default Punch mechanism the Spark executors need BeBiDa uses annotation to gather information about job types and resources requirements. Annotations for BeBiDa are: * `ryax.tech/timeCritical`: set to `true` to prioritize the job as time critical (defaults to: `false`) -* `ryax.tech/deadline`: date of the deadline in the RFC3339 format, e.g. "2006-01-02T15:04:05Z07:00" -* `ryax.tech/duration`: walltime in seconds​ -* `ryax.tech/resources.cores`: number of cores needed -* `ryax.tech/resources.memory`​: memory in megabytes +* `ryax.tech/deadline`: date of the deadline in the RFC3339 format, e.g. "2006-01-02T15:04:05+07:00" +* `ryax.tech/duration`: walltime with a unit +* `ryax.tech/resources.cores`: number of CPU cores needed ## Setup a testing environment diff --git a/bebida-shaker.go b/bebida-shaker.go index 3da60a1..306713b 100644 --- a/bebida-shaker.go +++ b/bebida-shaker.go @@ -91,7 +91,6 @@ func main() { deadline := annotateCmd.String("deadline", "", "App deadline date") duration := annotateCmd.String("duration", "900s", "App duration in seconds") cores := annotateCmd.Int("cores", 1, "Number of cores reqired") - memory := annotateCmd.Int("memory", 1024, "Amount of memory reqired in Bytes") flag.Parse() if len(os.Args) < 2 { @@ -109,7 +108,7 @@ func main() { run() case "annotate": annotateCmd.Parse(os.Args[2:]) - err := utils.Annotate(annotateCmd.Arg(0), *deadline, *duration, *cores, *memory) + err := utils.Annotate(annotateCmd.Arg(0), *deadline, *duration, *cores) if err != nil { panic(err) } diff --git a/connectors/k8s.go b/connectors/k8s.go index 709eb2d..dd679c4 100644 --- a/connectors/k8s.go +++ b/connectors/k8s.go @@ -3,6 +3,7 @@ package connectors import ( "context" "os" + "strconv" "time" "github.com/RyaxTech/bebida-shaker/events" @@ -53,10 +54,18 @@ func WatchQueues(channel chan interface{}) { } pendingPod := events.NewPendingPod() pendingPod.PodId = pod.ObjectMeta.Name - nbCpu, _ := pod.Spec.Containers[0].Resources.Requests.Cpu().AsInt64() + + nbCpu := 0 + if pod.Annotations[bebida_prefix+"resources.cores"] != "" { + nbCpu, err = strconv.Atoi(pod.Annotations[bebida_prefix+"resources.cores"]) + if err != nil { + log.Warnf("Error %s while parsing resources.cores annotation for Pod %v+\n", err, pod) + } + } if nbCpu > 0 { pendingPod.NbCores = int(nbCpu) } + deadline, err := time.Parse(time.RFC3339, pod.Annotations[bebida_prefix+"deadline"]) if err != nil { log.Warnf("Error %s while retrieving deadline for Pod %v+\n", err, pod) diff --git a/connectors/oar.go b/connectors/oar.go index c2f54c2..791b18f 100644 --- a/connectors/oar.go +++ b/connectors/oar.go @@ -19,11 +19,13 @@ var ExecuteCommand = exec.ExecuteCommand func (OAR) Punch(nbCpuPerJob int, jobDuration time.Duration, deadline time.Time) (string, error) { // TODO put this in a config file (or env var) randomSuffix := utils.RandomString(8) - // FIXME: user1 is hardcoded here, maybe we should use the right user for Bebida directly ass SSH level... - cmd := fmt.Sprintf("su user1 --command 'oarsub --name BEBIDA_NOOP_%s -l nodes=%d,walltime=00:%d:00 \"sleep %d\"'", randomSuffix, nbCpuPerJob, int(jobDuration.Minutes()), int(jobDuration.Seconds())) + oarsubOpts := fmt.Sprintf("--name BEBIDA_NOOP_%s -l nodes=%d,walltime=00:%d:00 \"sleep %d\"", randomSuffix, nbCpuPerJob, int(jobDuration.Minutes()), int(jobDuration.Seconds())) if !deadline.IsZero() { - cmd = fmt.Sprintf("%s -r '%s'", cmd, deadline.Add(-jobDuration).Format("2007-10-24 18:00:00")) + oarsubOpts = fmt.Sprintf("-r '%s' %s", deadline.Add(-jobDuration).Format("2007-10-24 18:00:00"), oarsubOpts) } + + // FIXME: user1 is hardcoded here, maybe we should use the right user for Bebida directly ass SSH level... + cmd := fmt.Sprintf("su user1 --command 'oarsub %s'", oarsubOpts) out, err := ExecuteCommand(cmd) log.Infof("Punch command output: %s", string(out)) diff --git a/events/events.go b/events/events.go index 6a06a2e..6bed28f 100644 --- a/events/events.go +++ b/events/events.go @@ -19,8 +19,8 @@ func NewPendingPod() PendingPod { } type PodCompleted struct { - PodId string - NbCore int + PodId string + NbCore int CompletionTime time.Duration - TimeCritical bool + TimeCritical bool } diff --git a/utils/bebida.go b/utils/bebida.go index 8367858..e97f4a6 100644 --- a/utils/bebida.go +++ b/utils/bebida.go @@ -12,7 +12,7 @@ import ( var prefix = "ryax.tech/" -func Annotate(filePath string, deadline string, duration string, cores int, memory int) error { +func Annotate(filePath string, deadline string, duration string, cores int) error { fileContent, err := ioutil.ReadFile(filePath) if err != nil { @@ -34,7 +34,6 @@ func Annotate(filePath string, deadline string, duration string, cores int, memo annotations[prefix+"deadline"] = deadline annotations[prefix+"duration"] = duration annotations[prefix+"resources.cores"] = strconv.Itoa(cores) - annotations[prefix+"resources.memory"] = strconv.Itoa(memory) annotationsInYaml := data["metadata"].(map[string]interface{}) annotationsInYaml["annotations"] = annotations }