Skip to content

Commit

Permalink
Ref - #RHIROS-1307 Adding support for report having more than 24hrs o…
Browse files Browse the repository at this point in the history
…f data.
  • Loading branch information
patilsuraj767 committed Sep 7, 2023
1 parent d89aaa5 commit 510d846
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 32 deletions.
6 changes: 4 additions & 2 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ type Config struct {
KafkaCA string

// Kruize config
KruizeUrl string `mapstructure:"KRUIZE_URL"`
KruizeWaitTime string `mapstructure:"KRUIZE_WAIT_TIME"`
KruizeUrl string `mapstructure:"KRUIZE_URL"`
KruizeWaitTime string `mapstructure:"KRUIZE_WAIT_TIME"`
KruizeMaxBulkUpdateLimit int `mapstructure:"KRUIZE_MAX_BULK_UPDATE_LIMIT"`

// Database config
DBName string
Expand Down Expand Up @@ -162,6 +163,7 @@ func initConfig() {
viper.SetDefault("SERVICE_NAME", "rosocp")
viper.SetDefault("API_PORT", "8000")
viper.SetDefault("KRUIZE_WAIT_TIME", "30")
viper.SetDefault("KRUIZE_MAX_BULK_UPDATE_LIMIT", 100)
viper.SetDefault("KAFKA_CONSUMER_GROUP_ID", "ros-ocp")
viper.SetDefault("KAFKA_AUTO_COMMIT", true)
viper.SetDefault("LOG_LEVEL", "INFO")
Expand Down
66 changes: 36 additions & 30 deletions internal/services/report_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/go-gota/gota/dataframe"
"github.com/go-playground/validator/v10"

"github.com/redhatinsights/ros-ocp-backend/internal/config"
"github.com/redhatinsights/ros-ocp-backend/internal/logging"
"github.com/redhatinsights/ros-ocp-backend/internal/model"
"github.com/redhatinsights/ros-ocp-backend/internal/types"
Expand All @@ -20,6 +21,7 @@ import (

func ProcessReport(msg *kafka.Message) {
log := logging.GetLogger()
cfg := config.GetConfig()
validate := validator.New()
var kafkaMsg types.KafkaMsg
if !json.Valid([]byte(msg.Value)) {
Expand Down Expand Up @@ -117,44 +119,48 @@ func ProcessReport(msg *kafka.Message) {
continue
}

usage_data_byte, err := kruize.Update_results(experiment_name, k8s_object)
if err != nil {
log.Error(err)
continue
}

for _, data := range usage_data_byte {
k8s_object_chunks := utils.ChunkK8sobjectSlice(k8s_object, cfg.KruizeMaxBulkUpdateLimit)

interval_start_time, _ := utils.ConvertStringToTime(data.Interval_start_time)
interval_end_time, _ := utils.ConvertStringToTime(data.Interval_end_time)

if workload_metrics, err := model.GetWorkloadMetricsForTimestamp(experiment_name, interval_end_time); err != nil {
log.Errorf("Error while checking for workload_metrics record: %s", err)
continue
} else if !reflect.ValueOf(workload_metrics).IsZero() {
log.Debugf("workload_metrics table already has data for interval_end time: %v.", interval_end_time)
for _, chunk := range k8s_object_chunks {
usage_data_byte, err := kruize.Update_results(experiment_name, chunk)
if err != nil {
log.Error(err)
continue
}

for _, container := range data.Kubernetes_objects[0].Containers {
container_usage_metrics, err := json.Marshal(container.Metrics)
if err != nil {
log.Errorf("Unable to marshal container usage data: %v", err)
}
for _, data := range usage_data_byte {

workload_metric := model.WorkloadMetrics{
WorkloadID: workload.ID,
ContainerName: container.Container_name,
IntervalStart: interval_start_time,
IntervalEnd: interval_end_time,
UsageMetrics: container_usage_metrics,
}
if err := workload_metric.CreateWorkloadMetrics(); err != nil {
log.Errorf("unable to add record to workload_metrics table: %v. Error: %v", workload_metric, err)
interval_start_time, _ := utils.ConvertISO8601StringToTime(data.Interval_start_time)
interval_end_time, _ := utils.ConvertISO8601StringToTime(data.Interval_end_time)

if workload_metrics, err := model.GetWorkloadMetricsForTimestamp(experiment_name, interval_end_time); err != nil {
log.Errorf("Error while checking for workload_metrics record: %s", err)
continue
} else if !reflect.ValueOf(workload_metrics).IsZero() {
log.Debugf("workload_metrics table already has data for interval_end time: %v.", interval_end_time)
continue
}
}

for _, container := range data.Kubernetes_objects[0].Containers {
container_usage_metrics, err := json.Marshal(container.Metrics)
if err != nil {
log.Errorf("Unable to marshal container usage data: %v", err)
}

workload_metric := model.WorkloadMetrics{
WorkloadID: workload.ID,
ContainerName: container.Container_name,
IntervalStart: interval_start_time,
IntervalEnd: interval_end_time,
UsageMetrics: container_usage_metrics,
}
if err := workload_metric.CreateWorkloadMetrics(); err != nil {
log.Errorf("unable to add record to workload_metrics table: %v. Error: %v", workload_metric, err)
continue
}
}

}
}

// Below we make sure that report which is been processed is the latest(interval_endtime) report.
Expand Down
23 changes: 23 additions & 0 deletions internal/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,14 @@ func ConvertStringToTime(data string) (time.Time, error) {

}

func ConvertISO8601StringToTime(data string) (time.Time, error) {
dateTime, err := time.Parse("2006-01-02T15:04:05.000Z", data)
if err != nil {
return time.Time{}, fmt.Errorf("unable to convert string to time: %s", err)
}
return dateTime, nil
}

func MaxIntervalEndTime(slice []string) (time.Time, error) {
var converted_date_slice []time.Time
for _, v := range slice {
Expand Down Expand Up @@ -176,3 +184,18 @@ func Start_prometheus_server() {
_ = http.ListenAndServe(fmt.Sprintf(":%s", cfg.PrometheusPort), nil)
}
}

func ChunkK8sobjectSlice(k8s_objects []map[string]interface{}, chunkSize int) [][]map[string]interface{} {
var chunks [][]map[string]interface{}
for i := 0; i < len(k8s_objects); i += chunkSize {
end := i + chunkSize

if end > len(k8s_objects) {
end = len(k8s_objects)
}

chunks = append(chunks, k8s_objects[i:end])
}

return chunks
}

0 comments on commit 510d846

Please sign in to comment.