RHIROS-1327 - Adding support for report having more than 24hrs o… #122

Merged: 5 commits, Sep 11, 2023
Changes from 2 commits
6 changes: 4 additions & 2 deletions internal/config/config.go
@@ -31,8 +31,9 @@ type Config struct {
KafkaCA string

// Kruize config
KruizeUrl string `mapstructure:"KRUIZE_URL"`
KruizeWaitTime string `mapstructure:"KRUIZE_WAIT_TIME"`
KruizeUrl string `mapstructure:"KRUIZE_URL"`
KruizeWaitTime string `mapstructure:"KRUIZE_WAIT_TIME"`
KruizeMaxBulkChunkSize int `mapstructure:"KRUIZE_MAX_BULK_CHUNK_SIZE"`

// Database config
DBName string
@@ -162,6 +163,7 @@ func initConfig() {
viper.SetDefault("SERVICE_NAME", "rosocp")
viper.SetDefault("API_PORT", "8000")
viper.SetDefault("KRUIZE_WAIT_TIME", "30")
viper.SetDefault("KRUIZE_MAX_BULK_CHUNK_SIZE", 100)
viper.SetDefault("KAFKA_CONSUMER_GROUP_ID", "ros-ocp")
viper.SetDefault("KAFKA_AUTO_COMMIT", true)
viper.SetDefault("LOG_LEVEL", "INFO")
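For context, the new chunk size rides on the same viper plumbing as the other Kruize settings. A minimal sketch (not from the repo) of how the default above and an environment override interact, assuming the service reads environment variables via viper.AutomaticEnv() alongside these SetDefault calls:

package main

import (
	"fmt"
	"os"

	"github.com/spf13/viper"
)

func main() {
	viper.AutomaticEnv()
	// Mirrors the initConfig default added in this PR.
	viper.SetDefault("KRUIZE_MAX_BULK_CHUNK_SIZE", 100)
	fmt.Println(viper.GetInt("KRUIZE_MAX_BULK_CHUNK_SIZE")) // 100 when unset

	// Overriding through the environment.
	os.Setenv("KRUIZE_MAX_BULK_CHUNK_SIZE", "250")
	fmt.Println(viper.GetInt("KRUIZE_MAX_BULK_CHUNK_SIZE")) // 250
}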
64 changes: 34 additions & 30 deletions internal/services/report_processor.go
@@ -117,44 +117,48 @@ func ProcessReport(msg *kafka.Message) {
continue
}

usage_data_byte, err := kruize.Update_results(experiment_name, k8s_object)
if err != nil {
log.Error(err)
continue
}

for _, data := range usage_data_byte {
k8s_object_chunks := utils.SliceK8sObjectToChunks(k8s_object)

interval_start_time, _ := utils.ConvertStringToTime(data.Interval_start_time)
interval_end_time, _ := utils.ConvertStringToTime(data.Interval_end_time)

if workload_metrics, err := model.GetWorkloadMetricsForTimestamp(experiment_name, interval_end_time); err != nil {
log.Errorf("Error while checking for workload_metrics record: %s", err)
continue
} else if !reflect.ValueOf(workload_metrics).IsZero() {
log.Debugf("workload_metrics table already has data for interval_end time: %v.", interval_end_time)
for _, chunk := range k8s_object_chunks {
usage_data_byte, err := kruize.Update_results(experiment_name, chunk)
if err != nil {
log.Error(err)
Review thread on the log.Error(err) call above:

Collaborator (kgaikwad):
Could you confirm what details are present in err? Will they be sufficient for debugging?
If a failure occurs in the update_results API for a certain chunk, there will be data loss. Am I correct? If so, is that the expected behavior?

Contributor:
Would a chunk identifier be useful here 🤔

## pseudocode
usage_data_byte, err := kruize.Update_results(experiment_name, chunk)
if err != nil {
    log.Error(err, experiment_name, chunk_serial)
}
...
chunk_serial += 1

Collaborator (kgaikwad):
Yes, and along with a unique identifier for the resource it would be good to include chunk-specific info, for example the time duration the chunk covers.

Collaborator Author (patilsuraj767):
Yes @kgaikwad, if a failure occurs in the update_results API for a certain chunk there will be data loss, but we cannot help it. For example, if the Kruize application is down, the update_results API will fail and we cannot do anything here to stop the further API requests.

Collaborator Author (patilsuraj767):
We are not dividing chunks based on timestamp, meaning it is not the case that every metric in a chunk has the same timestamp. So I think the timestamp cannot be the unique identifier.

Collaborator (kgaikwad):
> if a failure occurs in the update_results API for a certain chunk there will be data loss, but we cannot help it.

In my opinion we should think about such scenarios and come up with handling to minimize the data loss. Maybe not in this PR, but it is worth brainstorming on.

> So I think the timestamp cannot be the unique identifier.

Then could you please include the experiment name and any field that is specific to that request? That will make it easy to identify that this upload request was not processed completely and data loss occurred.

Collaborator Author (patilsuraj767):
I have added the experiment_name to the log, and by default the request_id gets logged with every log message.

Collaborator (kgaikwad), Sep 11, 2023:
@patilsuraj767, for the first part, could you add a backlog ticket so that it does not get ignored over time? Thanks!
continue
}

for _, container := range data.Kubernetes_objects[0].Containers {
container_usage_metrics, err := json.Marshal(container.Metrics)
if err != nil {
log.Errorf("Unable to marshal container usage data: %v", err)
}
for _, data := range usage_data_byte {

workload_metric := model.WorkloadMetrics{
WorkloadID: workload.ID,
ContainerName: container.Container_name,
IntervalStart: interval_start_time,
IntervalEnd: interval_end_time,
UsageMetrics: container_usage_metrics,
}
if err := workload_metric.CreateWorkloadMetrics(); err != nil {
log.Errorf("unable to add record to workload_metrics table: %v. Error: %v", workload_metric, err)
interval_start_time, _ := utils.ConvertISO8601StringToTime(data.Interval_start_time)
interval_end_time, _ := utils.ConvertISO8601StringToTime(data.Interval_end_time)

if workload_metrics, err := model.GetWorkloadMetricsForTimestamp(experiment_name, interval_end_time); err != nil {
log.Errorf("Error while checking for workload_metrics record: %s", err)
continue
} else if !reflect.ValueOf(workload_metrics).IsZero() {
log.Debugf("workload_metrics table already has data for interval_end time: %v.", interval_end_time)
continue
}
}

for _, container := range data.Kubernetes_objects[0].Containers {
container_usage_metrics, err := json.Marshal(container.Metrics)
if err != nil {
log.Errorf("Unable to marshal container usage data: %v", err)
}

workload_metric := model.WorkloadMetrics{
WorkloadID: workload.ID,
ContainerName: container.Container_name,
IntervalStart: interval_start_time,
IntervalEnd: interval_end_time,
UsageMetrics: container_usage_metrics,
}
if err := workload_metric.CreateWorkloadMetrics(); err != nil {
log.Errorf("unable to add record to workload_metrics table: %v. Error: %v", workload_metric, err)
continue
}
}

}
}

// Below we make sure that report which is been processed is the latest(interval_endtime) report.
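Building on the pseudocode from the review thread, here is a self-contained sketch (not the merged code) of how a chunk index and the experiment name could be surfaced when a single update_results call fails; processChunks and the injected update function are hypothetical stand-ins for the real kruize.Update_results:

package main

import (
	"errors"
	"log"
)

// processChunks sketches the reviewers' suggestion: on failure, log the
// experiment name and the chunk position, then keep going so the remaining
// chunks are still attempted.
func processChunks(experimentName string, chunks [][]map[string]interface{},
	update func(string, []map[string]interface{}) error) {
	for i, chunk := range chunks {
		if err := update(experimentName, chunk); err != nil {
			log.Printf("update_results failed for experiment %q, chunk %d of %d: %v",
				experimentName, i+1, len(chunks), err)
			continue
		}
	}
}

func main() {
	chunks := [][]map[string]interface{}{
		{{"container_name": "a"}},
		{{"container_name": "b"}},
	}
	// Simulated update_results that fails for the second chunk,
	// e.g. because Kruize became unreachable mid-report.
	update := func(name string, c []map[string]interface{}) error {
		if c[0]["container_name"] == "b" {
			return errors.New("kruize unreachable")
		}
		return nil
	}
	processChunks("exp-123", chunks, update)
}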
24 changes: 24 additions & 0 deletions internal/utils/utils.go
@@ -126,6 +126,14 @@ func ConvertStringToTime(data string) (time.Time, error) {

}

func ConvertISO8601StringToTime(data string) (time.Time, error) {
dateTime, err := time.Parse("2006-01-02T15:04:05.000Z", data)
if err != nil {
return time.Time{}, fmt.Errorf("unable to convert string to time: %s", err)
}
return dateTime, nil
}
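For context on the new helper: the layout "2006-01-02T15:04:05.000Z" accepts only timestamps with exactly three fractional-second digits and a literal trailing Z, which is the shape the processing code above expects for Interval_start_time and Interval_end_time coming back from update_results. A small illustration with assumed input values:

package main

import (
	"fmt"
	"time"
)

func main() {
	const layout = "2006-01-02T15:04:05.000Z"

	// Exactly three fractional digits and a literal trailing "Z" parse fine.
	t, err := time.Parse(layout, "2023-09-11T10:30:00.000Z")
	fmt.Println(t, err) // 2023-09-11 10:30:00 +0000 UTC <nil>

	// Without the fractional seconds (or with a numeric offset instead of "Z"),
	// parsing fails, so callers get the wrapped "unable to convert" error.
	_, err = time.Parse(layout, "2023-09-11T10:30:00Z")
	fmt.Println(err != nil) // true
}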

func MaxIntervalEndTime(slice []string) (time.Time, error) {
var converted_date_slice []time.Time
for _, v := range slice {
@@ -176,3 +184,19 @@ func Start_prometheus_server() {
_ = http.ListenAndServe(fmt.Sprintf(":%s", cfg.PrometheusPort), nil)
}
}

func SliceK8sObjectToChunks(k8s_objects []map[string]interface{}) [][]map[string]interface{} {
var chunks [][]map[string]interface{}
chunkSize := cfg.KruizeMaxBulkChunkSize
for i := 0; i < len(k8s_objects); i += chunkSize {
end := i + chunkSize

if end > len(k8s_objects) {
end = len(k8s_objects)
}

chunks = append(chunks, k8s_objects[i:end])
}

return chunks
}
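For reference, the chunking behaviour with the size passed in as a parameter (a stand-in for cfg.KruizeMaxBulkChunkSize, default 100): five objects with a chunk size of 2 yield chunks of 2, 2 and 1, and an empty slice yields no chunks, so the chunk loop in ProcessReport simply does nothing. A minimal, self-contained sketch:

package main

import "fmt"

// sliceToChunks mirrors SliceK8sObjectToChunks, but takes the chunk size as a
// parameter so the example is self-contained. It assumes a positive chunk size,
// as the real function does via the configured default.
func sliceToChunks(objs []map[string]interface{}, chunkSize int) [][]map[string]interface{} {
	var chunks [][]map[string]interface{}
	for i := 0; i < len(objs); i += chunkSize {
		end := i + chunkSize
		if end > len(objs) {
			end = len(objs)
		}
		chunks = append(chunks, objs[i:end])
	}
	return chunks
}

func main() {
	objs := make([]map[string]interface{}, 5)
	for i := range objs {
		objs[i] = map[string]interface{}{"index": i}
	}
	for _, chunk := range sliceToChunks(objs, 2) {
		fmt.Println(len(chunk)) // 2, 2, 1
	}
	fmt.Println(len(sliceToChunks(nil, 2))) // 0
}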