Skip to content

Commit

Permalink
Fix quitPunch and add test
Browse files Browse the repository at this point in the history
  • Loading branch information
mickours committed Feb 21, 2024
1 parent 8580940 commit def592c
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 25 deletions.
12 changes: 7 additions & 5 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,21 @@ type Parameters struct {
}

var params Parameters

// podIdToJobIdMap maps a pod ID to the HPC job ID returned by Punch for that
// pod. schedule fills it when a pending pod is handled and reads it to call
// QuitPunch when the pod completes.
var podIdToJobIdMap = make(map[string]string)
func schedule(event interface{}, hpcScheduler connectors.HPCConnector) {
switch event.(type) {
case events.NewPendingPod:
case events.PendingPod:
log.Infof("Handling new pending pod: %v+\n", event)
pod := event.(events.NewPendingPod)
_, err := hpcScheduler.Punch(int(pod.NbCores), int(pod.Requested_time.Seconds()))
pod := event.(events.PendingPod)
jobId, err := hpcScheduler.Punch(int(pod.NbCores), int(pod.RequestedTime.Seconds()))
podIdToJobIdMap[pod.PodId] = jobId
if err != nil {
log.Errorf("Unable to allocate resources %s", err)
}
case events.PodCompleted:
log.Infof("Handling pod completed: %v+\n", event)

pod := event.(events.PodCompleted)
hpcScheduler.QuitPunch(podIdToJobIdMap[pod.PodId])
default:
log.Fatalf("Unknown event %v+\n", event)
panic(-1)
Expand Down
19 changes: 17 additions & 2 deletions connectors/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,37 @@ func WatchQueues(channel chan interface{}) {
switch event.Type {
case watch.Added:
log.Infof("Pod %s/%s added", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
pendingPod := events.NewPendingPod()
pendingPod.PodId = pod.ObjectMeta.Name
nbCpu, _ := pod.Spec.Containers[0].Resources.Requests.Cpu().AsInt64()
if nbCpu > 0 {
pendingPod.NbCores = int(nbCpu)
}
deadline, err := time.Parse(pod.Labels["deadline"], time.RFC3339)
if err != nil {
log.Warnf("Error %s while retrieving CPU request for Pod %v+\n", err, pod)
}
if deadline.After(time.Now().Add(time.Minute)) {
pendingPod.Deadline = deadline
}
requestedTime, err := time.ParseDuration(pod.Labels["duration"])
timeCritical := (pod.Labels["timeCritical"] != "")
channel <- events.NewPendingPod{PodId: pod.Name, NbCores: int(nbCpu), Requested_time: requestedTime, Deadline: deadline, TimeCritical: timeCritical}
if err != nil {
log.Warnf("Error %s while retrieving duration annotation for Pod %v+\n", err, pod)
} else if requestedTime > time.Minute {
pendingPod.RequestedTime = requestedTime
}
pendingPod.TimeCritical = (pod.Labels["timeCritical"] != "")
channel <- pendingPod
case watch.Modified:
log.Infof("Pod %s/%s modified", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
case watch.Deleted:
log.Infof("Pod %s/%s deleted", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
channel <- events.PodCompleted{PodId: pod.ObjectMeta.Name}
}
}
}

// Deprecated: GetQueueSize should not be used in new code.
func GetQueueSize() (int, int, []DeadlineAwareJob, error) {
k8sConfig := K8sConfig{namespace: "default", labelSelector: "", kubeconfigPath: os.Getenv("KUBECONFIG")}
namespace := k8sConfig.namespace
Expand Down
23 changes: 15 additions & 8 deletions connectors/oar.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"strconv"
"strings"
"regexp"

"github.com/RyaxTech/bebida-shaker/connectors/exec"
"github.com/RyaxTech/bebida-shaker/connectors/utils"
Expand All @@ -12,24 +13,30 @@ import (

// OAR is the connector whose methods drive the OAR command-line tools
// (oarsub, oardel, oarstat). It carries no state.
type OAR struct{}

// ExecuteCommand is the function used by this package to run a command line
// and collect its output. It is a package-level variable so that tests can
// replace it with a stub instead of invoking real commands.
var ExecuteCommand = exec.ExecuteCommand

// oarJobIDPattern captures the numeric job ID from the "OAR_JOB_ID=<id>"
// line found in the oarsub output. Compiled once at package scope.
var oarJobIDPattern = regexp.MustCompile("OAR_JOB_ID=([0-9]+)")

// Punch submits a placeholder OAR job (a plain `sleep`) named
// BEBIDA_NOOP_<random suffix> that requests nbCpuPerJob cores for
// jobDurationInSeconds seconds.
//
// It returns the OAR job ID parsed from the oarsub output. An error is
// returned when the command fails or when the output contains no
// OAR_JOB_ID line; in both cases the returned job ID is empty.
func (OAR) Punch(nbCpuPerJob int, jobDurationInSeconds int) (string, error) {
	// TODO put this in a config file (or env var)
	randomSuffix := utils.RandomString(8)
	cmd := fmt.Sprintf("oarsub --name BEBIDA_NOOP_%s -l cores=%d sleep %d", randomSuffix, nbCpuPerJob, jobDurationInSeconds)
	out, err := ExecuteCommand(cmd)
	// Log the raw output before any check so it is available when diagnosing
	// a failed submission.
	log.Infof("Punch command output: %s", out)
	if err != nil {
		log.Errorf("Unable to submit job: %s", err)
		return "", err
	}

	// FindStringSubmatch returns nil when nothing matches; indexing it
	// unconditionally would panic on unexpected oarsub output.
	match := oarJobIDPattern.FindStringSubmatch(out)
	if match == nil {
		err := fmt.Errorf("no OAR_JOB_ID found in oarsub output %q", out)
		log.Errorf("Unable to submit job: %s", err)
		return "", err
	}

	return match[1], nil
}

func (OAR) QuitPunch(jobID string) error {
cmd := fmt.Sprintf("oardel %s", jobID)
out, err := exec.ExecuteCommand(cmd)
out, err := ExecuteCommand(cmd)
if err != nil {
log.Errorf("Unable to delete job: %s", err)
return err
Expand All @@ -42,7 +49,7 @@ func (OAR) QuitPunch(jobID string) error {
func (oar OAR) QuitAllPunch() error {
// get OAR job ID from the name
cmd := string("oarstat --json | jq '.[] | select(.name | match(\"BEBIDA_NOOP\")) | .id' -r)")
out, err := exec.ExecuteCommand(cmd)
out, err := ExecuteCommand(cmd)
if err != nil {
log.Errorf("Unable to list bebida jobs: %s", err)
return err
Expand All @@ -64,7 +71,7 @@ func (OAR) Refill(nbResources int) error {
if nbResources != -1 {
// Apply quota on the server by changing the file content. It's reloaded for every scheduling round.
cmd := string("oarstat --json | jq '. | length'")
out, err := exec.ExecuteCommand(cmd)
out, err := ExecuteCommand(cmd)
if err != nil {
log.Errorf("Unable to list bebida jobs: %s", err)
return err
Expand All @@ -83,7 +90,7 @@ func (OAR) Refill(nbResources int) error {
// "<Queue>, <project>, <job_type>, <user>": [<Maximum used resources>, <Max running job>, <Max resource per hours>]
quota := fmt.Sprintf("{\"quotas\": \"*,*,*,*\": [-1, %d, -1]}", quotaResource)
cmd := fmt.Sprintf("echo '%s' > /etc/oar/quotas.json", quota)
_, err := exec.ExecuteCommand(cmd)
_, err := ExecuteCommand(cmd)
if err != nil {
log.Errorf("Unable to list bebida jobs: %s", err)
return err
Expand Down
15 changes: 15 additions & 0 deletions connectors/oar_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package connectors

import (
"testing"

"github.com/stretchr/testify/assert"
)

// TestPunch checks that Punch returns the job ID found in the oarsub output.
func TestPunch(t *testing.T) {
	// Replace the command runner with a stub so no real oarsub is invoked,
	// and restore the previous value afterwards so the stub does not leak
	// into other tests of the package.
	previous := ExecuteCommand
	t.Cleanup(func() { ExecuteCommand = previous })
	ExecuteCommand = func(cmd string) (string, error) { return "\nOAR_JOB_ID=1234\nNot relevant", nil }

	jobId, err := OAR{}.Punch(1, 10)

	assert.NoError(t, err)
	assert.Equal(t, "1234", jobId, "Job id should be the same")
}
24 changes: 16 additions & 8 deletions events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,24 @@ package events
import "time"

// events
type NewPendingPod struct {
PodId string
NbCores int
Requested_time time.Duration
Deadline time.Time
TimeCritical bool
// PendingPod is the event emitted when a pod is waiting for resources.
type PendingPod struct {
	// PodId identifies the pod; the Kubernetes watcher sets it to the pod name.
	PodId string
	// NbCores is the number of cores to reserve for the pod.
	NbCores int
	// RequestedTime is how long the resources are requested for.
	RequestedTime time.Duration
	// Deadline is the time by which the pod should be done; the zero value
	// means no deadline was provided.
	Deadline time.Time
	// TimeCritical flags the pod as time critical.
	TimeCritical bool
}

// NewPendingPod returns a PendingPod pre-filled with the default resource
// request: one core for 900 seconds (15 minutes).
func NewPendingPod() PendingPod {
	return PendingPod{
		NbCores: 1,
		// time.Duration counts nanoseconds, so the unit must be explicit:
		// a bare 900 would be 900ns and truncate to 0 whole seconds.
		RequestedTime: 900 * time.Second,
	}
}

// PodCompleted is the event emitted when a pod is gone from the cluster.
type PodCompleted struct {
	// PodId identifies the pod; the Kubernetes watcher sets it to the pod name.
	PodId string
	// NbCore is presumably the number of cores the pod was using; no producer
	// visible here fills it in (TODO confirm).
	NbCore int
	// CompletionTime is presumably how long the pod took to complete; no
	// producer visible here fills it in (TODO confirm).
	CompletionTime time.Duration
	// TimeCritical flags the pod as time critical.
	TimeCritical bool
}
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ module github.com/RyaxTech/bebida-shaker

go 1.19

require k8s.io/client-go v0.26.0
require (
github.com/stretchr/testify v1.8.4
k8s.io/client-go v0.26.0
)

require (
github.com/imdario/mergo v0.3.12 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
)

Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/tj/assert v0.0.0-20171129193455-018094318fb0/go.mod h1:mZ9/Rh9oLWpLLDRpvE+3b7gP/C2YyLFYxNmcLnPTMe0=
github.com/tj/assert v0.0.3 h1:Df/BlaZ20mq6kuai7f5z2TvPFiwC3xaWJSDQNiIS3Rk=
github.com/tj/assert v0.0.3/go.mod h1:Ne6X72Q+TB1AteidzQncjw9PabbMp4PBMZ1k+vd1Pvk=
Expand Down

0 comments on commit def592c

Please sign in to comment.