diff --git a/internal/config/config.go b/internal/config/config.go
index c623b00c..ee4f4cac 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -31,8 +31,9 @@ type Config struct {
 	KafkaCA string
 
 	// Kruize config
-	KruizeUrl      string `mapstructure:"KRUIZE_URL"`
-	KruizeWaitTime string `mapstructure:"KRUIZE_WAIT_TIME"`
+	KruizeUrl              string `mapstructure:"KRUIZE_URL"`
+	KruizeWaitTime         string `mapstructure:"KRUIZE_WAIT_TIME"`
+	KruizeMaxBulkChunkSize int    `mapstructure:"KRUIZE_MAX_BULK_CHUNK_SIZE"`
 
 	// Database config
 	DBName string
@@ -162,6 +163,7 @@ func initConfig() {
 	viper.SetDefault("SERVICE_NAME", "rosocp")
 	viper.SetDefault("API_PORT", "8000")
 	viper.SetDefault("KRUIZE_WAIT_TIME", "30")
+	viper.SetDefault("KRUIZE_MAX_BULK_CHUNK_SIZE", 100)
 	viper.SetDefault("KAFKA_CONSUMER_GROUP_ID", "ros-ocp")
 	viper.SetDefault("KAFKA_AUTO_COMMIT", true)
 	viper.SetDefault("LOG_LEVEL", "INFO")
diff --git a/internal/services/report_processor.go b/internal/services/report_processor.go
index 634c700b..8fb1a649 100644
--- a/internal/services/report_processor.go
+++ b/internal/services/report_processor.go
@@ -10,6 +10,7 @@ import (
 	"github.com/go-gota/gota/dataframe"
 	"github.com/go-playground/validator/v10"
 
+	"github.com/redhatinsights/ros-ocp-backend/internal/config"
 	"github.com/redhatinsights/ros-ocp-backend/internal/logging"
 	"github.com/redhatinsights/ros-ocp-backend/internal/model"
 	"github.com/redhatinsights/ros-ocp-backend/internal/types"
@@ -20,6 +21,7 @@ import (
 
 func ProcessReport(msg *kafka.Message) {
 	log := logging.GetLogger()
+	cfg := config.GetConfig()
 	validate := validator.New()
 	var kafkaMsg types.KafkaMsg
 	if !json.Valid([]byte(msg.Value)) {
@@ -117,44 +119,61 @@ func ProcessReport(msg *kafka.Message) {
 			continue
 		}
 
-		usage_data_byte, err := kruize.Update_results(experiment_name, k8s_object)
-		if err != nil {
-			log.Error(err)
-			continue
+		var k8s_object_chunks [][]map[string]interface{}
+		if len(k8s_object) > cfg.KruizeMaxBulkChunkSize {
+			k8s_object_chunks = utils.SliceK8sObjectToChunks(k8s_object)
+		} else {
+			k8s_object_chunks = append(k8s_object_chunks, k8s_object)
 		}
 
-		for _, data := range usage_data_byte {
-
-			interval_start_time, _ := utils.ConvertStringToTime(data.Interval_start_time)
-			interval_end_time, _ := utils.ConvertStringToTime(data.Interval_end_time)
-
-			if workload_metrics, err := model.GetWorkloadMetricsForTimestamp(experiment_name, interval_end_time); err != nil {
-				log.Errorf("Error while checking for workload_metrics record: %s", err)
-				continue
-			} else if !reflect.ValueOf(workload_metrics).IsZero() {
-				log.Debugf("workload_metrics table already has data for interval_end time: %v.", interval_end_time)
+		for _, chunk := range k8s_object_chunks {
+			usage_data_byte, err := kruize.Update_results(experiment_name, chunk)
+			if err != nil {
+				log.Error(err, experiment_name)
 				continue
 			}
 
-			for _, container := range data.Kubernetes_objects[0].Containers {
-				container_usage_metrics, err := json.Marshal(container.Metrics)
+			for _, data := range usage_data_byte {
+
+				interval_start_time, err := utils.ConvertISO8601StringToTime(data.Interval_start_time)
 				if err != nil {
-					log.Errorf("Unable to marshal container usage data: %v", err)
+					log.Errorf("Error for start time: %s", err)
+					continue
 				}
-
-				workload_metric := model.WorkloadMetrics{
-					WorkloadID:    workload.ID,
-					ContainerName: container.Container_name,
-					IntervalStart: interval_start_time,
-					IntervalEnd:   interval_end_time,
-					UsageMetrics:  container_usage_metrics,
+				interval_end_time, err := utils.ConvertISO8601StringToTime(data.Interval_end_time)
+				if err != nil {
+					log.Errorf("Error for end time: %s", err)
+					continue
 				}
-				if err := workload_metric.CreateWorkloadMetrics(); err != nil {
-					log.Errorf("unable to add record to workload_metrics table: %v. Error: %v", workload_metric, err)
+
+				if workload_metrics, err := model.GetWorkloadMetricsForTimestamp(experiment_name, interval_end_time); err != nil {
+					log.Errorf("Error while checking for workload_metrics record: %s", err)
+					continue
+				} else if !reflect.ValueOf(workload_metrics).IsZero() {
+					log.Debugf("workload_metrics table already has data for interval_end time: %v.", interval_end_time)
 					continue
 				}
-			}
-
+
+				for _, container := range data.Kubernetes_objects[0].Containers {
+					container_usage_metrics, err := json.Marshal(container.Metrics)
+					if err != nil {
+						log.Errorf("Unable to marshal container usage data: %v", err)
+					}
+
+					workload_metric := model.WorkloadMetrics{
+						WorkloadID:    workload.ID,
+						ContainerName: container.Container_name,
+						IntervalStart: interval_start_time,
+						IntervalEnd:   interval_end_time,
+						UsageMetrics:  container_usage_metrics,
+					}
+					if err := workload_metric.CreateWorkloadMetrics(); err != nil {
+						log.Errorf("unable to add record to workload_metrics table: %v. Error: %v", workload_metric, err)
+						continue
+					}
+				}
+
+			}
 		}
 
 		// Below we make sure that report which is been processed is the latest(interval_endtime) report.
diff --git a/internal/utils/utils.go b/internal/utils/utils.go
index 54610713..10148958 100644
--- a/internal/utils/utils.go
+++ b/internal/utils/utils.go
@@ -126,6 +126,14 @@ func ConvertStringToTime(data string) (time.Time, error) {
 
 }
 
+func ConvertISO8601StringToTime(data string) (time.Time, error) {
+	dateTime, err := time.Parse("2006-01-02T15:04:05.000Z", data)
+	if err != nil {
+		return time.Time{}, fmt.Errorf("unable to convert string to time: %s", err)
+	}
+	return dateTime, nil
+}
+
 func MaxIntervalEndTime(slice []string) (time.Time, error) {
 	var converted_date_slice []time.Time
 	for _, v := range slice {
@@ -176,3 +184,19 @@ func Start_prometheus_server() {
 		_ = http.ListenAndServe(fmt.Sprintf(":%s", cfg.PrometheusPort), nil)
 	}
 }
+
+func SliceK8sObjectToChunks(k8s_objects []map[string]interface{}) [][]map[string]interface{} {
+	var chunks [][]map[string]interface{}
+	chunkSize := cfg.KruizeMaxBulkChunkSize
+	for i := 0; i < len(k8s_objects); i += chunkSize {
+		end := i + chunkSize
+
+		if end > len(k8s_objects) {
+			end = len(k8s_objects)
+		}
+
+		chunks = append(chunks, k8s_objects[i:end])
+	}
+
+	return chunks
+}
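Note (not part of the patch): below is a minimal standalone sketch of the chunking arithmetic that the new SliceK8sObjectToChunks helper applies, shown here only so the boundary behaviour is easy to verify. The function name chunkObjects and the explicit chunkSize parameter are illustrative stand-ins; in the patch the helper reads the size from the package-level cfg.KruizeMaxBulkChunkSize (default 100 per the config change above).

package main

import "fmt"

// chunkObjects splits a slice of k8s objects into consecutive chunks of at
// most chunkSize elements; the final chunk may be shorter.
func chunkObjects(objects []map[string]interface{}, chunkSize int) [][]map[string]interface{} {
	var chunks [][]map[string]interface{}
	for i := 0; i < len(objects); i += chunkSize {
		end := i + chunkSize
		if end > len(objects) {
			end = len(objects) // clamp the last chunk to the slice length
		}
		chunks = append(chunks, objects[i:end])
	}
	return chunks
}

func main() {
	// 250 objects with a chunk size of 100 should yield chunks of 100, 100 and 50,
	// which is the payload size Update_results would receive per call.
	objects := make([]map[string]interface{}, 250)
	for _, c := range chunkObjects(objects, 100) {
		fmt.Println(len(c))
	}
}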