Skip to content

Commit

Permalink
Fix oar deadline support
Browse files Browse the repository at this point in the history
  • Loading branch information
mickours committed Apr 24, 2024
1 parent dce049b commit f4920d8
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 15 deletions.
7 changes: 3 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,9 @@ The be excluded from the Bebida default Punch mechanism the Spark executors need
BeBiDa uses annotation to gather information about job types and resources requirements. Annotations for BeBiDa are:

* `ryax.tech/timeCritical`: set to `true` to prioritize the job as time critical (defaults to: `false`)
* `ryax.tech/deadline`: date of the deadline in the RFC3339 format, e.g. "2006-01-02T15:04:05Z07:00"
* `ryax.tech/duration`: walltime in seconds​
* `ryax.tech/resources.cores`: number of cores needed
* `ryax.tech/resources.memory`​: memory in megabytes
* `ryax.tech/deadline`: date of the deadline in the RFC3339 format, e.g. "2006-01-02T15:04:05+07:00"
* `ryax.tech/duration`: walltime with a unit
* `ryax.tech/resources.cores`: number of CPU cores needed

## Setup a testing environment

Expand Down
3 changes: 1 addition & 2 deletions bebida-shaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ func main() {
deadline := annotateCmd.String("deadline", "", "App deadline date")
duration := annotateCmd.String("duration", "900s", "App duration in seconds")
cores := annotateCmd.Int("cores", 1, "Number of cores reqired")
memory := annotateCmd.Int("memory", 1024, "Amount of memory reqired in Bytes")

flag.Parse()
if len(os.Args) < 2 {
Expand All @@ -109,7 +108,7 @@ func main() {
run()
case "annotate":
annotateCmd.Parse(os.Args[2:])
err := utils.Annotate(annotateCmd.Arg(0), *deadline, *duration, *cores, *memory)
err := utils.Annotate(annotateCmd.Arg(0), *deadline, *duration, *cores)
if err != nil {
panic(err)
}
Expand Down
11 changes: 10 additions & 1 deletion connectors/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package connectors
import (
"context"
"os"
"strconv"
"time"

"github.com/RyaxTech/bebida-shaker/events"
Expand Down Expand Up @@ -53,10 +54,18 @@ func WatchQueues(channel chan interface{}) {
}
pendingPod := events.NewPendingPod()
pendingPod.PodId = pod.ObjectMeta.Name
nbCpu, _ := pod.Spec.Containers[0].Resources.Requests.Cpu().AsInt64()

nbCpu := 0
if pod.Annotations[bebida_prefix+"resources.cores"] != "" {
nbCpu, err = strconv.Atoi(pod.Annotations[bebida_prefix+"resources.cores"])
if err != nil {
log.Warnf("Error %s while parsing resources.cores annotation for Pod %v+\n", err, pod)
}
}
if nbCpu > 0 {
pendingPod.NbCores = int(nbCpu)
}

deadline, err := time.Parse(time.RFC3339, pod.Annotations[bebida_prefix+"deadline"])
if err != nil {
log.Warnf("Error %s while retrieving deadline for Pod %v+\n", err, pod)
Expand Down
8 changes: 5 additions & 3 deletions connectors/oar.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ var ExecuteCommand = exec.ExecuteCommand
func (OAR) Punch(nbCpuPerJob int, jobDuration time.Duration, deadline time.Time) (string, error) {
// TODO put this in a config file (or env var)
randomSuffix := utils.RandomString(8)
// FIXME: user1 is hardcoded here, maybe we should use the right user for Bebida directly ass SSH level...
cmd := fmt.Sprintf("su user1 --command 'oarsub --name BEBIDA_NOOP_%s -l nodes=%d,walltime=00:%d:00 \"sleep %d\"'", randomSuffix, nbCpuPerJob, int(jobDuration.Minutes()), int(jobDuration.Seconds()))
oarsubOpts := fmt.Sprintf("--name BEBIDA_NOOP_%s -l nodes=%d,walltime=00:%d:00 \"sleep %d\"", randomSuffix, nbCpuPerJob, int(jobDuration.Minutes()), int(jobDuration.Seconds()))
if !deadline.IsZero() {
cmd = fmt.Sprintf("%s -r '%s'", cmd, deadline.Add(-jobDuration).Format("2007-10-24 18:00:00"))
oarsubOpts = fmt.Sprintf("-r '%s' %s", deadline.Add(-jobDuration).Format("2007-10-24 18:00:00"), oarsubOpts)
}

// FIXME: user1 is hardcoded here, maybe we should use the right user for Bebida directly ass SSH level...
cmd := fmt.Sprintf("su user1 --command 'oarsub %s'", oarsubOpts)
out, err := ExecuteCommand(cmd)
log.Infof("Punch command output: %s", string(out))

Expand Down
6 changes: 3 additions & 3 deletions events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ func NewPendingPod() PendingPod {
}

type PodCompleted struct {
PodId string
NbCore int
PodId string
NbCore int
CompletionTime time.Duration
TimeCritical bool
TimeCritical bool
}
3 changes: 1 addition & 2 deletions utils/bebida.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

var prefix = "ryax.tech/"

func Annotate(filePath string, deadline string, duration string, cores int, memory int) error {
func Annotate(filePath string, deadline string, duration string, cores int) error {

fileContent, err := ioutil.ReadFile(filePath)
if err != nil {
Expand All @@ -34,7 +34,6 @@ func Annotate(filePath string, deadline string, duration string, cores int, memo
annotations[prefix+"deadline"] = deadline
annotations[prefix+"duration"] = duration
annotations[prefix+"resources.cores"] = strconv.Itoa(cores)
annotations[prefix+"resources.memory"] = strconv.Itoa(memory)
annotationsInYaml := data["metadata"].(map[string]interface{})
annotationsInYaml["annotations"] = annotations
}
Expand Down

0 comments on commit f4920d8

Please sign in to comment.