Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RHIROS-1327 - Adding support for report having more than 24hrs o… #122

Merged
merged 5 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ type Config struct {
KafkaCA string

// Kruize config
KruizeUrl string `mapstructure:"KRUIZE_URL"`
KruizeWaitTime string `mapstructure:"KRUIZE_WAIT_TIME"`
KruizeUrl string `mapstructure:"KRUIZE_URL"`
KruizeWaitTime string `mapstructure:"KRUIZE_WAIT_TIME"`
KruizeMaxBulkChunkSize int `mapstructure:"KRUIZE_MAX_BULK_CHUNK_SIZE"`

// Database config
DBName string
Expand Down Expand Up @@ -162,6 +163,7 @@ func initConfig() {
viper.SetDefault("SERVICE_NAME", "rosocp")
viper.SetDefault("API_PORT", "8000")
viper.SetDefault("KRUIZE_WAIT_TIME", "30")
viper.SetDefault("KRUIZE_MAX_BULK_CHUNK_SIZE", 100)
viper.SetDefault("KAFKA_CONSUMER_GROUP_ID", "ros-ocp")
viper.SetDefault("KAFKA_AUTO_COMMIT", true)
viper.SetDefault("LOG_LEVEL", "INFO")
Expand Down
73 changes: 46 additions & 27 deletions internal/services/report_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/go-gota/gota/dataframe"
"github.com/go-playground/validator/v10"

"github.com/redhatinsights/ros-ocp-backend/internal/config"
"github.com/redhatinsights/ros-ocp-backend/internal/logging"
"github.com/redhatinsights/ros-ocp-backend/internal/model"
"github.com/redhatinsights/ros-ocp-backend/internal/types"
Expand All @@ -20,6 +21,7 @@ import (

func ProcessReport(msg *kafka.Message) {
log := logging.GetLogger()
cfg := config.GetConfig()
validate := validator.New()
var kafkaMsg types.KafkaMsg
if !json.Valid([]byte(msg.Value)) {
Expand Down Expand Up @@ -117,44 +119,61 @@ func ProcessReport(msg *kafka.Message) {
continue
}

usage_data_byte, err := kruize.Update_results(experiment_name, k8s_object)
if err != nil {
log.Error(err)
continue
var k8s_object_chunks [][]map[string]interface{}
if len(k8s_object) > cfg.KruizeMaxBulkChunkSize {
k8s_object_chunks = utils.SliceK8sObjectToChunks(k8s_object)
} else {
k8s_object_chunks = append(k8s_object_chunks, k8s_object)
}

for _, data := range usage_data_byte {

interval_start_time, _ := utils.ConvertStringToTime(data.Interval_start_time)
kgaikwad marked this conversation as resolved.
Show resolved Hide resolved
interval_end_time, _ := utils.ConvertStringToTime(data.Interval_end_time)

if workload_metrics, err := model.GetWorkloadMetricsForTimestamp(experiment_name, interval_end_time); err != nil {
log.Errorf("Error while checking for workload_metrics record: %s", err)
continue
} else if !reflect.ValueOf(workload_metrics).IsZero() {
log.Debugf("workload_metrics table already has data for interval_end time: %v.", interval_end_time)
for _, chunk := range k8s_object_chunks {
usage_data_byte, err := kruize.Update_results(experiment_name, chunk)
if err != nil {
log.Error(err, experiment_name)
continue
}

for _, container := range data.Kubernetes_objects[0].Containers {
container_usage_metrics, err := json.Marshal(container.Metrics)
for _, data := range usage_data_byte {

interval_start_time, err := utils.ConvertISO8601StringToTime(data.Interval_start_time)
if err != nil {
log.Errorf("Unable to marshal container usage data: %v", err)
log.Errorf("Error for start time: %s", err)
continue
}

workload_metric := model.WorkloadMetrics{
WorkloadID: workload.ID,
ContainerName: container.Container_name,
IntervalStart: interval_start_time,
IntervalEnd: interval_end_time,
UsageMetrics: container_usage_metrics,
interval_end_time, err := utils.ConvertISO8601StringToTime(data.Interval_end_time)
if err != nil {
log.Errorf("Error for end time: %s", err)
continue
}
if err := workload_metric.CreateWorkloadMetrics(); err != nil {
log.Errorf("unable to add record to workload_metrics table: %v. Error: %v", workload_metric, err)

if workload_metrics, err := model.GetWorkloadMetricsForTimestamp(experiment_name, interval_end_time); err != nil {
log.Errorf("Error while checking for workload_metrics record: %s", err)
continue
} else if !reflect.ValueOf(workload_metrics).IsZero() {
log.Debugf("workload_metrics table already has data for interval_end time: %v.", interval_end_time)
continue
}
}

for _, container := range data.Kubernetes_objects[0].Containers {
container_usage_metrics, err := json.Marshal(container.Metrics)
if err != nil {
log.Errorf("Unable to marshal container usage data: %v", err)
}

workload_metric := model.WorkloadMetrics{
WorkloadID: workload.ID,
ContainerName: container.Container_name,
IntervalStart: interval_start_time,
IntervalEnd: interval_end_time,
UsageMetrics: container_usage_metrics,
}
if err := workload_metric.CreateWorkloadMetrics(); err != nil {
log.Errorf("unable to add record to workload_metrics table: %v. Error: %v", workload_metric, err)
continue
}
}

}
}

// Below we make sure that report which is been processed is the latest(interval_endtime) report.
Expand Down
24 changes: 24 additions & 0 deletions internal/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,14 @@ func ConvertStringToTime(data string) (time.Time, error) {

}

// ConvertISO8601StringToTime parses an ISO 8601 / RFC 3339 UTC timestamp of
// the exact form "2006-01-02T15:04:05.000Z" (millisecond precision, trailing
// "Z") into a time.Time.
//
// It returns the zero time.Time and a wrapped error when the input does not
// match that layout. Note the layout requires exactly three fractional
// digits, so e.g. "2023-09-11T10:30:00Z" is rejected.
func ConvertISO8601StringToTime(data string) (time.Time, error) {
	dateTime, err := time.Parse("2006-01-02T15:04:05.000Z", data)
	if err != nil {
		// %w (not %s) preserves the underlying *time.ParseError so
		// callers can inspect it with errors.Is / errors.As.
		return time.Time{}, fmt.Errorf("unable to convert string to time: %w", err)
	}
	return dateTime, nil
}

func MaxIntervalEndTime(slice []string) (time.Time, error) {
var converted_date_slice []time.Time
for _, v := range slice {
Expand Down Expand Up @@ -176,3 +184,19 @@ func Start_prometheus_server() {
_ = http.ListenAndServe(fmt.Sprintf(":%s", cfg.PrometheusPort), nil)
}
}

func SliceK8sObjectToChunks(k8s_objects []map[string]interface{}) [][]map[string]interface{} {
var chunks [][]map[string]interface{}
chunkSize := cfg.KruizeMaxBulkChunkSize
for i := 0; i < len(k8s_objects); i += chunkSize {
patilsuraj767 marked this conversation as resolved.
Show resolved Hide resolved
end := i + chunkSize

if end > len(k8s_objects) {
end = len(k8s_objects)
}

chunks = append(chunks, k8s_objects[i:end])
}

return chunks
}
Loading