diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index b7fd02383d..765349bee0 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -1374,6 +1374,105 @@ func (r *ClickHouseReader) SetTTLLogsV2(ctx context.Context, params *model.TTLPa return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil } +func (r *ClickHouseReader) SetTTLTracesV2(ctx context.Context, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) { + // uuid is used as transaction id + uuidWithHyphen := uuid.New() + uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1) + tableNames := []string{ + r.TraceDB + "." + r.traceTableName, + r.TraceDB + "." + r.traceResourceTableV3, + r.TraceDB + "." + signozErrorIndexTable, + r.TraceDB + "." + signozUsageExplorerTable, + r.TraceDB + "." + defaultDependencyGraphTable, + r.TraceDB + "." + r.traceSummaryTable, + } + + coldStorageDuration := -1 + if len(params.ColdStorageVolume) > 0 { + coldStorageDuration = int(params.ToColdStorageDuration) + } + + // check if there is existing things to be done + for _, tableName := range tableNames { + statusItem, err := r.checkTTLStatusItem(ctx, tableName) + if err != nil { + return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")} + } + if statusItem.Status == constants.StatusPending { + return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")} + } + } + + // TTL query + ttlV2 := "ALTER TABLE %s ON CLUSTER %s MODIFY TTL toDateTime(%s) + INTERVAL %v SECOND DELETE" + ttlV2ColdStorage := ", toDateTime(%s) + INTERVAL %v SECOND TO VOLUME '%s'" + + // TTL query for resource table + ttlV2Resource := "ALTER TABLE %s ON CLUSTER %s MODIFY TTL toDateTime(seen_at_ts_bucket_start) + toIntervalSecond(1800) + INTERVAL %v SECOND DELETE" + ttlTracesV2ResourceColdStorage := ", toDateTime(seen_at_ts_bucket_start) + toIntervalSecond(1800) + INTERVAL %v SECOND TO VOLUME '%s'" + + for _, distributedTableName := range tableNames { + go func(distributedTableName string) { + tableName := getLocalTableName(distributedTableName) + + // for trace summary table, we need to use end instead of timestamp + timestamp := "timestamp" + if strings.HasSuffix(distributedTableName, r.traceSummaryTable) { + timestamp = "end" + } + + _, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration) + if dbErr != nil { + zap.L().Error("Error in inserting to ttl_status table", zap.Error(dbErr)) + return + } + req := fmt.Sprintf(ttlV2, tableName, r.cluster, timestamp, params.DelDuration) + if strings.HasSuffix(distributedTableName, r.traceResourceTableV3) { + req = fmt.Sprintf(ttlV2Resource, tableName, r.cluster, params.DelDuration) + } + + if len(params.ColdStorageVolume) > 0 { + if strings.HasSuffix(distributedTableName, r.traceResourceTableV3) { + req += fmt.Sprintf(ttlTracesV2ResourceColdStorage, params.ToColdStorageDuration, params.ColdStorageVolume) + } else { + req += fmt.Sprintf(ttlV2ColdStorage, timestamp, params.ToColdStorageDuration, params.ColdStorageVolume) + } + } + err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume) + if err != nil { + zap.L().Error("Error in setting cold storage", zap.Error(err)) + statusItem, err := r.checkTTLStatusItem(ctx, tableName) + if err == nil { + _, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id) + if dbErr != nil { + zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) + return + } + } + return + } + req += " SETTINGS materialize_ttl_after_modify=0;" + zap.L().Error(" ExecutingTTL request: ", zap.String("request", req)) + statusItem, _ := r.checkTTLStatusItem(ctx, tableName) + if err := r.db.Exec(ctx, req); err != nil { + zap.L().Error("Error in executing set TTL query", zap.Error(err)) + _, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id) + if dbErr != nil { + zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) + return + } + return + } + _, dbErr = r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusSuccess, statusItem.Id) + if dbErr != nil { + zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) + return + } + }(distributedTableName) + } + return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil +} + // SetTTL sets the TTL for traces or metrics or logs tables. // This is an async API which creates goroutines to set TTL. // Status of TTL update is tracked with ttl_status table in sqlite db. @@ -1395,6 +1494,10 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, switch params.Type { case constants.TraceTTL: + if r.useTraceNewSchema { + return r.SetTTLTracesV2(ctx, params) + } + tableNames := []string{ signozTraceDBName + "." + signozTraceTableName, signozTraceDBName + "." + signozDurationMVTable, @@ -1755,7 +1858,7 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa getTracesTTL := func() (*model.DBResponseTTL, *model.ApiError) { var dbResp []model.DBResponseTTL - query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v' AND database='%v'", signozTraceLocalTableName, signozTraceDBName) + query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v' AND database='%v'", r.traceLocalTableName, signozTraceDBName) err := r.db.Select(ctx, &dbResp, query)