Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RHIROS-1326 Kruize 0.0.20.1_rm Integration #129

Merged
merged 5 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ coverage.txt
output.csv
.idea/
.DS_Store
.go/
124 changes: 88 additions & 36 deletions internal/api/utils.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package api

import (
"encoding/json"
"fmt"
"math"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"encoding/json"
"math"

"gorm.io/datatypes"

Expand Down Expand Up @@ -223,22 +223,17 @@ func TransformComponentUnits(jsonData datatypes.JSON) map[string]interface{} {
return nil
}

durationBased, ok := data["duration_based"].(map[string]interface{})
if !ok {
fmt.Printf("duration_based not found in JSON")
}

convertMemory := func(memory map[string]interface{}) error {
upadhyeammit marked this conversation as resolved.
Show resolved Hide resolved
amount, ok := memory["amount"].(float64)
if ok {
memoryInMiB := amount / 1024 / 1024
if math.Abs(memoryInMiB) >= 1024 {
memoryInGiB := memoryInMiB / 1024
memory["amount"] = math.Trunc(memoryInGiB*100) / 100
memory["format"] = "GiB"
memory["format"] = "Gi"
} else {
memory["amount"] = math.Trunc(memoryInMiB*100) / 100
memory["format"] = "MiB"
memory["format"] = "Mi"
}
}
return nil
Expand All @@ -265,56 +260,113 @@ func TransformComponentUnits(jsonData datatypes.JSON) map[string]interface{} {
if math.Abs(cpuInCores) < 1 {
cpuInMillicores := cpuInCores * 1000
cpu["amount"] = math.Round(cpuInMillicores) // millicore values are rounded & don't require decimal precision
cpu["format"] = "millicores"
cpu["format"] = "m"
} else {
cpu["amount"] = truncateToThreeDecimalPlaces(cpuInCores)
cpu["format"] = "cores"
cpu["format"] = nil
}
}
return nil
}

// Current section of recommendation
current_config, ok := data["current"].(map[string]interface{})
if !ok {
log.Error("current not found in JSON")
}

for _, section := range []string{"limits", "requests"} {

sectionObject, ok := current_config[section].(map[string]interface{})
if ok {
memory, ok := sectionObject["memory"].(map[string]interface{})
if ok {
err := convertMemory(memory)
if err != nil {
fmt.Printf("error converting memory in %s: %v\n", sectionObject, err)
continue
}
}
cpu, ok := sectionObject["cpu"].(map[string]interface{})
if ok {
err := convertCPU(cpu)
if err != nil {
fmt.Printf("error converting cpu in %s: %v\n", sectionObject, err)
continue
}
}
upadhyeammit marked this conversation as resolved.
Show resolved Hide resolved
}
}

/*
Recommendation data is available for three periods (terms),
each under the cost and performance keys (engines).
For each of these, the actual values are present in the
dataBlocks listed below, under requests and limits.
*/

for _, period := range []string{"long_term", "medium_term", "short_term"} {
intervalData, ok := durationBased[period].(map[string]interface{})
// Recommendation section
recommendation_terms, ok := data["recommendation_terms"].(map[string]interface{})
if !ok {
upadhyeammit marked this conversation as resolved.
Show resolved Hide resolved
log.Error("recommendation data not found in JSON")
return data
}

for _, period := range []string{"short_term", "medium_term", "long_term"} {
intervalData, ok := recommendation_terms[period].(map[string]interface{})
if !ok {
continue
}

for _, dataBlock := range []string{"current", "config", "variation"} {
recommendationSection, ok := intervalData[dataBlock].(map[string]interface{})
if !ok {
continue
}
/* Hack
// monitoring_start_time is currently not nullable in the DB,
// so it cannot be set to null when saving the response from Kruize.
*/
// Remove a zero-value (nil-equivalent) monitoring_start_time from the API response.
monitoring_start_time := intervalData["monitoring_start_time"]
if monitoring_start_time == "0001-01-01T00:00:00Z" {
delete(intervalData, "monitoring_start_time")
}

for _, section := range []string{"limits", "requests"} {
if intervalData["recommendation_engines"] != nil {

sectionObject, ok := recommendationSection[section].(map[string]interface{})
if ok {
memory, ok := sectionObject["memory"].(map[string]interface{})
if ok {
err := convertMemory(memory)
if err != nil {
fmt.Printf("error converting memory in %s: %v\n", period, err)
continue
}
for _, recommendationType := range []string{"cost", "performance"} {
engineData, ok := intervalData["recommendation_engines"].(map[string]interface{})[recommendationType].(map[string]interface{})
if !ok {
continue
}

for _, dataBlock := range []string{"config", "variation"} {
recommendationSection, ok := engineData[dataBlock].(map[string]interface{})
if !ok {
continue
}
cpu, ok := sectionObject["cpu"].(map[string]interface{})
if ok {
err := convertCPU(cpu)
if err != nil {
fmt.Printf("error converting cpu in %s: %v\n", period, err)
continue

for _, section := range []string{"limits", "requests"} {

sectionObject, ok := recommendationSection[section].(map[string]interface{})
if ok {
memory, ok := sectionObject["memory"].(map[string]interface{})
if ok {
err := convertMemory(memory)
if err != nil {
fmt.Printf("error converting memory in %s: %v\n", period, err)
continue
}
}
cpu, ok := sectionObject["cpu"].(map[string]interface{})
if ok {
err := convertCPU(cpu)
if err != nil {
fmt.Printf("error converting cpu in %s: %v\n", period, err)
continue
}
}
}
}

}
}

}
}
}

Expand Down
4 changes: 0 additions & 4 deletions internal/services/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@ import (
)

var (
invalidRecommendation = promauto.NewCounter(prometheus.CounterOpts{
Name: "rosocp_invalid_recommendation_total",
Help: "The total number of invalid recommendation send by Kruize",
})
invalidCSV = promauto.NewCounter(prometheus.CounterOpts{
Name: "rosocp_invalid_csv_total",
Help: "The total number of invalid csv send by cost-mgmt",
Expand Down
34 changes: 18 additions & 16 deletions internal/services/recommendation_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,12 @@ func requestAndSaveRecommendation(kafkaMsg types.RecommendationKafkaMsg, recomme
return poll_cycle_complete
}

// TODO: Is_valid_recommendation to be called on every container record from the v20.1 upgrade onwards
if kruize.Is_valid_recommendation(recommendation) {
containers := recommendation[0].Kubernetes_objects[0].Containers
recommendationSetList := []model.RecommendationSet{}
histRecommendationSetList := []model.HistoricalRecommendationSet{}
containers := recommendation[0].Kubernetes_objects[0].Containers
recommendationSetList := []model.RecommendationSet{}
histRecommendationSetList := []model.HistoricalRecommendationSet{}

for _, container := range containers {
for _, container := range containers {
if kruize.Is_valid_recommendation(container.Recommendations, experiment_name, maxEndTimeFromReport) {
for _, v := range container.Recommendations.Data {
marshalData, err := json.Marshal(v)
if err != nil {
Expand All @@ -89,8 +88,8 @@ func requestAndSaveRecommendation(kafkaMsg types.RecommendationKafkaMsg, recomme
recommendationSet := model.RecommendationSet{
WorkloadID: kafkaMsg.Metadata.Workload_id,
ContainerName: container.Container_name,
MonitoringStartTime: v.Duration_based.Short_term.Monitoring_start_time,
MonitoringEndTime: v.Duration_based.Short_term.Monitoring_end_time,
MonitoringStartTime: v.RecommendationTerms.Short_term.MonitoringStartTime,
MonitoringEndTime: v.MonitoringEndTime,
Recommendations: marshalData,
}
recommendationSetList = append(recommendationSetList, recommendationSet)
Expand All @@ -100,20 +99,24 @@ func requestAndSaveRecommendation(kafkaMsg types.RecommendationKafkaMsg, recomme
OrgId: kafkaMsg.Metadata.Org_id,
WorkloadID: kafkaMsg.Metadata.Workload_id,
ContainerName: container.Container_name,
MonitoringStartTime: v.Duration_based.Short_term.Monitoring_start_time,
MonitoringEndTime: v.Duration_based.Short_term.Monitoring_end_time,
MonitoringStartTime: v.RecommendationTerms.Short_term.MonitoringStartTime,
MonitoringEndTime: v.MonitoringEndTime,
Recommendations: marshalData,
}
histRecommendationSetList = append(histRecommendationSetList, historicalRecommendationSet)
}
} else {
poll_cycle_complete = true
continue
}
}
if len(recommendationSetList) > 0 {
txError := transactionForRecommendation(recommendationSetList, histRecommendationSetList, experiment_name, recommendationType)
if txError == nil {
poll_cycle_complete = true
} else {
poll_cycle_complete = false
}
} else {
poll_cycle_complete = true
invalidRecommendation.Inc()
}
return poll_cycle_complete
}
Expand Down Expand Up @@ -157,10 +160,9 @@ func PollForRecommendations(msg *kafka.Message, consumer_object *kafka.Consumer)
commitKafkaMsg(msg, consumer_object)
}
// To consume upcoming Kafka msg, explicitly
// Especially in case of un-committed msgs
return
return
case true:
// MonitoringEndTime.UTC() defaults to 0001-01-01 00:00:00 +0000 UTC if not found
// MonitoringEndTime.UTC() defaults to 0001-01-01 00:00:00 +0000 UTC if not set
if !recommendation_stored_in_db.MonitoringEndTime.UTC().IsZero() {
duration := maxEndTimeFromReport.Sub(recommendation_stored_in_db.MonitoringEndTime.UTC())
if int(duration.Hours()) >= cfg.RecommendationPollIntervalHours {
Expand Down
3 changes: 2 additions & 1 deletion internal/services/report_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ func ProcessReport(msg *kafka.Message, _ *kafka.Consumer) {
k8s_object_name,
)

container_names, err := kruize.Create_kruize_experiments(experiment_name, k8s_object)
cluster_identifier := kafkaMsg.Metadata.Org_id + ";" + kafkaMsg.Metadata.Cluster_uuid
container_names, err := kruize.Create_kruize_experiments(experiment_name, cluster_identifier, k8s_object)
if err != nil {
log.Error(err)
continue
Expand Down
52 changes: 31 additions & 21 deletions internal/types/kruizePayload/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type container struct {
Container_image_name string `json:"container_image_name,omitempty"`
Container_name string `json:"container_name,omitempty"`
Metrics []metric `json:"metrics,omitempty"`
Recommendations recommendation `json:"recommendations,omitempty"`
Recommendations Recommendation `json:"recommendations,omitempty"`
}

type metric struct {
Expand All @@ -37,37 +37,47 @@ type aggregation_info struct {
Format string `json:"format,omitempty"`
}

type recommendation struct {
Data map[string]recommendationType `json:"data,omitempty"`
Notifications map[string]notification `json:"notifications,omitempty"`
type Recommendation struct {
Version string `json:"version,omitempty"`
Data map[string]RecommendationData `json:"data,omitempty"`
Notifications map[string]Notification `json:"notifications,omitempty"`
}

type notification struct {
type Notification struct {
NotifyType string `json:"type,omitempty"`
Message string `json:"message,omitempty"`
Code int `json:"code,omitempty"`
}

type recommendationType struct {
Duration_based termbased `json:"duration_based,omitempty"`
type RecommendationEngineObject struct {
upadhyeammit marked this conversation as resolved.
Show resolved Hide resolved
PodsCount int `json:"pods_count,omitempty"`
ConfidenceLevel float64 `json:"confidence_level,omitempty"`
Config ConfigObject `json:"config,omitempty"`
Variation ConfigObject `json:"variation,omitempty"`
Notifications map[string]Notification `json:"notifications,omitempty"`
}

type termbased struct {
Short_term recommendationObject `json:"short_term,omitempty"`
Medium_term recommendationObject `json:"medium_term,omitempty"`
Long_term recommendationObject `json:"long_term,omitempty"`
type RecommendationData struct {
Notifications map[string]Notification `json:"notifications,omitempty"`
MonitoringEndTime time.Time `json:"monitoring_end_time,omitempty"`
Current ConfigObject `json:"current,omitempty"`
RecommendationTerms Term `json:"recommendation_terms,omitempty"`
}

type recommendationObject struct {
Monitoring_start_time time.Time `json:"monitoring_start_time,omitempty"`
Monitoring_end_time time.Time `json:"monitoring_end_time,omitempty"`
Duration_in_hours float64 `json:"duration_in_hours,omitempty"`
Pods_count int `json:"pods_count,omitempty"`
Confidence_level float64 `json:"confidence_level,omitempty"`
Current ConfigObject `json:"current,omitempty"`
Config ConfigObject `json:"config,omitempty"`
Variation ConfigObject `json:"variation,omitempty"`
Notifications map[string]notification `json:"notifications,omitempty"`
type RecommendationTerm struct {
DurationInHours float64 `json:"duration_in_hours,omitempty"`
Notifications map[string]Notification `json:"notifications,omitempty"`
MonitoringStartTime time.Time `json:"monitoring_start_time,omitempty"`
RecommendationEngines *struct {
Cost RecommendationEngineObject `json:"cost,omitempty"`
Performance RecommendationEngineObject `json:"performance,omitempty"`
} `json:"recommendation_engines,omitempty"`
}

// Term groups the per-period recommendation payloads that RecommendationData
// exposes under the "recommendation_terms" JSON key. It holds one
// RecommendationTerm each for the short, medium and long term.
type Term struct {
Short_term RecommendationTerm `json:"short_term"`
Medium_term RecommendationTerm `json:"medium_term"`
// NOTE(review): only this field carries omitempty. encoding/json does not
// treat struct values as "empty", so the option presumably has no effect on
// the marshalled output — confirm whether the asymmetry is intentional.
Long_term RecommendationTerm `json:"long_term,omitempty"`
}

type ConfigObject struct {
Expand Down
4 changes: 3 additions & 1 deletion internal/types/kruizePayload/createExperiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
type createExperiment struct {
Version string `json:"version"`
Experiment_name string `json:"experiment_name"`
Cluster_name string `json:"cluster_name"`
Performance_profile string `json:"performance_profile"`
Mode string `json:"mode"`
Target_cluster string `json:"target_cluster"`
Expand All @@ -23,7 +24,7 @@ type recommendation_settings struct {
Threshold string `json:"threshold"`
}

func GetCreateExperimentPayload(experiment_name string, containers []map[string]string, data map[string]string) ([]byte, error) {
func GetCreateExperimentPayload(experiment_name string, cluster_identifier string, containers []map[string]string, data map[string]string) ([]byte, error) {
container_array := []container{}
for _, c := range containers {
container_array = append(container_array, container{
Expand All @@ -35,6 +36,7 @@ func GetCreateExperimentPayload(experiment_name string, containers []map[string]
{
Version: "1.0",
Experiment_name: experiment_name,
Cluster_name: cluster_identifier,
Performance_profile: "resource-optimization-openshift",
Mode: "monitor",
Target_cluster: "remote",
Expand Down
Loading
Loading