From 4b0356248ad854deeebd6c2d9ebc479aaeb178a6 Mon Sep 17 00:00:00 2001 From: Suraj Patil <31805557+patilsuraj767@users.noreply.github.com> Date: Fri, 27 Oct 2023 18:11:37 +0530 Subject: [PATCH] Fixes #RHIROS-1402 - Fixed payload creation for kruize updateResult API (#142) --- internal/services/report_processor.go | 30 ++++++++++++++++++++++----- internal/utils/kruize/kruize_api.go | 5 ++--- internal/utils/utils.go | 16 -------------- 3 files changed, 27 insertions(+), 24 deletions(-) diff --git a/internal/services/report_processor.go b/internal/services/report_processor.go index 3fab55fc..d7e389d7 100644 --- a/internal/services/report_processor.go +++ b/internal/services/report_processor.go @@ -14,14 +14,17 @@ import ( "github.com/redhatinsights/ros-ocp-backend/internal/logging" "github.com/redhatinsights/ros-ocp-backend/internal/model" "github.com/redhatinsights/ros-ocp-backend/internal/types" + "github.com/redhatinsights/ros-ocp-backend/internal/types/kruizePayload" w "github.com/redhatinsights/ros-ocp-backend/internal/types/workload" "github.com/redhatinsights/ros-ocp-backend/internal/utils" "github.com/redhatinsights/ros-ocp-backend/internal/utils/kruize" ) +var cfg *config.Config = config.GetConfig() + func ProcessReport(msg *kafka.Message) { log := logging.GetLogger() - cfg := config.GetConfig() + cfg = config.GetConfig() validate := validator.New() var kafkaMsg types.KafkaMsg if !json.Valid([]byte(msg.Value)) { @@ -120,11 +123,12 @@ func ProcessReport(msg *kafka.Message) { continue } - var k8s_object_chunks [][]map[string]interface{} - if len(k8s_object) > cfg.KruizeMaxBulkChunkSize { - k8s_object_chunks = utils.SliceK8sObjectToChunks(k8s_object) + var k8s_object_chunks [][]kruizePayload.UpdateResult + update_result_payload_data := kruizePayload.GetUpdateResultPayload(experiment_name, k8s_object) + if len(update_result_payload_data) > cfg.KruizeMaxBulkChunkSize { + k8s_object_chunks = sliceUpdatePayloadToChunks(update_result_payload_data) } else { - k8s_object_chunks = append(k8s_object_chunks, k8s_object) + k8s_object_chunks = append(k8s_object_chunks, update_result_payload_data) } for _, chunk := range k8s_object_chunks { @@ -238,3 +242,19 @@ func ProcessReport(msg *kafka.Message) { } } } + +func sliceUpdatePayloadToChunks(k8s_objects []kruizePayload.UpdateResult) [][]kruizePayload.UpdateResult { + var chunks [][]kruizePayload.UpdateResult + chunkSize := cfg.KruizeMaxBulkChunkSize + for i := 0; i < len(k8s_objects); i += chunkSize { + end := i + chunkSize + + if end > len(k8s_objects) { + end = len(k8s_objects) + } + + chunks = append(chunks, k8s_objects[i:end]) + } + + return chunks +} diff --git a/internal/utils/kruize/kruize_api.go b/internal/utils/kruize/kruize_api.go index 14e82747..dc69971d 100644 --- a/internal/utils/kruize/kruize_api.go +++ b/internal/utils/kruize/kruize_api.go @@ -93,8 +93,7 @@ func Create_kruize_experiments(experiment_name string, k8s_object []map[string]i return container_names, nil } -func Update_results(experiment_name string, k8s_object []map[string]interface{}) ([]kruizePayload.UpdateResult, error) { - payload_data := kruizePayload.GetUpdateResultPayload(experiment_name, k8s_object) +func Update_results(experiment_name string, payload_data []kruizePayload.UpdateResult) ([]kruizePayload.UpdateResult, error) { postBody, err := json.Marshal(payload_data) if err != nil { return nil, fmt.Errorf("unable to create payload: %v", err) @@ -122,7 +121,7 @@ func Update_results(experiment_name string, k8s_object []map[string]interface{}) log.Error("Performance profile does not exist") log.Info("Tring to create resource_optimization_openshift performance profile") utils.Setup_kruize_performance_profile() - if payload_data, err := Update_results(experiment_name, k8s_object); err != nil { + if payload_data, err := Update_results(experiment_name, payload_data); err != nil { return nil, err } else { return payload_data, nil diff --git a/internal/utils/utils.go b/internal/utils/utils.go index 10148958..2ce954d2 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -184,19 +184,3 @@ func Start_prometheus_server() { _ = http.ListenAndServe(fmt.Sprintf(":%s", cfg.PrometheusPort), nil) } } - -func SliceK8sObjectToChunks(k8s_objects []map[string]interface{}) [][]map[string]interface{} { - var chunks [][]map[string]interface{} - chunkSize := cfg.KruizeMaxBulkChunkSize - for i := 0; i < len(k8s_objects); i += chunkSize { - end := i + chunkSize - - if end > len(k8s_objects) { - end = len(k8s_objects) - } - - chunks = append(chunks, k8s_objects[i:end]) - } - - return chunks -}