Skip to content

Commit

Permalink
RHIROS-1326 Kruize 0.0.20.1_rm Integration
Browse files Browse the repository at this point in the history
  • Loading branch information
saltgen committed Nov 21, 2023
1 parent dbd5d26 commit 752adf2
Show file tree
Hide file tree
Showing 9 changed files with 955 additions and 281 deletions.
116 changes: 82 additions & 34 deletions internal/api/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,22 +223,17 @@ func TransformComponentUnits(jsonData datatypes.JSON) map[string]interface{} {
return nil
}

durationBased, ok := data["duration_based"].(map[string]interface{})
if !ok {
fmt.Printf("duration_based not found in JSON")
}

convertMemory := func(memory map[string]interface{}) error {
amount, ok := memory["amount"].(float64)
if ok {
memoryInMiB := amount / 1024 / 1024
if math.Abs(memoryInMiB) >= 1024 {
memoryInGiB := memoryInMiB / 1024
memory["amount"] = math.Trunc(memoryInGiB*100) / 100
memory["format"] = "GiB"
memory["format"] = "Gi"
} else {
memory["amount"] = math.Trunc(memoryInMiB*100) / 100
memory["format"] = "MiB"
memory["format"] = "Mi"
}
}
return nil
Expand All @@ -265,56 +260,109 @@ func TransformComponentUnits(jsonData datatypes.JSON) map[string]interface{} {
if math.Abs(cpuInCores) < 1 {
cpuInMillicores := cpuInCores * 1000
cpu["amount"] = math.Round(cpuInMillicores) // millicore values are rounded & don't require decimal precision
cpu["format"] = "millicores"
cpu["format"] = "m"
} else {
cpu["amount"] = truncateToThreeDecimalPlaces(cpuInCores)
cpu["format"] = "cores"
cpu["format"] = nil
}
}
return nil
}

// Current
current_config, ok := data["current"].(map[string]interface{})
if !ok {
log.Error("current not found in JSON")
}

for _, section := range []string{"limits", "requests"} {

sectionObject, ok := current_config[section].(map[string]interface{})
if ok {
memory, ok := sectionObject["memory"].(map[string]interface{})
if ok {
err := convertMemory(memory)
if err != nil {
fmt.Printf("error converting memory in %s: %v\n", sectionObject, err)
continue
}
}
cpu, ok := sectionObject["cpu"].(map[string]interface{})
if ok {
err := convertCPU(cpu)
if err != nil {
fmt.Printf("error converting cpu in %s: %v\n", sectionObject, err)
continue
}
}
}
}

/*
Recommendation data is available for three periods
under cost and performance keys
For each of these actual values will be present in
below mentioned dataBlocks > request and limits
*/

for _, period := range []string{"long_term", "medium_term", "short_term"} {
intervalData, ok := durationBased[period].(map[string]interface{})
// Recommendations
recommendation_terms, ok := data["recommendation_terms"].(map[string]interface{})
if !ok {
log.Error("recommendation_terms not found in JSON")
}

for _, period := range []string{"short_term", "medium_term", "long_term"} {
intervalData, ok := recommendation_terms[period].(map[string]interface{})
if !ok {
continue
}

for _, dataBlock := range []string{"current", "config", "variation"} {
recommendationSection, ok := intervalData[dataBlock].(map[string]interface{})
if !ok {
continue
}
// Hack
// remove nil equivalent monitoring_start_time in API response
monitoring_start_time := intervalData["monitoring_start_time"]
if monitoring_start_time == "0001-01-01T00:00:00Z" {
delete(intervalData, "monitoring_start_time")
}

if intervalData["recommendation_engines"] != nil {

for _, section := range []string{"limits", "requests"} {
for _, recommendationType := range []string{"cost", "performance"} {
engineData, ok := intervalData["recommendation_engines"].(map[string]interface{})[recommendationType].(map[string]interface{})
if !ok {
continue
}

sectionObject, ok := recommendationSection[section].(map[string]interface{})
if ok {
memory, ok := sectionObject["memory"].(map[string]interface{})
if ok {
err := convertMemory(memory)
if err != nil {
fmt.Printf("error converting memory in %s: %v\n", period, err)
continue
}
for _, dataBlock := range []string{"config", "variation"} {
recommendationSection, ok := engineData[dataBlock].(map[string]interface{})
if !ok {
continue
}
cpu, ok := sectionObject["cpu"].(map[string]interface{})
if ok {
err := convertCPU(cpu)
if err != nil {
fmt.Printf("error converting cpu in %s: %v\n", period, err)
continue

for _, section := range []string{"limits", "requests"} {

sectionObject, ok := recommendationSection[section].(map[string]interface{})
if ok {
memory, ok := sectionObject["memory"].(map[string]interface{})
if ok {
err := convertMemory(memory)
if err != nil {
fmt.Printf("error converting memory in %s: %v\n", period, err)
continue
}
}
cpu, ok := sectionObject["cpu"].(map[string]interface{})
if ok {
err := convertCPU(cpu)
if err != nil {
fmt.Printf("error converting cpu in %s: %v\n", period, err)
continue
}
}
}
}

}
}

}
}
}

Expand Down
13 changes: 7 additions & 6 deletions internal/services/report_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ func ProcessReport(msg *kafka.Message) {
k8s_object_name,
)

container_names, err := kruize.Create_kruize_experiments(experiment_name, k8s_object)
cluster_identifier := kafkaMsg.Metadata.Org_id + ";" + kafkaMsg.Metadata.Cluster_uuid
container_names, err := kruize.Create_kruize_experiments(experiment_name, cluster_identifier, k8s_object)
if err != nil {
log.Error(err)
continue
Expand Down Expand Up @@ -197,7 +198,7 @@ func ProcessReport(msg *kafka.Message) {
continue
}

if kruize.Is_valid_recommendation(recommendation) {
if kruize.Is_valid_recommendation(recommendation, experiment_name, maxEndTime) {
containers := recommendation[0].Kubernetes_objects[0].Containers
for _, container := range containers {
for _, v := range container.Recommendations.Data {
Expand All @@ -210,8 +211,8 @@ func ProcessReport(msg *kafka.Message) {
recommendationSet := model.RecommendationSet{
WorkloadID: workload.ID,
ContainerName: container.Container_name,
MonitoringStartTime: v.Duration_based.Short_term.Monitoring_start_time,
MonitoringEndTime: v.Duration_based.Short_term.Monitoring_end_time,
MonitoringStartTime: v.RecommendationTerms.Short_term.MonitoringStartTime,
MonitoringEndTime: v.MonitoringEndTime,
Recommendations: marshalData,
}
if err := recommendationSet.CreateRecommendationSet(); err != nil {
Expand All @@ -226,8 +227,8 @@ func ProcessReport(msg *kafka.Message) {
OrgId: rh_account.OrgId,
WorkloadID: workload.ID,
ContainerName: container.Container_name,
MonitoringStartTime: v.Duration_based.Short_term.Monitoring_start_time,
MonitoringEndTime: v.Duration_based.Short_term.Monitoring_end_time,
MonitoringStartTime: v.RecommendationTerms.Short_term.MonitoringStartTime,
MonitoringEndTime: v.MonitoringEndTime,
Recommendations: marshalData,
}
if err := historicalRecommendationSet.CreateHistoricalRecommendationSet(); err != nil {
Expand Down
49 changes: 30 additions & 19 deletions internal/types/kruizePayload/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,36 +38,47 @@ type aggregation_info struct {
}

type recommendation struct {
Data map[string]recommendationType `json:"data,omitempty"`
Notifications map[string]notification `json:"notifications,omitempty"`
Version string `json:"version,omitempty"`
Data map[string]RecommendationData `json:"data,omitempty"`
Notifications map[string]Notification `json:"notifications,omitempty"`
}

type notification struct {

type Notification struct {
NotifyType string `json:"type,omitempty"`
Message string `json:"message,omitempty"`
Code int `json:"code,omitempty"`
}

type recommendationType struct {
Duration_based termbased `json:"duration_based,omitempty"`
type RecommendationEngineObject struct {
PodsCount int `json:"pods_count,omitempty"`
ConfidenceLevel float64 `json:"confidence_level,omitempty"`
Config ConfigObject `json:"config,omitempty"`
Variation ConfigObject `json:"variation,omitempty"`
Notifications map[string]Notification `json:"notifications,omitempty"`
}

type RecommendationData struct {
Notifications map[string]Notification `json:"notifications,omitempty"`
MonitoringEndTime time.Time `json:"monitoring_end_time,omitempty"`
Current ConfigObject `json:"current,omitempty"`
RecommendationTerms TermBased `json:"recommendation_terms,omitempty"`
}

type termbased struct {
Short_term recommendationObject `json:"short_term,omitempty"`
Medium_term recommendationObject `json:"medium_term,omitempty"`
Long_term recommendationObject `json:"long_term,omitempty"`
type RecommendationTerm struct {
DurationInHours float64 `json:"duration_in_hours,omitempty"`
Notifications map[string]Notification `json:"notifications,omitempty"`
MonitoringStartTime time.Time `json:"monitoring_start_time,omitempty"`
RecommendationEngines *struct {
Cost RecommendationEngineObject `json:"cost,omitempty"`
Performance RecommendationEngineObject `json:"performance,omitempty"`
} `json:"recommendation_engines,omitempty"`
}

type recommendationObject struct {
Monitoring_start_time time.Time `json:"monitoring_start_time,omitempty"`
Monitoring_end_time time.Time `json:"monitoring_end_time,omitempty"`
Duration_in_hours float64 `json:"duration_in_hours,omitempty"`
Pods_count int `json:"pods_count,omitempty"`
Confidence_level float64 `json:"confidence_level,omitempty"`
Current ConfigObject `json:"current,omitempty"`
Config ConfigObject `json:"config,omitempty"`
Variation ConfigObject `json:"variation,omitempty"`
Notifications map[string]notification `json:"notifications,omitempty"`
type TermBased struct {
Short_term RecommendationTerm `json:"short_term"`
Medium_term RecommendationTerm `json:"medium_term"`
Long_term RecommendationTerm `json:"long_term,omitempty"`
}

type ConfigObject struct {
Expand Down
4 changes: 3 additions & 1 deletion internal/types/kruizePayload/createExperiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
type createExperiment struct {
Version string `json:"version"`
Experiment_name string `json:"experiment_name"`
Cluster_name string `json:"cluster_name"`
Performance_profile string `json:"performance_profile"`
Mode string `json:"mode"`
Target_cluster string `json:"target_cluster"`
Expand All @@ -23,7 +24,7 @@ type recommendation_settings struct {
Threshold string `json:"threshold"`
}

func GetCreateExperimentPayload(experiment_name string, containers []map[string]string, data map[string]string) ([]byte, error) {
func GetCreateExperimentPayload(experiment_name string, cluster_identifier string, containers []map[string]string, data map[string]string) ([]byte, error) {
container_array := []container{}
for _, c := range containers {
container_array = append(container_array, container{
Expand All @@ -35,6 +36,7 @@ func GetCreateExperimentPayload(experiment_name string, containers []map[string]
{
Version: "1.0",
Experiment_name: experiment_name,
Cluster_name: cluster_identifier,
Performance_profile: "resource-optimization-openshift",
Mode: "monitor",
Target_cluster: "remote",
Expand Down
Loading

0 comments on commit 752adf2

Please sign in to comment.