Skip to content

Commit

Permalink
RHIROS-1326 Kruize 0.0.20.1_rm Integration (#129)
Browse files Browse the repository at this point in the history
* updates to is_valid_recommendation func
* validate recommendation/container on poller
---------

Co-authored-by: Sagnik Dutta <sdutta@redhat.com>
  • Loading branch information
saltgen and saltgen authored Jan 30, 2024
1 parent 363a37c commit 5cf4a0e
Show file tree
Hide file tree
Showing 13 changed files with 918 additions and 304 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ coverage.txt
output.csv
.idea/
.DS_Store
.go/
124 changes: 88 additions & 36 deletions internal/api/utils.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package api

import (
"encoding/json"
"fmt"
"math"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"encoding/json"
"math"

"gorm.io/datatypes"

Expand Down Expand Up @@ -223,22 +223,17 @@ func TransformComponentUnits(jsonData datatypes.JSON) map[string]interface{} {
return nil
}

durationBased, ok := data["duration_based"].(map[string]interface{})
if !ok {
fmt.Printf("duration_based not found in JSON")
}

convertMemory := func(memory map[string]interface{}) error {
amount, ok := memory["amount"].(float64)
if ok {
memoryInMiB := amount / 1024 / 1024
if math.Abs(memoryInMiB) >= 1024 {
memoryInGiB := memoryInMiB / 1024
memory["amount"] = math.Trunc(memoryInGiB*100) / 100
memory["format"] = "GiB"
memory["format"] = "Gi"
} else {
memory["amount"] = math.Trunc(memoryInMiB*100) / 100
memory["format"] = "MiB"
memory["format"] = "Mi"
}
}
return nil
Expand All @@ -265,56 +260,113 @@ func TransformComponentUnits(jsonData datatypes.JSON) map[string]interface{} {
if math.Abs(cpuInCores) < 1 {
cpuInMillicores := cpuInCores * 1000
cpu["amount"] = math.Round(cpuInMillicores) // millicore values are rounded & don't require decimal precision
cpu["format"] = "millicores"
cpu["format"] = "m"
} else {
cpu["amount"] = truncateToThreeDecimalPlaces(cpuInCores)
cpu["format"] = "cores"
cpu["format"] = nil
}
}
return nil
}

// Current section of recommendation
current_config, ok := data["current"].(map[string]interface{})
if !ok {
log.Error("current not found in JSON")
}

for _, section := range []string{"limits", "requests"} {

sectionObject, ok := current_config[section].(map[string]interface{})
if ok {
memory, ok := sectionObject["memory"].(map[string]interface{})
if ok {
err := convertMemory(memory)
if err != nil {
fmt.Printf("error converting memory in %s: %v\n", sectionObject, err)
continue
}
}
cpu, ok := sectionObject["cpu"].(map[string]interface{})
if ok {
err := convertCPU(cpu)
if err != nil {
fmt.Printf("error converting cpu in %s: %v\n", sectionObject, err)
continue
}
}
}
}

/*
Recommendation data is available for three periods (terms),
each holding "cost" and "performance" recommendation engines.
For each engine, the actual values are present in the
data blocks listed below, under limits and requests.
*/

for _, period := range []string{"long_term", "medium_term", "short_term"} {
intervalData, ok := durationBased[period].(map[string]interface{})
// Recommendation section
recommendation_terms, ok := data["recommendation_terms"].(map[string]interface{})
if !ok {
log.Error("recommendation data not found in JSON")
return data
}

for _, period := range []string{"short_term", "medium_term", "long_term"} {
intervalData, ok := recommendation_terms[period].(map[string]interface{})
if !ok {
continue
}

for _, dataBlock := range []string{"current", "config", "variation"} {
recommendationSection, ok := intervalData[dataBlock].(map[string]interface{})
if !ok {
continue
}
/* Hack
// monitoring_start_time is currently not nullable on DB
// Hence cannot be set to null while saving response from Kruize
*/
// Remove the zero-value (nil-equivalent) monitoring_start_time from the API response
monitoring_start_time := intervalData["monitoring_start_time"]
if monitoring_start_time == "0001-01-01T00:00:00Z" {
delete(intervalData, "monitoring_start_time")
}

for _, section := range []string{"limits", "requests"} {
if intervalData["recommendation_engines"] != nil {

sectionObject, ok := recommendationSection[section].(map[string]interface{})
if ok {
memory, ok := sectionObject["memory"].(map[string]interface{})
if ok {
err := convertMemory(memory)
if err != nil {
fmt.Printf("error converting memory in %s: %v\n", period, err)
continue
}
for _, recommendationType := range []string{"cost", "performance"} {
engineData, ok := intervalData["recommendation_engines"].(map[string]interface{})[recommendationType].(map[string]interface{})
if !ok {
continue
}

for _, dataBlock := range []string{"config", "variation"} {
recommendationSection, ok := engineData[dataBlock].(map[string]interface{})
if !ok {
continue
}
cpu, ok := sectionObject["cpu"].(map[string]interface{})
if ok {
err := convertCPU(cpu)
if err != nil {
fmt.Printf("error converting cpu in %s: %v\n", period, err)
continue

for _, section := range []string{"limits", "requests"} {

sectionObject, ok := recommendationSection[section].(map[string]interface{})
if ok {
memory, ok := sectionObject["memory"].(map[string]interface{})
if ok {
err := convertMemory(memory)
if err != nil {
fmt.Printf("error converting memory in %s: %v\n", period, err)
continue
}
}
cpu, ok := sectionObject["cpu"].(map[string]interface{})
if ok {
err := convertCPU(cpu)
if err != nil {
fmt.Printf("error converting cpu in %s: %v\n", period, err)
continue
}
}
}
}

}
}

}
}
}

Expand Down
4 changes: 0 additions & 4 deletions internal/services/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@ import (
)

var (
invalidRecommendation = promauto.NewCounter(prometheus.CounterOpts{
Name: "rosocp_invalid_recommendation_total",
Help: "The total number of invalid recommendation send by Kruize",
})
invalidCSV = promauto.NewCounter(prometheus.CounterOpts{
Name: "rosocp_invalid_csv_total",
Help: "The total number of invalid csv send by cost-mgmt",
Expand Down
34 changes: 18 additions & 16 deletions internal/services/recommendation_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,12 @@ func requestAndSaveRecommendation(kafkaMsg types.RecommendationKafkaMsg, recomme
return poll_cycle_complete
}

// TODO: Is_valid_recommendation is to be called on every container record from the v20.1 upgrade onwards
if kruize.Is_valid_recommendation(recommendation) {
containers := recommendation[0].Kubernetes_objects[0].Containers
recommendationSetList := []model.RecommendationSet{}
histRecommendationSetList := []model.HistoricalRecommendationSet{}
containers := recommendation[0].Kubernetes_objects[0].Containers
recommendationSetList := []model.RecommendationSet{}
histRecommendationSetList := []model.HistoricalRecommendationSet{}

for _, container := range containers {
for _, container := range containers {
if kruize.Is_valid_recommendation(container.Recommendations, experiment_name, maxEndTimeFromReport) {
for _, v := range container.Recommendations.Data {
marshalData, err := json.Marshal(v)
if err != nil {
Expand All @@ -89,8 +88,8 @@ func requestAndSaveRecommendation(kafkaMsg types.RecommendationKafkaMsg, recomme
recommendationSet := model.RecommendationSet{
WorkloadID: kafkaMsg.Metadata.Workload_id,
ContainerName: container.Container_name,
MonitoringStartTime: v.Duration_based.Short_term.Monitoring_start_time,
MonitoringEndTime: v.Duration_based.Short_term.Monitoring_end_time,
MonitoringStartTime: v.RecommendationTerms.Short_term.MonitoringStartTime,
MonitoringEndTime: v.MonitoringEndTime,
Recommendations: marshalData,
}
recommendationSetList = append(recommendationSetList, recommendationSet)
Expand All @@ -100,20 +99,24 @@ func requestAndSaveRecommendation(kafkaMsg types.RecommendationKafkaMsg, recomme
OrgId: kafkaMsg.Metadata.Org_id,
WorkloadID: kafkaMsg.Metadata.Workload_id,
ContainerName: container.Container_name,
MonitoringStartTime: v.Duration_based.Short_term.Monitoring_start_time,
MonitoringEndTime: v.Duration_based.Short_term.Monitoring_end_time,
MonitoringStartTime: v.RecommendationTerms.Short_term.MonitoringStartTime,
MonitoringEndTime: v.MonitoringEndTime,
Recommendations: marshalData,
}
histRecommendationSetList = append(histRecommendationSetList, historicalRecommendationSet)
}
} else {
poll_cycle_complete = true
continue
}
}
if len(recommendationSetList) > 0 {
txError := transactionForRecommendation(recommendationSetList, histRecommendationSetList, experiment_name, recommendationType)
if txError == nil {
poll_cycle_complete = true
} else {
poll_cycle_complete = false
}
} else {
poll_cycle_complete = true
invalidRecommendation.Inc()
}
return poll_cycle_complete
}
Expand Down Expand Up @@ -157,10 +160,9 @@ func PollForRecommendations(msg *kafka.Message, consumer_object *kafka.Consumer)
commitKafkaMsg(msg, consumer_object)
}
// To consume upcoming Kafka msg, explicitly
// Especially in case of un-committed msgs
return
return
case true:
// MonitoringEndTime.UTC() defaults to 0001-01-01 00:00:00 +0000 UTC if not found
// MonitoringEndTime.UTC() defaults to 0001-01-01 00:00:00 +0000 UTC if not set
if !recommendation_stored_in_db.MonitoringEndTime.UTC().IsZero() {
duration := maxEndTimeFromReport.Sub(recommendation_stored_in_db.MonitoringEndTime.UTC())
if int(duration.Hours()) >= cfg.RecommendationPollIntervalHours {
Expand Down
3 changes: 2 additions & 1 deletion internal/services/report_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ func ProcessReport(msg *kafka.Message, _ *kafka.Consumer) {
k8s_object_name,
)

container_names, err := kruize.Create_kruize_experiments(experiment_name, k8s_object)
cluster_identifier := kafkaMsg.Metadata.Org_id + ";" + kafkaMsg.Metadata.Cluster_uuid
container_names, err := kruize.Create_kruize_experiments(experiment_name, cluster_identifier, k8s_object)
if err != nil {
log.Error(err)
continue
Expand Down
52 changes: 31 additions & 21 deletions internal/types/kruizePayload/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type container struct {
Container_image_name string `json:"container_image_name,omitempty"`
Container_name string `json:"container_name,omitempty"`
Metrics []metric `json:"metrics,omitempty"`
Recommendations recommendation `json:"recommendations,omitempty"`
Recommendations Recommendation `json:"recommendations,omitempty"`
}

type metric struct {
Expand All @@ -37,37 +37,47 @@ type aggregation_info struct {
Format string `json:"format,omitempty"`
}

type recommendation struct {
Data map[string]recommendationType `json:"data,omitempty"`
Notifications map[string]notification `json:"notifications,omitempty"`
type Recommendation struct {
Version string `json:"version,omitempty"`
Data map[string]RecommendationData `json:"data,omitempty"`
Notifications map[string]Notification `json:"notifications,omitempty"`
}

type notification struct {
type Notification struct {
NotifyType string `json:"type,omitempty"`
Message string `json:"message,omitempty"`
Code int `json:"code,omitempty"`
}

type recommendationType struct {
Duration_based termbased `json:"duration_based,omitempty"`
type RecommendationEngineObject struct {
PodsCount int `json:"pods_count,omitempty"`
ConfidenceLevel float64 `json:"confidence_level,omitempty"`
Config ConfigObject `json:"config,omitempty"`
Variation ConfigObject `json:"variation,omitempty"`
Notifications map[string]Notification `json:"notifications,omitempty"`
}

type termbased struct {
Short_term recommendationObject `json:"short_term,omitempty"`
Medium_term recommendationObject `json:"medium_term,omitempty"`
Long_term recommendationObject `json:"long_term,omitempty"`
type RecommendationData struct {
Notifications map[string]Notification `json:"notifications,omitempty"`
MonitoringEndTime time.Time `json:"monitoring_end_time,omitempty"`
Current ConfigObject `json:"current,omitempty"`
RecommendationTerms Term `json:"recommendation_terms,omitempty"`
}

type recommendationObject struct {
Monitoring_start_time time.Time `json:"monitoring_start_time,omitempty"`
Monitoring_end_time time.Time `json:"monitoring_end_time,omitempty"`
Duration_in_hours float64 `json:"duration_in_hours,omitempty"`
Pods_count int `json:"pods_count,omitempty"`
Confidence_level float64 `json:"confidence_level,omitempty"`
Current ConfigObject `json:"current,omitempty"`
Config ConfigObject `json:"config,omitempty"`
Variation ConfigObject `json:"variation,omitempty"`
Notifications map[string]notification `json:"notifications,omitempty"`
type RecommendationTerm struct {
DurationInHours float64 `json:"duration_in_hours,omitempty"`
Notifications map[string]Notification `json:"notifications,omitempty"`
MonitoringStartTime time.Time `json:"monitoring_start_time,omitempty"`
RecommendationEngines *struct {
Cost RecommendationEngineObject `json:"cost,omitempty"`
Performance RecommendationEngineObject `json:"performance,omitempty"`
} `json:"recommendation_engines,omitempty"`
}

// Term groups the per-term recommendations of a single recommendation entry:
// one RecommendationTerm each for the short, medium and long term.
// It is the type of RecommendationData.RecommendationTerms, which is carried
// under the "recommendation_terms" JSON key.
type Term struct {
// Short_term maps to the "short_term" JSON key; no omitempty, so it is always emitted.
Short_term RecommendationTerm `json:"short_term"`
// Medium_term maps to the "medium_term" JSON key; no omitempty, so it is always emitted.
Medium_term RecommendationTerm `json:"medium_term"`
// Long_term maps to the "long_term" JSON key.
// NOTE(review): encoding/json's omitempty does not omit non-pointer struct
// values, so this tag option has no effect on the marshalled output —
// confirm whether a pointer type was intended here.
Long_term RecommendationTerm `json:"long_term,omitempty"`
}

type ConfigObject struct {
Expand Down
4 changes: 3 additions & 1 deletion internal/types/kruizePayload/createExperiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
type createExperiment struct {
Version string `json:"version"`
Experiment_name string `json:"experiment_name"`
Cluster_name string `json:"cluster_name"`
Performance_profile string `json:"performance_profile"`
Mode string `json:"mode"`
Target_cluster string `json:"target_cluster"`
Expand All @@ -23,7 +24,7 @@ type recommendation_settings struct {
Threshold string `json:"threshold"`
}

func GetCreateExperimentPayload(experiment_name string, containers []map[string]string, data map[string]string) ([]byte, error) {
func GetCreateExperimentPayload(experiment_name string, cluster_identifier string, containers []map[string]string, data map[string]string) ([]byte, error) {
container_array := []container{}
for _, c := range containers {
container_array = append(container_array, container{
Expand All @@ -35,6 +36,7 @@ func GetCreateExperimentPayload(experiment_name string, containers []map[string]
{
Version: "1.0",
Experiment_name: experiment_name,
Cluster_name: cluster_identifier,
Performance_profile: "resource-optimization-openshift",
Mode: "monitor",
Target_cluster: "remote",
Expand Down
Loading

0 comments on commit 5cf4a0e

Please sign in to comment.