From 2281fa27afcc9648205d7b056b0878483660f782 Mon Sep 17 00:00:00 2001 From: nityanandagohain Date: Thu, 21 Nov 2024 15:57:50 +0530 Subject: [PATCH 1/3] feat: tt api for new trace tables --- .../app/clickhouseReader/reader.go | 106 +++++++++++++++++- 1 file changed, 105 insertions(+), 1 deletion(-) diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index b7fd02383d..932a7dce2b 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -1374,6 +1374,106 @@ func (r *ClickHouseReader) SetTTLLogsV2(ctx context.Context, params *model.TTLPa return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil } +func (r *ClickHouseReader) SetTTLTracesV2(ctx context.Context, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) { + // uuid is used as transaction id + uuidWithHyphen := uuid.New() + uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1) + tableNames := []string{ + r.TraceDB + "." + r.traceTableName, + r.TraceDB + "." + r.traceResourceTableV3, + r.TraceDB + "." + signozErrorIndexTable, + r.TraceDB + "." + signozUsageExplorerTable, + r.TraceDB + "." + defaultDependencyGraphTable, + r.TraceDB + "." + r.traceSummaryTable, + } + + coldStorageDuration := -1 + if len(params.ColdStorageVolume) > 0 { + coldStorageDuration = int(params.ToColdStorageDuration) + } + + // check if there is existing things to be done + for _, tableName := range tableNames { + statusItem, err := r.checkTTLStatusItem(ctx, tableName) + if err != nil { + return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")} + } + if statusItem.Status == constants.StatusPending { + return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")} + } + } + + // TTL query + ttlV2 := "ALTER TABLE %s ON CLUSTER %s MODIFY TTL toDateTime(%s) + INTERVAL %v SECOND DELETE" + ttlV2ColdStorage := ", toDateTime(%s) + INTERVAL %v SECOND TO VOLUME '%s'" + + // TTL query for resource table + ttlV2Resource := "ALTER TABLE %s ON CLUSTER %s MODIFY TTL toDateTime(seen_at_ts_bucket_start) + toIntervalSecond(1800) + INTERVAL %v SECOND DELETE" + ttlLogsV2ResourceColdStorage := ", toDateTime(seen_at_ts_bucket_start) + toIntervalSecond(1800) + INTERVAL %v SECOND TO VOLUME '%s'" + + for _, distributedTableName := range tableNames { + go func(distributedTableName string) { + tableName := getLocalTableName(distributedTableName) + + // for trace summary table, we need to use end instead of timestamp + timestamp := "timestamp" + if strings.HasSuffix(distributedTableName, r.traceSummaryTable) { + timestamp = "end" + } + + _, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration) + if dbErr != nil { + zap.L().Error("Error in inserting to ttl_status table", zap.Error(dbErr)) + return + } + req := fmt.Sprintf(ttlV2, tableName, r.cluster, timestamp, params.DelDuration) + if strings.HasSuffix(distributedTableName, r.traceResourceTableV3) { + req = fmt.Sprintf(ttlV2Resource, tableName, r.cluster, params.DelDuration) + } + + if len(params.ColdStorageVolume) > 0 { + if strings.HasSuffix(distributedTableName, r.traceResourceTableV3) { + req += fmt.Sprintf(ttlLogsV2ResourceColdStorage, params.ToColdStorageDuration, params.ColdStorageVolume) + } else { + req += fmt.Sprintf(ttlV2ColdStorage, timestamp, params.ToColdStorageDuration, params.ColdStorageVolume) + } + } + err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume) + if err != nil { + zap.L().Error("Error in setting cold storage", zap.Error(err)) + statusItem, err := r.checkTTLStatusItem(ctx, tableName) + if err == nil { + _, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id) + if dbErr != nil { + zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) + return + } + } + return + } + req += " SETTINGS materialize_ttl_after_modify=0;" + zap.L().Error(" ExecutingTTL request: ", zap.String("request", req)) + statusItem, _ := r.checkTTLStatusItem(ctx, tableName) + fmt.Println(req) + if err := r.db.Exec(context.Background(), req); err != nil { + zap.L().Error("Error in executing set TTL query", zap.Error(err)) + _, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id) + if dbErr != nil { + zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) + return + } + return + } + _, dbErr = r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusSuccess, statusItem.Id) + if dbErr != nil { + zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) + return + } + }(distributedTableName) + } + return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil +} + // SetTTL sets the TTL for traces or metrics or logs tables. // This is an async API which creates goroutines to set TTL. // Status of TTL update is tracked with ttl_status table in sqlite db. @@ -1395,6 +1495,10 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, switch params.Type { case constants.TraceTTL: + if r.useTraceNewSchema { + return r.SetTTLTracesV2(ctx, params) + } + tableNames := []string{ signozTraceDBName + "." + signozTraceTableName, signozTraceDBName + "." + signozDurationMVTable, @@ -1755,7 +1859,7 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa getTracesTTL := func() (*model.DBResponseTTL, *model.ApiError) { var dbResp []model.DBResponseTTL - query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v' AND database='%v'", signozTraceLocalTableName, signozTraceDBName) + query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v' AND database='%v'", r.traceLocalTableName, signozTraceDBName) err := r.db.Select(ctx, &dbResp, query) From b5e31e89fd3275cd674a0d9c07b87d5c41f77c8b Mon Sep 17 00:00:00 2001 From: nityanandagohain Date: Thu, 21 Nov 2024 16:02:21 +0530 Subject: [PATCH 2/3] fix: remove print and use correct context --- pkg/query-service/app/clickhouseReader/reader.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 932a7dce2b..b3910c9826 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -1454,8 +1454,7 @@ func (r *ClickHouseReader) SetTTLTracesV2(ctx context.Context, params *model.TTL req += " SETTINGS materialize_ttl_after_modify=0;" zap.L().Error(" ExecutingTTL request: ", zap.String("request", req)) statusItem, _ := r.checkTTLStatusItem(ctx, tableName) - fmt.Println(req) - if err := r.db.Exec(context.Background(), req); err != nil { + if err := r.db.Exec(ctx, req); err != nil { zap.L().Error("Error in executing set TTL query", zap.Error(err)) _, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id) if dbErr != nil { From bc97ac89d690648cdec5cfe8b487f5072ff1099a Mon Sep 17 00:00:00 2001 From: nityanandagohain Date: Thu, 21 Nov 2024 16:09:14 +0530 Subject: [PATCH 3/3] fix: update var name --- pkg/query-service/app/clickhouseReader/reader.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index b3910c9826..765349bee0 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -1409,7 +1409,7 @@ func (r *ClickHouseReader) SetTTLTracesV2(ctx context.Context, params *model.TTL // TTL query for resource table ttlV2Resource := "ALTER TABLE %s ON CLUSTER %s MODIFY TTL toDateTime(seen_at_ts_bucket_start) + toIntervalSecond(1800) + INTERVAL %v SECOND DELETE" - ttlLogsV2ResourceColdStorage := ", toDateTime(seen_at_ts_bucket_start) + toIntervalSecond(1800) + INTERVAL %v SECOND TO VOLUME '%s'" + ttlTracesV2ResourceColdStorage := ", toDateTime(seen_at_ts_bucket_start) + toIntervalSecond(1800) + INTERVAL %v SECOND TO VOLUME '%s'" for _, distributedTableName := range tableNames { go func(distributedTableName string) { @@ -1433,7 +1433,7 @@ func (r *ClickHouseReader) SetTTLTracesV2(ctx context.Context, params *model.TTL if len(params.ColdStorageVolume) > 0 { if strings.HasSuffix(distributedTableName, r.traceResourceTableV3) { - req += fmt.Sprintf(ttlLogsV2ResourceColdStorage, params.ToColdStorageDuration, params.ColdStorageVolume) + req += fmt.Sprintf(ttlTracesV2ResourceColdStorage, params.ToColdStorageDuration, params.ColdStorageVolume) } else { req += fmt.Sprintf(ttlV2ColdStorage, timestamp, params.ToColdStorageDuration, params.ColdStorageVolume) }