Skip to content

Commit

Permalink
Changed orb package name and reorganized some files.
Browse files Browse the repository at this point in the history
  • Loading branch information
LPetro committed Aug 7, 2024
1 parent 443c070 commit 705595d
Show file tree
Hide file tree
Showing 12 changed files with 217 additions and 166 deletions.
6 changes: 3 additions & 3 deletions pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (
nodepoolcounter "sigs.k8s.io/karpenter/pkg/controllers/nodepool/counter"
nodepoolhash "sigs.k8s.io/karpenter/pkg/controllers/nodepool/hash"
nodepoolvalidation "sigs.k8s.io/karpenter/pkg/controllers/nodepool/validation"
"sigs.k8s.io/karpenter/pkg/controllers/orb/orbbatcher"
"sigs.k8s.io/karpenter/pkg/controllers/orb"
"sigs.k8s.io/karpenter/pkg/controllers/provisioning"
"sigs.k8s.io/karpenter/pkg/controllers/state"
"sigs.k8s.io/karpenter/pkg/controllers/state/informer"
Expand All @@ -62,7 +62,7 @@ func NewControllers(
) []controller.Controller {

cluster := state.NewCluster(clock, kubeClient)
SIHeap := orbbatcher.NewSchedulingInputHeap()
SIHeap := orb.NewSchedulingInputHeap()
p := provisioning.NewProvisioner(kubeClient, recorder, cloudProvider, cluster, SIHeap)
evictionQueue := terminator.NewQueue(kubeClient, recorder)
disruptionQueue := orchestration.NewQueue(kubeClient, recorder, cluster, clock, p)
Expand Down Expand Up @@ -92,7 +92,7 @@ func NewControllers(
nodeclaimgarbagecollection.NewController(clock, kubeClient, cloudProvider),
nodeclaimtermination.NewController(kubeClient, cloudProvider, recorder),
nodeclaimdisruption.NewController(clock, kubeClient, cluster, cloudProvider),
orbbatcher.NewController(SIHeap),
orb.NewController(SIHeap),
leasegarbagecollection.NewController(kubeClient),
status.NewController[*v1.NodeClaim](kubeClient, mgr.GetEventRecorderFor("karpenter")),
status.NewController[*v1.NodePool](kubeClient, mgr.GetEventRecorderFor("karpenter")),
Expand Down
6 changes: 3 additions & 3 deletions pkg/controllers/disruption/orchestration/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
"sigs.k8s.io/karpenter/pkg/cloudprovider/fake"
disruptionevents "sigs.k8s.io/karpenter/pkg/controllers/disruption/events"
"sigs.k8s.io/karpenter/pkg/controllers/disruption/orchestration"
"sigs.k8s.io/karpenter/pkg/controllers/orb/orbbatcher"
"sigs.k8s.io/karpenter/pkg/controllers/orb"
"sigs.k8s.io/karpenter/pkg/controllers/provisioning"
"sigs.k8s.io/karpenter/pkg/controllers/state"
"sigs.k8s.io/karpenter/pkg/controllers/state/informer"
Expand All @@ -57,7 +57,7 @@ var nodeClaimStateController *informer.NodeClaimController
var fakeClock *clock.FakeClock
var recorder *test.EventRecorder
var queue *orchestration.Queue
var SIHeap *orbbatcher.SchedulingInputHeap
var SIHeap *orb.SchedulingInputHeap
var prov *provisioning.Provisioner

var replacements []string
Expand All @@ -82,7 +82,7 @@ var _ = BeforeSuite(func() {
nodeStateController = informer.NewNodeController(env.Client, cluster)
nodeClaimStateController = informer.NewNodeClaimController(env.Client, cluster)
recorder = test.NewEventRecorder()
SIHeap = orbbatcher.NewSchedulingInputHeap()
SIHeap = orb.NewSchedulingInputHeap()
prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster, SIHeap)
queue = orchestration.NewTestingQueue(env.Client, recorder, cluster, fakeClock, prov)
})
Expand Down
6 changes: 3 additions & 3 deletions pkg/controllers/disruption/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import (
"sigs.k8s.io/karpenter/pkg/cloudprovider/fake"
"sigs.k8s.io/karpenter/pkg/controllers/disruption"
"sigs.k8s.io/karpenter/pkg/controllers/disruption/orchestration"
"sigs.k8s.io/karpenter/pkg/controllers/orb/orbbatcher"
"sigs.k8s.io/karpenter/pkg/controllers/orb"
"sigs.k8s.io/karpenter/pkg/controllers/provisioning"
"sigs.k8s.io/karpenter/pkg/controllers/state"
"sigs.k8s.io/karpenter/pkg/controllers/state/informer"
Expand All @@ -69,7 +69,7 @@ var nodeStateController *informer.NodeController
var nodeClaimStateController *informer.NodeClaimController
var fakeClock *clock.FakeClock
var recorder *test.EventRecorder
var SIHeap *orbbatcher.SchedulingInputHeap
var SIHeap *orb.SchedulingInputHeap
var queue *orchestration.Queue

var onDemandInstances []*cloudprovider.InstanceType
Expand All @@ -94,7 +94,7 @@ var _ = BeforeSuite(func() {
nodeStateController = informer.NewNodeController(env.Client, cluster)
nodeClaimStateController = informer.NewNodeClaimController(env.Client, cluster)
recorder = test.NewEventRecorder()
SIHeap = orbbatcher.NewSchedulingInputHeap()
SIHeap = orb.NewSchedulingInputHeap()
prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster, SIHeap)
queue = orchestration.NewTestingQueue(env.Client, recorder, cluster, fakeClock, prov)
disruptionController = disruption.NewController(fakeClock, env.Client, prov, cloudProvider, recorder, cluster, queue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package orbbatcher
package orb

import (
"bufio"
Expand All @@ -30,7 +30,6 @@ import (
"time"

"github.com/awslabs/operatorpkg/singleton"
"github.com/samber/lo"

//"google.golang.org/protobuf/proto"
proto "github.com/gogo/protobuf/proto"
Expand All @@ -45,146 +44,6 @@ import (
"sigs.k8s.io/karpenter/pkg/scheduling"
)

// SchedulingInput is a point-in-time snapshot of the dynamic inputs to a
// provisioning scheduling run (currently only the pending pods), so that a
// scheduling decision can later be reconstructed/replayed from logs.
type SchedulingInput struct {
	Timestamp   time.Time // when the snapshot was taken (set by NewSchedulingInput)
	PendingPods []*v1.Pod // pods awaiting scheduling at Timestamp
	//all the other scheduling inputs...
}

// String renders the scheduling input in a human-readable form for logging.
func (si SchedulingInput) String() string {
	timestamp := si.Timestamp.Format("2006-01-02_15-04-05")
	pods := PodsToString(si.PendingPods)
	return fmt.Sprintf("Timestamp: %v\nPendingPods:\n%v", timestamp, pods)
}

// Function takes a slice of pod pointers and returns a string representation of the pods

// Marshal serializes the SchedulingInput to protobuf wire format by packing
// its pending pods into a v1.PodList and using the k8s-generated marshaller.
// Note: the Timestamp is NOT included in the wire format; callers must carry
// it separately (see PBToSchedulingInput, which takes it as a parameter).
// TODO: With a custom-defined .proto (e.g. an ORBLogEntry message carrying
// the timestamp alongside the pod data), this will look different.
func (si SchedulingInput) Marshal() ([]byte, error) {
	podList := &v1.PodList{
		Items: make([]v1.Pod, 0, len(si.PendingPods)),
	}
	for _, podPtr := range si.PendingPods {
		podList.Items = append(podList.Items, *podPtr)
	}
	return podList.Marshal()
}

// func UnmarshalSchedulingInput(data []byte) (*SchedulingInput, error) {
// // Unmarshal the data into an ORBLogEntry struct
// entry := &ORBLogEntry{}
// if err := proto.Unmarshal(data, entry); err != nil {
// return nil, fmt.Errorf("failed to unmarshal ORBLogEntry: %v", err)
// }

// // Parse the timestamp
// timestamp, err := time.Parse("2006-01-02_15-04-05", entry.Timestamp)
// if err != nil {
// return nil, fmt.Errorf("failed to parse timestamp: %v", err)
// }

// // Unmarshal the PendingpodData into v1.Pod objects
// pendingPods := make([]*v1.Pod, 0, len(entry.PendingpodData))
// for _, podData := range entry.PendingpodData {
// var pod v1.Pod
// if err := proto.Unmarshal(podData, &pod); err != nil {
// return nil, fmt.Errorf("failed to unmarshal pod: %v", err)
// }
// pendingPods = append(pendingPods, &pod)
// }

// // Create a new SchedulingInput struct
// schedulingInput := &SchedulingInput{
// Timestamp: timestamp,
// PendingPods: pendingPods,
// }

// return schedulingInput, nil
// }

// PBToSchedulingInput is the inverse of Marshal: it decodes protobuf wire
// data back into a SchedulingInput stamped with the supplied timestamp
// (the timestamp is not part of the wire format itself).
func PBToSchedulingInput(timestamp time.Time, data []byte) (SchedulingInput, error) {
	podList := &v1.PodList{}
	if err := proto.Unmarshal(data, podList); err != nil {
		return SchedulingInput{}, fmt.Errorf("unmarshaling pod list, %w", err)
	}
	pods := make([]*v1.Pod, 0, len(podList.Items))
	for _, pod := range podList.Items {
		pod := pod // pointer to a fresh copy, matching lo.ToSlicePtr semantics
		pods = append(pods, &pod)
	}
	return ReconstructedSchedulingInput(timestamp, pods), nil
}

// This defines a min-heap of SchedulingInputs by slice,
// with the Timestamp field defined as the comparator.
// NOTE(review): container/heap provides no locking — this heap is NOT
// thread-safe on its own; callers must synchronize concurrent access.
type SchedulingInputHeap []SchedulingInput

// Len reports the number of queued scheduling inputs (sort.Interface).
func (h SchedulingInputHeap) Len() int {
	return len(h)
}

// Less orders entries by earliest Timestamp first, making this a min-heap.
func (h SchedulingInputHeap) Less(i, j int) bool {
	return h[i].Timestamp.Before(h[j].Timestamp)
}

// Swap exchanges the entries at indices i and j (sort.Interface).
func (h SchedulingInputHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
}

// Push appends x to the slice; intended to be called via heap.Push,
// which restores the heap invariant afterwards.
func (h *SchedulingInputHeap) Push(x interface{}) {
	*h = append(*h, x.(SchedulingInput))
}

// Pop removes and returns the last element; intended to be called via
// heap.Pop, which first swaps the minimum to the end.
func (h *SchedulingInputHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// NewSchedulingInputHeap returns an empty heap ready for use with
// container/heap (heap.Init on an empty heap is a no-op).
func NewSchedulingInputHeap() *SchedulingInputHeap {
	h := &SchedulingInputHeap{}
	heap.Init(h)
	return h
}

// NewSchedulingInput builds a snapshot of the given pending pods stamped
// with the current wall-clock time.
func NewSchedulingInput(pendingPods []*v1.Pod) SchedulingInput {
	si := SchedulingInput{Timestamp: time.Now()}
	si.PendingPods = pendingPods
	return si
}

// ReconstructedSchedulingInput rebuilds a SchedulingInput (e.g. one read
// back from a persisted log file) from an explicit timestamp and pod set.
func ReconstructedSchedulingInput(timestamp time.Time, pendingPods []*v1.Pod) SchedulingInput {
	var si SchedulingInput
	si.Timestamp = timestamp
	si.PendingPods = pendingPods
	return si
}

type Controller struct {
SIheap *SchedulingInputHeap // Batches logs in a Queue
mostRecentFilename string // The most recently saved filename (for checking for changes)
Expand Down Expand Up @@ -315,12 +174,6 @@ func OfferingToString(offering *cloudprovider.Offering) string {
RequirementsToString(&offering.Requirements), offering.Price, offering.Available)
}

// LogProvisioningScheduler snapshots the inputs of a provisioning scheduling
// run (currently the pending pods; TODO: also stateNodes and instanceTypes)
// and queues the snapshot for batched persistence to the PV as protobuf.
func (q *SchedulingInputHeap) LogProvisioningScheduler(pods []*v1.Pod, stateNodes []*state.StateNode, instanceTypes map[string][]*cloudprovider.InstanceType) {
	si := NewSchedulingInput(pods) // TODO: add all inputs I want to log
	// Insert via heap.Push (not q.Push directly) so the min-heap invariant
	// is restored after insertion; the raw Push method only appends to the
	// backing slice.
	heap.Push(q, si)
}

/* This function saves things to our Persistent Volume */
// Saves data to PV (S3 Bucket for AWS) via the mounted log path
// It takes a name of the log file as well as the logline to be logged.
Expand Down
65 changes: 65 additions & 0 deletions pkg/controllers/orb/heap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package orb

import (
"container/heap"

v1 "k8s.io/api/core/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/controllers/state"
)

// This defines a min-heap of SchedulingInputs by slice,
// with the Timestamp field defined as the comparator.
// NOTE(review): container/heap provides no locking — this heap is NOT
// thread-safe on its own; confirm that producers (the provisioner) and the
// consumer (the ORB controller) synchronize access externally.
type SchedulingInputHeap []SchedulingInput

// Len reports how many scheduling inputs are currently queued (sort.Interface).
func (h SchedulingInputHeap) Len() int { return len(h) }

// Less orders entries so the earliest Timestamp sits at the root,
// i.e. the heap is keyed on snapshot time, oldest first.
func (h SchedulingInputHeap) Less(a, b int) bool {
	return h[b].Timestamp.After(h[a].Timestamp)
}

// Swap exchanges the entries at the two given positions (sort.Interface).
func (h SchedulingInputHeap) Swap(a, b int) {
	h[a], h[b] = h[b], h[a]
}

// Push appends x to the backing slice. It is meant to be invoked through
// heap.Push, which re-establishes the heap invariant after the append.
func (h *SchedulingInputHeap) Push(x any) {
	si := x.(SchedulingInput)
	*h = append(*h, si)
}

// Pop removes and returns the final element. It is meant to be invoked
// through heap.Pop, which first swaps the minimum into that position.
func (h *SchedulingInputHeap) Pop() any {
	last := len(*h) - 1
	x := (*h)[last]
	*h = (*h)[:last]
	return x
}

// NewSchedulingInputHeap returns an empty heap ready for use with
// container/heap. (heap.Init on an empty heap is a no-op, kept for safety.)
func NewSchedulingInputHeap() *SchedulingInputHeap {
	sih := &SchedulingInputHeap{}
	heap.Init(sih)
	return sih
}

// LogProvisioningScheduler snapshots the inputs of a provisioning scheduling
// run (currently the pending pods; TODO: also stateNodes and instanceTypes)
// and queues the snapshot so the ORB controller can batch-persist it to the
// PV as protobuf.
func (h *SchedulingInputHeap) LogProvisioningScheduler(pods []*v1.Pod, stateNodes []*state.StateNode, instanceTypes map[string][]*cloudprovider.InstanceType) {
	si := NewSchedulingInput(pods) // TODO: add all inputs I want to log
	// Insert via heap.Push (not h.Push directly) so the min-heap invariant
	// is restored after insertion; the raw Push method only appends to the
	// backing slice without sifting.
	heap.Push(h, si)
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
Loading

0 comments on commit 705595d

Please sign in to comment.