Add event-based main loop + refactor
mickours committed Feb 21, 2024
1 parent f5ccbf0 commit 8580940
Showing 10 changed files with 116 additions and 189 deletions.
12 changes: 11 additions & 1 deletion README.md
@@ -5,9 +5,16 @@ This repository provides the BeBiDa optimizations to improve Big Data jobs turna
We can use two mechanisms to improve BeBiDa guarantees: 1) deadline-aware and 2) time-critical. These two approaches are complementary and will be combined.

## Deadline-aware
In this technique, we create empty jobs that do not trigger the prolog/epilog, leaving room for applications. In this way, we prepare holes in the HPC schedule plan to guarantee a fixed pool of resources for the Big Data workload. The main issue is deciding when to trigger these jobs, with how many resources, and for how long.
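
A minimal sketch of how such a hole can be punched through the shaker's HPC connector (the `Punch(cores, seconds)` call used in `app.go` further down in this commit); the `connectors.OAR{}` literal and the concrete values are assumptions for illustration:

```go
package main

import (
	connectors "github.com/RyaxTech/bebida-shaker/connectors"
	"github.com/apex/log"
)

func main() {
	// Sketch: reserve a "hole" of 4 cores for 15 minutes in the HPC schedule plan.
	// connectors.OAR{} is assumed to mirror the connectors.SLURM{} literal used in app.go.
	var scheduler connectors.HPCConnector = connectors.OAR{}
	jobID, err := scheduler.Punch(4, 900) // 4 cores, 900 seconds
	if err != nil {
		log.Errorf("Unable to punch a hole: %s", err)
	}
	log.Infof("Punched hole as HPC job %s", jobID)
}
```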

### Technical details

We gather information on job duration and resource needs from Kubernetes annotations that are set on the application Pod. See the configuration section below for more details.
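
As a sketch of these technical details: the watcher added in `connectors/k8s.go` (shown further down in this commit) currently reads the values from Pod labels under the keys `duration`, `deadline`, and `timeCritical`; the concrete values below are purely illustrative.

```go
package main

import "fmt"

func main() {
	// Illustrative only: the metadata keys read by the watcher in connectors/k8s.go.
	labels := map[string]string{
		"duration":     "20m",                  // requested time, parsed with time.ParseDuration
		"deadline":     "2024-02-21T18:00:00Z", // deadline, parsed as an RFC3339 timestamp
		"timeCritical": "true",                 // any non-empty value marks the app as time-critical
	}
	fmt.Println(labels)
}
```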


## Time-critical

In this technique, we will use a dynamic set of resources to serve applications immediately and scale it out and in (grow and shrink) when necessary. Again, the main issue is deciding when to add or remove nodes from the on-demand pool. For this we will make use of advance reservations.

The following figure sketches the design of executing jobs using the new BeBiDa deadline-aware and time-critical techniques through the usage of RYAX workflow engine.
@@ -22,6 +29,8 @@ The following figure sketches the design of executing jobs using the new BeBiDa
<figcaption>High-level view of the deadline-aware and time-critical BeBiDa mechanisms.</figcaption>
</figure>



## Usage

## Roadmap
@@ -34,6 +43,7 @@ The following figure sketches the design of executing jobs using the new BeBiDa
- [X] Handle BDA app early termination (cancel HPC job if not used anymore)
- [X] Full testing environment with SLURM and OAR [nixos-compose compositions](https://github.com/oar-team/regale-nixos-compose/tree/main/bebida)
- [X] Support for OAR over SSH (HPC)
- [ ] Add deadline support using Kubernetes annotations
- [ ] Implement the TimeCritical app support with dynamic partitioning
- [ ] OAR Quotas
- [ ] Slurm partition Limits
79 changes: 26 additions & 53 deletions app.go
@@ -3,9 +3,9 @@ package main
import (
"os"
"strconv"
"time"

connectors "github.com/RyaxTech/bebida-shaker/connectors"
"github.com/RyaxTech/bebida-shaker/events"
"github.com/apex/log"
)

@@ -21,18 +21,28 @@ type Parameters struct {

var params Parameters

// Simulate a function that takes 1s to complete.
func run() string {
log.Info("Check for the Queue state")
queueSize, timeCriticalQueueSize, deadlineAwareQueue, err := connectors.GetQueueSize()
if err != nil {
log.Errorf("Unable to get size the queue %s", err)
}
nbRunningApp, err := connectors.GetNbRunningApp()
if err != nil {
log.Errorf("Unable to get number of running app %s", err)
func schedule(event interface{}, hpcScheduler connectors.HPCConnector) {
switch event.(type) {
case events.NewPendingPod:
log.Infof("Handling new pending pod: %v+\n", event)
pod := event.(events.NewPendingPod)
_, err := hpcScheduler.Punch(int(pod.NbCores), int(pod.Requested_time.Seconds()))
if err != nil {
log.Errorf("Unable to allocate resources %s", err)
}
case events.PodCompleted:
log.Infof("Handling pod completed: %v+\n", event)

default:
log.Fatalf("Unknown event %v+\n", event)
panic(-1)
}

}

func run() {
event_channel := make(chan interface{})

var HPCScheduler connectors.HPCConnector
switch params.HPCSchedulerType {
case "OAR":
@@ -41,47 +51,10 @@ func run() string {
HPCScheduler = connectors.SLURM{}
}

if timeCriticalQueueSize > 0 {
HPCScheduler.Refill(timeCriticalQueueSize)
} else {
HPCScheduler.Refill(-1)
}

for _, job := range deadlineAwareQueue {
log.Debugf("Pending Deadline aware job %+v\n", job)
jobID, err := HPCScheduler.Punch(int(job.NbCPU), job.Duration_in_seconds)
if err != nil {
log.Errorf("Unable to allocate resources %s", err)
}
// FIXME: might return multiple job id...
return jobID
}

log.Infof("Queue size found: %d", queueSize)
log.Infof("Nb running app found: %d", nbRunningApp)
if queueSize > params.threshold && params.pendingJobs < params.maxPendingJob {
log.Info("Hummmm... a Ti'Punch ^^")
params.pendingJobs += 1
jobID, err := HPCScheduler.Punch(1, 900)
if err != nil {
log.Errorf("Unable to allocate resources %s", err)
}
params.pendingJobs -= 1
return jobID
} else if queueSize == 0 && nbRunningApp == 0 {
HPCScheduler.QuitAllPunch()
}
return ""
}

func RunForever(step time.Duration) {
punchJobIds := []string{}
go connectors.WatchQueues(event_channel)
for {
punchJobId := run()
if punchJobId != "" {
punchJobIds = append(punchJobIds, punchJobId)
}
time.Sleep(step * time.Second)
event := <-event_channel
schedule(event, HPCScheduler)
}
}

@@ -109,7 +82,7 @@ func getStrEnv(envName string, defaultValue string) string {
}

func main() {
log.Info("Starting Bebida Shacker")
log.Info("Starting Bebida Shaker")
params = Parameters{
threshold: getIntEnv("BEBIDA_NB_PENDING_JOB_THRESHOLD", 1),
pendingJobs: 0,
@@ -118,5 +91,5 @@ func main() {
stepTimeInSeconds: getIntEnv("BEBIDA_STEP_IN_SECONDS", 3),
}
log.Infof("Parameters: %+v\n", params)
RunForever(time.Duration(params.stepTimeInSeconds))
run()
}
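
The `connectors.HPCConnector` interface itself is not part of this diff; inferred from the calls above (`Punch`, `Refill`, `QuitAllPunch`), a rough sketch might look like the following — the method names come from the code, but the exact signatures and return types are guesses:

```go
// Inferred sketch only; not the actual definition from the connectors package.
package connectors

// HPCConnector abstracts the underlying HPC scheduler (OAR or SLURM).
type HPCConnector interface {
	// Punch submits an empty "hole" job with nbCores cores for the given
	// duration in seconds and returns the HPC job ID.
	Punch(nbCores int, durationInSeconds int) (string, error)
	// Refill adjusts the pool of resources kept for time-critical apps;
	// the previous main loop passed -1 when the time-critical queue was empty.
	Refill(size int) error
	// QuitAllPunch cancels all outstanding punch jobs.
	QuitAllPunch() error
}
```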
6 changes: 2 additions & 4 deletions ssh_test.go → connectors/exec/ssh_test.go
@@ -1,13 +1,11 @@
package main
package exec

import (
"testing"

"github.com/RyaxTech/bebida-shaker/connectors/exec"
)

func TestSSH(t *testing.T) {
out, err := exec.ExecuteCommand("echo toto")
out, err := ExecuteCommand("echo toto")
if err != nil {
t.Error(err)
}
42 changes: 42 additions & 0 deletions connectors/k8s.go
@@ -5,9 +5,11 @@ import (
"os"
"time"

"github.com/RyaxTech/bebida-shaker/events"
"github.com/apex/log"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
@@ -25,6 +27,46 @@ type DeadlineAwareJob struct {
Duration_in_seconds int
}

func WatchQueues(channel chan interface{}) {
k8sConfig := K8sConfig{namespace: "default", labelSelector: "", kubeconfigPath: os.Getenv("KUBECONFIG")}

config, err := clientcmd.BuildConfigFromFlags("", k8sConfig.kubeconfigPath)
if err != nil {
log.Errorf("Error while getting Kubernetes configuration %s", err)
}
client, err := kubernetes.NewForConfig(config)
if err != nil {
log.Errorf("Error while creating Kubernetes client %s", err)
}

ctx := context.Background()
watcher, err := client.CoreV1().Pods(v1.NamespaceDefault).Watch(ctx, metav1.ListOptions{})
if err != nil {
panic(err)
}

for event := range watcher.ResultChan() {
pod := event.Object.(*v1.Pod)

switch event.Type {
case watch.Added:
log.Infof("Pod %s/%s added", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
nbCpu, _ := pod.Spec.Containers[0].Resources.Requests.Cpu().AsInt64()
deadline, err := time.Parse(time.RFC3339, pod.Labels["deadline"]) // time.Parse takes the layout first
if err != nil {
log.Warnf("Error %s while parsing deadline for Pod %+v\n", err, pod)
}
requestedTime, err := time.ParseDuration(pod.Labels["duration"])
if err != nil {
log.Warnf("Error %s while parsing duration for Pod %+v\n", err, pod)
}
timeCritical := (pod.Labels["timeCritical"] != "")
channel <- events.NewPendingPod{PodId: pod.Name, NbCores: int(nbCpu), Requested_time: requestedTime, Deadline: deadline, TimeCritical: timeCritical}
case watch.Modified:
log.Infof("Pod %s/%s modified", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
case watch.Deleted:
log.Infof("Pod %s/%s deleted", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
}
}
}

func GetQueueSize() (int, int, []DeadlineAwareJob, error) {
k8sConfig := K8sConfig{namespace: "default", labelSelector: "", kubeconfigPath: os.Getenv("KUBECONFIG")}
namespace := k8sConfig.namespace
File renamed without changes.
18 changes: 18 additions & 0 deletions events/events.go
@@ -0,0 +1,18 @@
package events

import "time"

// events
type NewPendingPod struct {
PodId string
NbCores int
Requested_time time.Duration
Deadline time.Time
TimeCritical bool
}
type PodCompleted struct {
PodId string
NbCore int
Completion_time time.Duration
TimeCritical bool
}
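
A short usage sketch of these event types (all values are illustrative): a connector pushes an event onto the shared channel, and the main loop in `app.go` receives it and hands it to `schedule`.

```go
package main

import (
	"time"

	"github.com/RyaxTech/bebida-shaker/events"
)

func main() {
	eventChannel := make(chan interface{}, 1)

	// Emitted by a watcher such as connectors.WatchQueues when a Pod becomes pending.
	// All field values below are hypothetical.
	eventChannel <- events.NewPendingPod{
		PodId:          "spark-driver-0",
		NbCores:        4,
		Requested_time: 20 * time.Minute,
		Deadline:       time.Now().Add(2 * time.Hour),
		TimeCritical:   false,
	}

	// In app.go the loop then does: event := <-eventChannel; schedule(event, HPCScheduler).
	<-eventChannel
}
```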
12 changes: 6 additions & 6 deletions flake.lock

Some generated files are not rendered by default.

19 changes: 2 additions & 17 deletions go.mod
@@ -5,23 +5,9 @@ go 1.19
require k8s.io/client-go v0.26.0

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/uuid v1.1.2 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.12.2 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/spf13/pflag v1.0.5 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
k8s.io/apiextensions-apiserver v0.25.0 // indirect
k8s.io/component-base v0.25.0 // indirect
)

require (
@@ -55,12 +41,11 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.26.0 // indirect
k8s.io/apimachinery v0.26.0 // indirect
k8s.io/api v0.26.0
k8s.io/apimachinery v0.26.0
k8s.io/klog/v2 v2.80.1 // indirect
k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 // indirect
k8s.io/utils v0.0.0-20221107191617-1a15be271d1d // indirect
sigs.k8s.io/controller-runtime v0.13.1
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect