Skip to content

Commit

Permalink
Fixes #RHIROS-1347 - Dynamically create table partitions to optimize DB performance
Browse files Browse the repository at this point in the history
  • Loading branch information
patilsuraj767 committed Sep 17, 2023
1 parent cb83d1e commit d6f0632
Show file tree
Hide file tree
Showing 18 changed files with 136 additions and 39 deletions.
5 changes: 3 additions & 2 deletions internal/model/historical_recommendation_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import (
)

type HistoricalRecommendationSet struct {
ID string `gorm:"primaryKey;not null;autoIncrement"`
ID uint `gorm:"primaryKey;not null;autoIncrement"`
OrgId string `gorm:"type:text;not null"`
WorkloadID uint
Workload Workload `gorm:"foreignKey:WorkloadID"`
ContainerName string
Expand All @@ -22,7 +23,7 @@ type HistoricalRecommendationSet struct {
func (r *HistoricalRecommendationSet) CreateHistoricalRecommendationSet() error {
db := database.GetDB()
result := db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "workload_id"}, {Name: "container_name"}, {Name: "monitoring_end_time"}},
Columns: []clause.Column{{Name: "org_id"}, {Name: "workload_id"}, {Name: "container_name"}, {Name: "monitoring_end_time"}},
DoUpdates: clause.AssignmentColumns([]string{"monitoring_start_time", "monitoring_end_time", "recommendations", "updated_at"}),
}).Create(r)

Expand Down
5 changes: 3 additions & 2 deletions internal/model/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
)

type Workload struct {
ID uint `gorm:"primaryKey;not null;autoIncrement"`
ID uint `gorm:"primaryKey;not null;autoIncrement"`
OrgId string `gorm:"type:text;not null"`
ClusterID uint
Cluster Cluster `gorm:"foreignKey:ClusterID" json:"-"`
ExperimentName string `gorm:"type:text"`
Expand All @@ -32,7 +33,7 @@ func (w *Workload) AfterFind(tx *gorm.DB) error {
func (w *Workload) CreateWorkload() error {
db := database.GetDB()
result := db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "cluster_id"}, {Name: "experiment_name"}},
Columns: []clause.Column{{Name: "org_id"}, {Name: "cluster_id"}, {Name: "experiment_name"}},
DoUpdates: clause.AssignmentColumns([]string{"containers", "metrics_upload_at"}),
}).Create(w)

Expand Down
5 changes: 3 additions & 2 deletions internal/model/workload_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import (
)

type WorkloadMetrics struct {
ID uint `gorm:"primaryKey;not null;autoIncrement"`
ID uint `gorm:"primaryKey;not null;autoIncrement"`
OrgId string `gorm:"type:text;not null"`
WorkloadID uint
Workload Workload `gorm:"foreignKey:WorkloadID"`
ContainerName string
Expand All @@ -21,7 +22,7 @@ type WorkloadMetrics struct {
func (w *WorkloadMetrics) CreateWorkloadMetrics() error {
db := database.GetDB()
result := db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "workload_id"}, {Name: "container_name"}, {Name: "interval_start"}, {Name: "interval_end"}},
Columns: []clause.Column{{Name: "org_id"}, {Name: "workload_id"}, {Name: "container_name"}, {Name: "interval_start"}, {Name: "interval_end"}},
DoUpdates: clause.AssignmentColumns([]string{"usage_metrics"}),
}).Create(w)

Expand Down
5 changes: 4 additions & 1 deletion internal/services/report_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,14 @@ func ProcessReport(msg *kafka.Message) {

// Create workload entry into the table.
workload := model.Workload{
OrgId: rh_account.OrgId,
ClusterID: cluster.ID,
ExperimentName: experiment_name,
Namespace: namespace,
WorkloadType: w.WorkloadType(k8s_object_type),
WorkloadName: k8s_object_name,
Containers: container_names,
MetricsUploadAt: time.Now(),
MetricsUploadAt: maxEndTime,
}
if err := workload.CreateWorkload(); err != nil {
log.Errorf("unable to save workload record: %v. Error: %v", workload, err)
Expand Down Expand Up @@ -161,6 +162,7 @@ func ProcessReport(msg *kafka.Message) {
}

workload_metric := model.WorkloadMetrics{
OrgId: rh_account.OrgId,
WorkloadID: workload.ID,
ContainerName: container.Container_name,
IntervalStart: interval_start_time,
Expand Down Expand Up @@ -225,6 +227,7 @@ func ProcessReport(msg *kafka.Message) {

// Create entry into HistoricalRecommendationSet table.
historicalRecommendationSet := model.HistoricalRecommendationSet{
OrgId: rh_account.OrgId,
WorkloadID: workload.ID,
ContainerName: container.Container_name,
MonitoringStartTime: v.Duration_based.Short_term.Monitoring_start_time,
Expand Down
3 changes: 2 additions & 1 deletion migrations/000003_create_workloads_table.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ CREATE TYPE workloadtype AS ENUM ('deployment', 'deploymentconfig', 'replicaset'

CREATE TABLE IF NOT EXISTS workloads(
id BIGSERIAL PRIMARY KEY,
org_id TEXT NOT NULL,
cluster_id BIGINT NOT NULL,
experiment_name TEXT NOT NULL,
namespace TEXT NOT NULL,
Expand All @@ -18,4 +19,4 @@ ON DELETE CASCADE;
CREATE INDEX idx_workloads_containers ON workloads USING gin(containers);

ALTER TABLE workloads
ADD UNIQUE (cluster_id, experiment_name);
ADD UNIQUE (org_id, cluster_id, experiment_name);
3 changes: 3 additions & 0 deletions migrations/000004_create_recommendation_sets_table.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ CREATE TABLE IF NOT EXISTS recommendation_sets(
ALTER TABLE recommendation_sets
ADD CONSTRAINT fk_recommendation_sets_workload FOREIGN KEY (workload_id) REFERENCES workloads (id)
ON DELETE CASCADE;

-- One current recommendation set per (workload, container).
-- NOTE(review): presumably the ON CONFLICT target for upserts in the
-- service layer — confirm against the model code.
ALTER TABLE recommendation_sets
ADD CONSTRAINT UQ_Recommendation UNIQUE (workload_id, container_name);
2 changes: 0 additions & 2 deletions migrations/000005_add_unique_to_recommendation_set.down.sql

This file was deleted.

2 changes: 0 additions & 2 deletions migrations/000005_add_unique_to_recommendation_set.up.sql

This file was deleted.

2 changes: 2 additions & 0 deletions migrations/000005_partition_functions.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Revert migration 000005: remove the partition-maintenance helper
-- functions. (Names keep the original "patition" misspelling so they
-- match what the up migration created.)
DROP FUNCTION IF EXISTS create_monthly_patitions;
DROP FUNCTION IF EXISTS create_range_patition;
58 changes: 58 additions & 0 deletions migrations/000005_partition_functions.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
-- create_range_patition: create a RANGE partition of parent_table
-- covering [partition_start_date, partition_end_date). The upper bound
-- is exclusive, per PostgreSQL range-partition semantics.
--
-- DDL is assembled with format(%I / %L) instead of raw string
-- concatenation: partition_table_name is ultimately derived from
-- org_id in the insert triggers, so unquoted interpolation could
-- produce an invalid identifier or inject into the statement.
-- (The function name keeps the historical "patition" spelling because
-- later migrations call it by this name.)
CREATE OR REPLACE FUNCTION create_range_patition(partition_table_name TEXT, parent_table Text, partition_start_date Text, partition_end_date Text)
RETURNS void AS
$BODY$
BEGIN
    EXECUTE format(
        'CREATE TABLE %I PARTITION OF %I FOR VALUES FROM (%L) TO (%L)',
        partition_table_name,
        parent_table,
        partition_start_date,
        partition_end_date
    );
END;
$BODY$
LANGUAGE plpgsql;

-- create_monthly_patitions: ensure the half-month range partitions of
-- parent_table needed for a record keyed at max_interval_end exist.
-- Each month is split in two:
--   <parent>_YYYY_MM_1  covers [1st, 16th)
--   <parent>_YYYY_MM_16 covers [16th, 1st of next month)
-- When the date falls in the first half of a month, the second half of
-- the PREVIOUS month is also created so late-arriving data still fits.
-- (Name keeps the historical "patition" spelling — callers use it.)
CREATE OR REPLACE FUNCTION create_monthly_patitions(max_interval_end TIMESTAMP WITH TIME ZONE, parent_table Text)
RETURNS void AS
$BODY$
DECLARE
    record_day INT;
    record_date TEXT;
    partition_start_date TEXT;
    partition_end_date TEXT;
    partition_table_name TEXT;
BEGIN
    record_day := TO_NUMBER(TO_CHAR(max_interval_end,'DD'),'99');
    record_date := TO_CHAR(max_interval_end,'YYYY-MM-');
    IF record_day > 15 THEN
        -- Second half of the month: [16th, 1st of NEXT month).
        -- Range-partition upper bounds are exclusive, so the bound must
        -- be the 1st of the next month; the previous code ended on the
        -- last day of the month, leaving rows dated that day without a
        -- partition (and inconsistent with the ELSE branch below).
        partition_start_date = CONCAT(record_date, '16');
        select (date_trunc('month', max_interval_end) + interval '1 month')::date INTO partition_end_date;
        partition_table_name = replace(parent_table || '_' || partition_start_date, '-', '_');
        IF NOT EXISTS(SELECT relname FROM pg_class WHERE relname=partition_table_name) THEN
            -- PERFORM (not EXECUTE) is the correct way to call a void
            -- function from PL/pgSQL; EXECUTE expects a command string.
            PERFORM create_range_patition(partition_table_name, parent_table, partition_start_date, partition_end_date);
        END IF;

        -- First half of the same month: [1st, 16th).
        partition_start_date = CONCAT(record_date, '1');
        partition_end_date = record_date || '16';
        partition_table_name = replace(parent_table || '_' || partition_start_date, '-', '_');
        IF NOT EXISTS(SELECT relname FROM pg_class WHERE relname=partition_table_name) THEN
            PERFORM create_range_patition(partition_table_name, parent_table, partition_start_date, partition_end_date);
        END IF;
    ELSE
        -- First half of the current month: [1st, 16th).
        partition_start_date = CONCAT(record_date, '1');
        partition_end_date = record_date || '16';
        partition_table_name = replace(parent_table || '_' || partition_start_date, '-', '_');
        IF NOT EXISTS(SELECT relname FROM pg_class WHERE relname=partition_table_name) THEN
            PERFORM create_range_patition(partition_table_name, parent_table, partition_start_date, partition_end_date);
        END IF;

        -- Second half of the PREVIOUS month: [16th, 1st of this month).
        select (date_trunc('month', max_interval_end) - interval '1 month' + interval '15 days' )::date INTO partition_start_date;
        select (date_trunc('month', max_interval_end))::date INTO partition_end_date;
        partition_table_name = replace(parent_table || '_' || partition_start_date, '-', '_');
        IF NOT EXISTS(SELECT relname FROM pg_class WHERE relname=partition_table_name) THEN
            PERFORM create_range_patition(partition_table_name, parent_table, partition_start_date, partition_end_date);
        END IF;
    END IF;
END;
$BODY$
LANGUAGE plpgsql;
2 changes: 1 addition & 1 deletion migrations/000006_create_workload_metrics_table.down.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
DROP TABLE IF EXISTS workload_metrics;
DROP TABLE IF EXISTS workload_metrics CASCADE;
28 changes: 25 additions & 3 deletions migrations/000006_create_workload_metrics_table.up.sql
Original file line number Diff line number Diff line change
@@ -1,15 +1,37 @@
CREATE TABLE IF NOT EXISTS workload_metrics(
id BIGSERIAL PRIMARY KEY,
id BIGSERIAL NOT NULL,
org_id TEXT NOT NULL,
workload_id BIGINT NOT NULL,
container_name TEXT NOT NULL,
interval_start TIMESTAMP WITH TIME ZONE NOT NULL,
interval_end TIMESTAMP WITH TIME ZONE NOT NULL,
usage_metrics jsonb NOT NULL
);
) PARTITION BY LIST(org_id);

ALTER TABLE workload_metrics
ADD CONSTRAINT fk_workload_metrics_workload FOREIGN KEY (workload_id) REFERENCES workloads (id)
ON DELETE CASCADE;

ALTER TABLE workload_metrics
ADD CONSTRAINT UQ_Workload_Metrics UNIQUE (workload_id, container_name, interval_start, interval_end);
ADD CONSTRAINT UQ_Workload_Metrics UNIQUE (org_id, workload_id, container_name, interval_start, interval_end);


-- Trigger function: before a workloads row is written, make sure the
-- workload_metrics partitions for NEW.org_id exist, so subsequent
-- metric inserts for this org cannot fail for lack of a partition.
CREATE OR REPLACE FUNCTION workload_metrics_insert_trigger_func() RETURNS trigger AS
$BODY$
DECLARE
    org_id_partition_table_name TEXT;
BEGIN
    org_id_partition_table_name := 'workload_metrics_' || NEW.org_id;
    -- First time this org is seen: create its LIST partition, itself
    -- range-partitioned by interval_end. format(%I/%L) quotes the
    -- identifier and literal so an org_id containing unexpected
    -- characters cannot break or inject into the DDL (the original
    -- concatenated it unquoted).
    IF NOT EXISTS(SELECT relname FROM pg_class WHERE relname=org_id_partition_table_name) THEN
        EXECUTE format(
            'CREATE TABLE %I PARTITION OF workload_metrics FOR VALUES IN (%L) PARTITION BY RANGE(interval_end)',
            org_id_partition_table_name,
            NEW.org_id
        );
    END IF;
    -- PERFORM (not EXECUTE) is the correct way to call a void function
    -- from PL/pgSQL; EXECUTE expects a command string.
    PERFORM create_monthly_patitions(NEW.metrics_upload_at, org_id_partition_table_name);
    RETURN NEW;
END;
$BODY$
LANGUAGE plpgsql;

-- NOTE(review): the trigger fires on inserts into *workloads*, not
-- workload_metrics. NEW.metrics_upload_at only exists on workloads, so
-- this looks intentional (partitions are pre-created when the parent
-- workload row is upserted) — confirm against the report processor.
CREATE TRIGGER workload_metrics_insert_trigger BEFORE INSERT ON workloads FOR EACH ROW EXECUTE PROCEDURE workload_metrics_insert_trigger_func();
Original file line number Diff line number Diff line change
@@ -1 +1 @@
DROP TABLE IF EXISTS historical_recommendation_sets;
DROP TABLE IF EXISTS historical_recommendation_sets CASCADE;
43 changes: 31 additions & 12 deletions migrations/000007_create_historical_recommendation_sets.up.sql
Original file line number Diff line number Diff line change
@@ -1,18 +1,37 @@
CREATE TABLE IF NOT EXISTS historical_recommendation_sets AS TABLE recommendation_sets;

ALTER TABLE historical_recommendation_sets
ADD PRIMARY KEY (id),
ALTER COLUMN id SET DEFAULT gen_random_uuid(),
ALTER COLUMN id SET NOT NULL,
ALTER COLUMN container_name SET NOT NULL,
ALTER COLUMN monitoring_start_time SET NOT NULL,
ALTER COLUMN monitoring_end_time SET NOT NULL,
ALTER COLUMN recommendations SET NOT NULL,
ALTER COLUMN updated_at SET NOT NULL;
-- historical_recommendation_sets: time series of recommendation sets
-- per workload container. LIST-partitioned by org_id; the per-org
-- partitions (sub-partitioned by range) are created on demand by the
-- trigger defined later in this migration.
CREATE TABLE IF NOT EXISTS historical_recommendation_sets(
id BIGSERIAL NOT NULL,
org_id TEXT NOT NULL,
workload_id BIGINT NOT NULL,
container_name TEXT NOT NULL,
monitoring_start_time TIMESTAMP WITH TIME ZONE NOT NULL,
monitoring_end_time TIMESTAMP WITH TIME ZONE NOT NULL,
recommendations jsonb NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL
) PARTITION BY LIST(org_id);

-- Rows disappear with their parent workload.
ALTER TABLE historical_recommendation_sets
ADD CONSTRAINT fk_historical_recommendation_sets_workload FOREIGN KEY (workload_id) REFERENCES workloads (id)
ON DELETE CASCADE;

ALTER TABLE historical_recommendation_sets
ADD CONSTRAINT UQ_historical_recommendation UNIQUE (workload_id, container_name, monitoring_end_time);
ADD CONSTRAINT UQ_historical_recommendation UNIQUE (org_id, workload_id, container_name, monitoring_end_time);

-- Trigger function: before a workloads row is written, make sure the
-- historical_recommendation_sets partitions for NEW.org_id exist.
CREATE OR REPLACE FUNCTION historical_recommendation_sets_insert_trigger_func() RETURNS trigger AS
$BODY$
DECLARE
    org_id_partition_table_name TEXT;
BEGIN
    org_id_partition_table_name := 'historical_recommendation_sets_' || NEW.org_id;
    -- First time this org is seen: create its LIST partition, itself
    -- range-partitioned by monitoring_end_time. format(%I/%L) quotes
    -- the identifier and literal so an org_id containing unexpected
    -- characters cannot break or inject into the DDL (the original
    -- concatenated it unquoted).
    IF NOT EXISTS(SELECT relname FROM pg_class WHERE relname=org_id_partition_table_name) THEN
        EXECUTE format(
            'CREATE TABLE %I PARTITION OF historical_recommendation_sets FOR VALUES IN (%L) PARTITION BY RANGE(monitoring_end_time)',
            org_id_partition_table_name,
            NEW.org_id
        );
    END IF;
    -- PERFORM (not EXECUTE) is the correct way to call a void function
    -- from PL/pgSQL; EXECUTE expects a command string.
    -- NOTE(review): ranges are derived from NEW.metrics_upload_at while
    -- the partition key is monitoring_end_time; if those ever fall in
    -- different half-months an insert could miss its partition — confirm.
    PERFORM create_monthly_patitions(NEW.metrics_upload_at, org_id_partition_table_name);
    RETURN NEW;
END;
$BODY$
LANGUAGE plpgsql;

-- Fires on workloads inserts (see the note in migration 000006): the
-- workloads row carries org_id and metrics_upload_at, so partitions are
-- pre-created before any historical rows arrive.
CREATE TRIGGER historical_recommendation_sets_insert_trigger BEFORE INSERT ON workloads FOR EACH ROW EXECUTE PROCEDURE historical_recommendation_sets_insert_trigger_func();
Empty file.
4 changes: 0 additions & 4 deletions migrations/000008_keep_last_reported.up.sql

This file was deleted.

3 changes: 0 additions & 3 deletions migrations/000009_modify_recommendation_sets_table.down.sql

This file was deleted.

3 changes: 0 additions & 3 deletions migrations/000009_modify_recommendation_sets_table.up.sql

This file was deleted.

0 comments on commit d6f0632

Please sign in to comment.