diff --git a/deploy/docker-swarm/clickhouse-setup/docker-compose.yaml b/deploy/docker-swarm/clickhouse-setup/docker-compose.yaml index 6ddf49562a..6d1c7267b1 100644 --- a/deploy/docker-swarm/clickhouse-setup/docker-compose.yaml +++ b/deploy/docker-swarm/clickhouse-setup/docker-compose.yaml @@ -146,7 +146,7 @@ services: condition: on-failure query-service: - image: signoz/query-service:0.58.1 + image: signoz/query-service:0.58.2 command: [ "-config=/root/config/prometheus.yml", @@ -186,7 +186,7 @@ services: <<: *db-depend frontend: - image: signoz/frontend:0.58.1 + image: signoz/frontend:0.58.2 deploy: restart_policy: condition: on-failure diff --git a/deploy/docker/clickhouse-setup/docker-compose-minimal.yaml b/deploy/docker/clickhouse-setup/docker-compose-minimal.yaml index 2a38538d8a..bda2156a1c 100644 --- a/deploy/docker/clickhouse-setup/docker-compose-minimal.yaml +++ b/deploy/docker/clickhouse-setup/docker-compose-minimal.yaml @@ -162,7 +162,7 @@ services: # Notes for Maintainers/Contributors who will change Line Numbers of Frontend & Query-Section. Please Update Line Numbers in `./scripts/commentLinesForSetup.sh` & `./CONTRIBUTING.md` query-service: - image: signoz/query-service:${DOCKER_TAG:-0.58.1} + image: signoz/query-service:${DOCKER_TAG:-0.58.2} container_name: signoz-query-service command: [ @@ -201,7 +201,7 @@ services: <<: *db-depend frontend: - image: signoz/frontend:${DOCKER_TAG:-0.58.1} + image: signoz/frontend:${DOCKER_TAG:-0.58.2} container_name: signoz-frontend restart: on-failure depends_on: diff --git a/deploy/docker/clickhouse-setup/docker-compose.testing.yaml b/deploy/docker/clickhouse-setup/docker-compose.testing.yaml index 3cf50bbd3c..de0c6cb0e7 100644 --- a/deploy/docker/clickhouse-setup/docker-compose.testing.yaml +++ b/deploy/docker/clickhouse-setup/docker-compose.testing.yaml @@ -167,7 +167,7 @@ services: # Notes for Maintainers/Contributors who will change Line Numbers of Frontend & Query-Section. Please Update Line Numbers in `./scripts/commentLinesForSetup.sh` & `./CONTRIBUTING.md` query-service: - image: signoz/query-service:${DOCKER_TAG:-0.58.1} + image: signoz/query-service:${DOCKER_TAG:-0.58.2} container_name: signoz-query-service command: [ @@ -208,7 +208,7 @@ services: <<: *db-depend frontend: - image: signoz/frontend:${DOCKER_TAG:-0.58.1} + image: signoz/frontend:${DOCKER_TAG:-0.58.2} container_name: signoz-frontend restart: on-failure depends_on: diff --git a/ee/query-service/Dockerfile b/ee/query-service/Dockerfile index 55ed33aa60..5c8f2f6f35 100644 --- a/ee/query-service/Dockerfile +++ b/ee/query-service/Dockerfile @@ -1,5 +1,5 @@ # use a minimal alpine image -FROM alpine:3.18.6 +FROM alpine:3.20.3 # Add Maintainer Info LABEL maintainer="signoz" diff --git a/ee/query-service/license/manager.go b/ee/query-service/license/manager.go index 6dcc704e3a..aa58e7e5a5 100644 --- a/ee/query-service/license/manager.go +++ b/ee/query-service/license/manager.go @@ -67,6 +67,30 @@ func StartManager(dbType string, db *sqlx.DB, useLicensesV3 bool, features ...ba repo: &repo, } + if useLicensesV3 { + // get active license from the db + active, err := m.repo.GetActiveLicense(context.Background()) + if err != nil { + return m, err + } + + // if we have an active license then need to fetch the complete details + if active != nil { + // fetch the new license structure from control plane + licenseV3, apiError := validate.ValidateLicenseV3(active.Key) + if apiError != nil { + return m, apiError + } + + // insert the licenseV3 in sqlite db + apiError = m.repo.InsertLicenseV3(context.Background(), licenseV3) + // if the license already exists move ahead. + if apiError != nil && apiError.Typ != model.ErrorConflict { + return m, apiError + } + } + } + if err := m.start(useLicensesV3, features...); err != nil { return m, err } diff --git a/ee/query-service/rules/anomaly.go b/ee/query-service/rules/anomaly.go index 08ff3afcda..ceec23747e 100644 --- a/ee/query-service/rules/anomaly.go +++ b/ee/query-service/rules/anomaly.go @@ -61,6 +61,11 @@ func NewAnomalyRule( zap.L().Info("creating new AnomalyRule", zap.String("id", id), zap.Any("opts", opts)) + if p.RuleCondition.CompareOp == baserules.ValueIsBelow { + target := -1 * *p.RuleCondition.Target + p.RuleCondition.Target = &target + } + baseRule, err := baserules.NewBaseRule(id, p, reader, opts...) if err != nil { return nil, err diff --git a/frontend/src/container/FormAlertRules/BasicInfo.tsx b/frontend/src/container/FormAlertRules/BasicInfo.tsx index 6e686d1188..9f04a07924 100644 --- a/frontend/src/container/FormAlertRules/BasicInfo.tsx +++ b/frontend/src/container/FormAlertRules/BasicInfo.tsx @@ -8,7 +8,7 @@ import { ALERTS_DATA_SOURCE_MAP } from 'constants/alerts'; import ROUTES from 'constants/routes'; import useComponentPermission from 'hooks/useComponentPermission'; import useFetch from 'hooks/useFetch'; -import { useCallback, useEffect, useState } from 'react'; +import { useCallback, useEffect, useRef, useState } from 'react'; import { useTranslation } from 'react-i18next'; import { useSelector } from 'react-redux'; import { AppState } from 'store/reducers'; @@ -83,16 +83,22 @@ function BasicInfo({ window.open(ROUTES.CHANNELS_NEW, '_blank'); // eslint-disable-next-line react-hooks/exhaustive-deps }, []); + const hasLoggedEvent = useRef(false); useEffect(() => { - if (!channels.loading && isNewRule) { + if (!channels.loading && isNewRule && !hasLoggedEvent.current) { logEvent('Alert: New alert creation page visited', { dataSource: ALERTS_DATA_SOURCE_MAP[alertDef?.alertType as AlertTypes], numberOfChannels: channels?.payload?.length, }); + hasLoggedEvent.current = true; } // eslint-disable-next-line react-hooks/exhaustive-deps - }, [channels.payload, channels.loading]); + }, [channels.loading]); + + const refetchChannels = async (): Promise => { + await channels.refetch(); + }; return ( <> @@ -197,7 +203,7 @@ function BasicInfo({ {!shouldBroadCastToAllChannels && ( { setAlertDef({ diff --git a/frontend/src/container/FormAlertRules/ChannelSelect/index.tsx b/frontend/src/container/FormAlertRules/ChannelSelect/index.tsx index 209369c229..86c717396d 100644 --- a/frontend/src/container/FormAlertRules/ChannelSelect/index.tsx +++ b/frontend/src/container/FormAlertRules/ChannelSelect/index.tsx @@ -1,24 +1,33 @@ -import { Select } from 'antd'; +import { PlusOutlined } from '@ant-design/icons'; +import { Select, Spin } from 'antd'; +import useComponentPermission from 'hooks/useComponentPermission'; import { State } from 'hooks/useFetch'; import { useNotifications } from 'hooks/useNotifications'; import { ReactNode } from 'react'; import { useTranslation } from 'react-i18next'; +import { useSelector } from 'react-redux'; +import { AppState } from 'store/reducers'; import { PayloadProps } from 'types/api/channels/getAll'; +import AppReducer from 'types/reducer/app'; -import { StyledSelect } from './styles'; +import { StyledCreateChannelOption, StyledSelect } from './styles'; export interface ChannelSelectProps { disabled?: boolean; currentValue?: string[]; onSelectChannels: (s: string[]) => void; + onDropdownOpen: () => void; channels: State; + handleCreateNewChannels: () => void; } function ChannelSelect({ disabled, currentValue, onSelectChannels, + onDropdownOpen, channels, + handleCreateNewChannels, }: ChannelSelectProps): JSX.Element | null { // init namespace for translations const { t } = useTranslation('alerts'); @@ -26,6 +35,10 @@ function ChannelSelect({ const { notifications } = useNotifications(); const handleChange = (value: string[]): void => { + if (value.includes('add-new-channel')) { + handleCreateNewChannels(); + return; + } onSelectChannels(value); }; @@ -35,9 +48,27 @@ function ChannelSelect({ description: channels.errorMessage, }); } + + const { role } = useSelector((state) => state.app); + const [addNewChannelPermission] = useComponentPermission( + ['add_new_channel'], + role, + ); + const renderOptions = (): ReactNode[] => { const children: ReactNode[] = []; + if (!channels.loading && addNewChannelPermission) { + children.push( + + + + Create a new channel + + , + ); + } + if ( channels.loading || channels.payload === undefined || @@ -56,6 +87,7 @@ function ChannelSelect({ return children; }; + return ( } + onDropdownVisibleChange={(open): void => { + if (open) { + onDropdownOpen(); + } + }} onChange={(value): void => { handleChange(value as string[]); }} diff --git a/frontend/src/container/FormAlertRules/ChannelSelect/styles.ts b/frontend/src/container/FormAlertRules/ChannelSelect/styles.ts index 7a59e38767..33d4376ba5 100644 --- a/frontend/src/container/FormAlertRules/ChannelSelect/styles.ts +++ b/frontend/src/container/FormAlertRules/ChannelSelect/styles.ts @@ -4,3 +4,10 @@ import styled from 'styled-components'; export const StyledSelect = styled(Select)` border-radius: 4px; `; + +export const StyledCreateChannelOption = styled.div` + color: var(--bg-robin-500); + display: flex; + align-items: center; + gap: 8px; +`; diff --git a/frontend/src/container/FormAlertRules/RuleOptions.tsx b/frontend/src/container/FormAlertRules/RuleOptions.tsx index e9aa8f860f..8eebb8268d 100644 --- a/frontend/src/container/FormAlertRules/RuleOptions.tsx +++ b/frontend/src/container/FormAlertRules/RuleOptions.tsx @@ -102,9 +102,9 @@ function RuleOptions({ {t('option_notequal')} )} - + {/* the value 5 and 6 are reserved for above or equal and below or equal */} {ruleType === 'anomaly_rule' && ( - {t('option_above_below')} + {t('option_above_below')} )} ); diff --git a/frontend/src/container/MetricsApplication/constant.ts b/frontend/src/container/MetricsApplication/constant.ts index 75853cc8ea..1052a8d09a 100644 --- a/frontend/src/container/MetricsApplication/constant.ts +++ b/frontend/src/container/MetricsApplication/constant.ts @@ -38,11 +38,12 @@ export enum FORMULA { // query C => durationNano <= 2000ms // Since <= 2000ms includes <= 500ms, we over count, to correct we subtract B/2 // so the full expression would be (B + C/2) - B/2 = (B+C)/2 + // However, if you add a filter on durationNano > 500ms, (filterItemC in overviewQueries) the query would be + // B + C/2 APDEX_TRACES = '((B + C)/2)/A', - // Does the same not apply for delta span metrics? - // No, because the delta metrics store the counts just for the current bucket - // so we don't need to subtract anything - APDEX_DELTA_SPAN_METRICS = '(B + C)/A', + // The delta span metrics store delta compared to previous reporting interval + // but not the counts for the current interval. The bucket counts are cumulative + APDEX_DELTA_SPAN_METRICS = '((B + C)/2)/A', // Cumulative span metrics store the counts for all buckets // so we need to subtract B/2 to correct the over counting APDEX_CUMULATIVE_SPAN_METRICS = '((B + C)/2)/A', diff --git a/frontend/src/hooks/useFetch.ts b/frontend/src/hooks/useFetch.ts index 7d67ff2a34..56377799d4 100644 --- a/frontend/src/hooks/useFetch.ts +++ b/frontend/src/hooks/useFetch.ts @@ -1,4 +1,4 @@ -import { useEffect, useRef, useState } from 'react'; +import { useCallback, useEffect, useState } from 'react'; import { ErrorResponse, SuccessResponse } from 'types/api'; function useFetch( @@ -10,7 +10,7 @@ function useFetch( (arg0: any): Promise | ErrorResponse>; }, param?: FunctionParams, -): State { +): State & { refetch: () => Promise } { const [state, setStates] = useState>({ loading: true, success: null, @@ -19,37 +19,28 @@ function useFetch( payload: undefined, }); - const loadingRef = useRef(0); - - useEffect(() => { + const fetchData = useCallback(async (): Promise => { + setStates((prev) => ({ ...prev, loading: true })); try { - (async (): Promise => { - if (state.loading) { - const response = await functions(param); - - if (loadingRef.current === 0) { - loadingRef.current = 1; + const response = await functions(param); - if (response.statusCode === 200) { - setStates({ - loading: false, - error: false, - success: true, - payload: response.payload, - errorMessage: '', - }); - } else { - setStates({ - loading: false, - error: true, - success: false, - payload: undefined, - errorMessage: response.error as string, - }); - } - } - } - })(); + if (response.statusCode === 200) { + setStates({ + loading: false, + error: false, + success: true, + payload: response.payload, + errorMessage: '', + }); + } else { + setStates({ + loading: false, + error: true, + success: false, + payload: undefined, + errorMessage: response.error as string, + }); + } } catch (error) { setStates({ payload: undefined, @@ -59,13 +50,16 @@ function useFetch( errorMessage: error as string, }); } - return (): void => { - loadingRef.current = 1; - }; - }, [functions, param, state.loading]); + }, [functions, param]); + + // Initial fetch + useEffect(() => { + fetchData(); + }, [fetchData]); return { ...state, + refetch: fetchData, }; } diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index a6811dc2eb..7e57e5ccaf 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -766,307 +766,6 @@ func buildFilterArrayQuery(_ context.Context, excludeMap map[string]struct{}, pa return args } -func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *model.SpanFilterParams) (*model.SpanFiltersResponse, *model.ApiError) { - - var query string - excludeMap := make(map[string]struct{}) - for _, e := range queryParams.Exclude { - if e == constants.OperationRequest { - excludeMap[constants.OperationDB] = struct{}{} - continue - } - excludeMap[e] = struct{}{} - } - - args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))} - if len(queryParams.TraceID) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.TraceID, constants.TraceID, &query, args) - } - if len(queryParams.ServiceName) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ServiceName, constants.ServiceName, &query, args) - } - if len(queryParams.HttpRoute) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpRoute, constants.HttpRoute, &query, args) - } - if len(queryParams.HttpHost) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpHost, constants.HttpHost, &query, args) - } - if len(queryParams.HttpMethod) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpMethod, constants.HttpMethod, &query, args) - } - if len(queryParams.HttpUrl) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpUrl, constants.HttpUrl, &query, args) - } - if len(queryParams.Operation) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Operation, constants.OperationDB, &query, args) - } - if len(queryParams.RPCMethod) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.RPCMethod, constants.RPCMethod, &query, args) - } - if len(queryParams.ResponseStatusCode) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ResponseStatusCode, constants.ResponseStatusCode, &query, args) - } - - if len(queryParams.MinDuration) != 0 { - query = query + " AND durationNano >= @durationNanoMin" - args = append(args, clickhouse.Named("durationNanoMin", queryParams.MinDuration)) - } - if len(queryParams.MaxDuration) != 0 { - query = query + " AND durationNano <= @durationNanoMax" - args = append(args, clickhouse.Named("durationNanoMax", queryParams.MaxDuration)) - } - - if len(queryParams.SpanKind) != 0 { - query = query + " AND kind = @kind" - args = append(args, clickhouse.Named("kind", queryParams.SpanKind)) - } - - query = getStatusFilters(query, queryParams.Status, excludeMap) - - traceFilterReponse := model.SpanFiltersResponse{ - Status: map[string]uint64{}, - Duration: map[string]uint64{}, - ServiceName: map[string]uint64{}, - Operation: map[string]uint64{}, - ResponseStatusCode: map[string]uint64{}, - RPCMethod: map[string]uint64{}, - HttpMethod: map[string]uint64{}, - HttpUrl: map[string]uint64{}, - HttpRoute: map[string]uint64{}, - HttpHost: map[string]uint64{}, - } - - for _, e := range queryParams.GetFilters { - switch e { - case constants.TraceID: - continue - case constants.ServiceName: - finalQuery := fmt.Sprintf("SELECT serviceName, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) - finalQuery += query - finalQuery += " GROUP BY serviceName" - var dBResponse []model.DBResponseServiceName - err := r.db.Select(ctx, &dBResponse, finalQuery, args...) - zap.L().Info(finalQuery) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)} - } - for _, service := range dBResponse { - if service.ServiceName != "" { - traceFilterReponse.ServiceName[service.ServiceName] = service.Count - } - } - case constants.HttpRoute: - finalQuery := fmt.Sprintf("SELECT httpRoute, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) - finalQuery += query - finalQuery += " GROUP BY httpRoute" - var dBResponse []model.DBResponseHttpRoute - err := r.db.Select(ctx, &dBResponse, finalQuery, args...) - zap.L().Info(finalQuery) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)} - } - for _, service := range dBResponse { - if service.HttpRoute != "" { - traceFilterReponse.HttpRoute[service.HttpRoute] = service.Count - } - } - case constants.HttpUrl: - finalQuery := fmt.Sprintf("SELECT httpUrl, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) - finalQuery += query - finalQuery += " GROUP BY httpUrl" - var dBResponse []model.DBResponseHttpUrl - err := r.db.Select(ctx, &dBResponse, finalQuery, args...) - zap.L().Info(finalQuery) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)} - } - for _, service := range dBResponse { - if service.HttpUrl != "" { - traceFilterReponse.HttpUrl[service.HttpUrl] = service.Count - } - } - case constants.HttpMethod: - finalQuery := fmt.Sprintf("SELECT httpMethod, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) - finalQuery += query - finalQuery += " GROUP BY httpMethod" - var dBResponse []model.DBResponseHttpMethod - err := r.db.Select(ctx, &dBResponse, finalQuery, args...) - zap.L().Info(finalQuery) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)} - } - for _, service := range dBResponse { - if service.HttpMethod != "" { - traceFilterReponse.HttpMethod[service.HttpMethod] = service.Count - } - } - case constants.HttpHost: - finalQuery := fmt.Sprintf("SELECT httpHost, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) - finalQuery += query - finalQuery += " GROUP BY httpHost" - var dBResponse []model.DBResponseHttpHost - err := r.db.Select(ctx, &dBResponse, finalQuery, args...) - zap.L().Info(finalQuery) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)} - } - for _, service := range dBResponse { - if service.HttpHost != "" { - traceFilterReponse.HttpHost[service.HttpHost] = service.Count - } - } - case constants.OperationRequest: - finalQuery := fmt.Sprintf("SELECT name, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) - finalQuery += query - finalQuery += " GROUP BY name" - var dBResponse []model.DBResponseOperation - err := r.db.Select(ctx, &dBResponse, finalQuery, args...) - zap.L().Info(finalQuery) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)} - } - for _, service := range dBResponse { - if service.Operation != "" { - traceFilterReponse.Operation[service.Operation] = service.Count - } - } - case constants.Status: - finalQuery := fmt.Sprintf("SELECT COUNT(*) as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU AND hasError = true", r.TraceDB, r.indexTable) - finalQuery += query - var dBResponse []model.DBResponseTotal - err := r.db.Select(ctx, &dBResponse, finalQuery, args...) - zap.L().Info(finalQuery) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)} - } - - finalQuery2 := fmt.Sprintf("SELECT COUNT(*) as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU AND hasError = false", r.TraceDB, r.indexTable) - finalQuery2 += query - var dBResponse2 []model.DBResponseTotal - err = r.db.Select(ctx, &dBResponse2, finalQuery2, args...) - zap.L().Info(finalQuery2) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)} - } - if len(dBResponse) > 0 && len(dBResponse2) > 0 { - traceFilterReponse.Status = map[string]uint64{"ok": dBResponse2[0].NumTotal, "error": dBResponse[0].NumTotal} - } else if len(dBResponse) > 0 { - traceFilterReponse.Status = map[string]uint64{"ok": 0, "error": dBResponse[0].NumTotal} - } else if len(dBResponse2) > 0 { - traceFilterReponse.Status = map[string]uint64{"ok": dBResponse2[0].NumTotal, "error": 0} - } else { - traceFilterReponse.Status = map[string]uint64{"ok": 0, "error": 0} - } - case constants.Duration: - err := r.featureFlags.CheckFeature(constants.DurationSort) - durationSortEnabled := err == nil - finalQuery := "" - if !durationSortEnabled { - // if duration sort is not enabled, we need to get the min and max duration from the index table - finalQuery = fmt.Sprintf("SELECT min(durationNano) as min, max(durationNano) as max FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) - finalQuery += query - var dBResponse []model.DBResponseMinMax - err = r.db.Select(ctx, &dBResponse, finalQuery, args...) - zap.L().Info(finalQuery) - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)} - } - if len(dBResponse) > 0 { - traceFilterReponse.Duration = map[string]uint64{"minDuration": dBResponse[0].Min, "maxDuration": dBResponse[0].Max} - } - } else { - // when duration sort is enabled, we need to get the min and max duration from the duration table - finalQuery = fmt.Sprintf("SELECT durationNano as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.durationTable) - finalQuery += query - finalQuery += " ORDER BY durationNano LIMIT 1" - var dBResponse []model.DBResponseTotal - err = r.db.Select(ctx, &dBResponse, finalQuery, args...) - zap.L().Info(finalQuery) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)} - } - - finalQuery = fmt.Sprintf("SELECT durationNano as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.durationTable) - finalQuery += query - finalQuery += " ORDER BY durationNano DESC LIMIT 1" - var dBResponse2 []model.DBResponseTotal - err = r.db.Select(ctx, &dBResponse2, finalQuery, args...) - zap.L().Info(finalQuery) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)} - } - if len(dBResponse) > 0 { - traceFilterReponse.Duration["minDuration"] = dBResponse[0].NumTotal - } - if len(dBResponse2) > 0 { - traceFilterReponse.Duration["maxDuration"] = dBResponse2[0].NumTotal - } - } - case constants.RPCMethod: - finalQuery := fmt.Sprintf("SELECT rpcMethod, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) - finalQuery += query - finalQuery += " GROUP BY rpcMethod" - var dBResponse []model.DBResponseRPCMethod - err := r.db.Select(ctx, &dBResponse, finalQuery, args...) - zap.L().Info(finalQuery) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)} - } - for _, service := range dBResponse { - if service.RPCMethod != "" { - traceFilterReponse.RPCMethod[service.RPCMethod] = service.Count - } - } - - case constants.ResponseStatusCode: - finalQuery := fmt.Sprintf("SELECT responseStatusCode, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) - finalQuery += query - finalQuery += " GROUP BY responseStatusCode" - var dBResponse []model.DBResponseStatusCodeMethod - err := r.db.Select(ctx, &dBResponse, finalQuery, args...) - zap.L().Info(finalQuery) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)} - } - for _, service := range dBResponse { - if service.ResponseStatusCode != "" { - traceFilterReponse.ResponseStatusCode[service.ResponseStatusCode] = service.Count - } - } - - default: - return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("filter type: %s not supported", e)} - } - } - - return &traceFilterReponse, nil -} - func getStatusFilters(query string, statusParams []string, excludeMap map[string]struct{}) string { // status can only be two and if both are selected than they are equivalent to none selected @@ -1088,140 +787,6 @@ func getStatusFilters(query string, statusParams []string, excludeMap map[string return query } -func (r *ClickHouseReader) GetFilteredSpans(ctx context.Context, queryParams *model.GetFilteredSpansParams) (*model.GetFilterSpansResponse, *model.ApiError) { - - queryTable := fmt.Sprintf("%s.%s", r.TraceDB, r.indexTable) - - excludeMap := make(map[string]struct{}) - for _, e := range queryParams.Exclude { - if e == constants.OperationRequest { - excludeMap[constants.OperationDB] = struct{}{} - continue - } - excludeMap[e] = struct{}{} - } - - var query string - args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))} - if len(queryParams.TraceID) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.TraceID, constants.TraceID, &query, args) - } - if len(queryParams.ServiceName) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ServiceName, constants.ServiceName, &query, args) - } - if len(queryParams.HttpRoute) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpRoute, constants.HttpRoute, &query, args) - } - if len(queryParams.HttpHost) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpHost, constants.HttpHost, &query, args) - } - if len(queryParams.HttpMethod) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpMethod, constants.HttpMethod, &query, args) - } - if len(queryParams.HttpUrl) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpUrl, constants.HttpUrl, &query, args) - } - if len(queryParams.Operation) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Operation, constants.OperationDB, &query, args) - } - if len(queryParams.RPCMethod) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.RPCMethod, constants.RPCMethod, &query, args) - } - - if len(queryParams.ResponseStatusCode) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ResponseStatusCode, constants.ResponseStatusCode, &query, args) - } - - if len(queryParams.MinDuration) != 0 { - query = query + " AND durationNano >= @durationNanoMin" - args = append(args, clickhouse.Named("durationNanoMin", queryParams.MinDuration)) - } - if len(queryParams.MaxDuration) != 0 { - query = query + " AND durationNano <= @durationNanoMax" - args = append(args, clickhouse.Named("durationNanoMax", queryParams.MaxDuration)) - } - query = getStatusFilters(query, queryParams.Status, excludeMap) - - if len(queryParams.SpanKind) != 0 { - query = query + " AND kind = @kind" - args = append(args, clickhouse.Named("kind", queryParams.SpanKind)) - } - - // create TagQuery from TagQueryParams - tags := createTagQueryFromTagQueryParams(queryParams.Tags) - subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags) - query += subQuery - args = append(args, argsSubQuery...) - if errStatus != nil { - return nil, errStatus - } - - if len(queryParams.OrderParam) != 0 { - if queryParams.OrderParam == constants.Duration { - queryTable = fmt.Sprintf("%s.%s", r.TraceDB, r.durationTable) - if queryParams.Order == constants.Descending { - query = query + " ORDER BY durationNano DESC" - } - if queryParams.Order == constants.Ascending { - query = query + " ORDER BY durationNano ASC" - } - } else if queryParams.OrderParam == constants.Timestamp { - projectionOptQuery := "SET allow_experimental_projection_optimization = 1" - err := r.db.Exec(ctx, projectionOptQuery) - - zap.L().Info(projectionOptQuery) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")} - } - if queryParams.Order == constants.Descending { - query = query + " ORDER BY timestamp DESC" - } - if queryParams.Order == constants.Ascending { - query = query + " ORDER BY timestamp ASC" - } - } - } - if queryParams.Limit > 0 { - query = query + " LIMIT @limit" - args = append(args, clickhouse.Named("limit", queryParams.Limit)) - } - - if queryParams.Offset > 0 { - query = query + " OFFSET @offset" - args = append(args, clickhouse.Named("offset", queryParams.Offset)) - } - - var getFilterSpansResponseItems []model.GetFilterSpansResponseItem - - baseQuery := fmt.Sprintf("SELECT timestamp, spanID, traceID, serviceName, name, durationNano, httpMethod, rpcMethod, responseStatusCode FROM %s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryTable) - baseQuery += query - err := r.db.Select(ctx, &getFilterSpansResponseItems, baseQuery, args...) - // Fill status and method - for i, e := range getFilterSpansResponseItems { - if e.RPCMethod != "" { - getFilterSpansResponseItems[i].Method = e.RPCMethod - } else { - getFilterSpansResponseItems[i].Method = e.HttpMethod - } - } - - zap.L().Info(baseQuery) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")} - } - - getFilterSpansResponse := model.GetFilterSpansResponse{ - Spans: getFilterSpansResponseItems, - TotalSpans: 1000, - } - - return &getFilterSpansResponse, nil -} - func createTagQueryFromTagQueryParams(queryParams []model.TagQueryParam) []model.TagQuery { tags := []model.TagQuery{} for _, tag := range queryParams { @@ -1379,87 +944,6 @@ func addExistsOperator(item model.TagQuery, tagMapType string, not bool) (string return fmt.Sprintf(" AND %s (%s)", notStr, strings.Join(tagOperatorPair, " OR ")), args } -func (r *ClickHouseReader) GetTagFilters(ctx context.Context, queryParams *model.TagFilterParams) (*model.TagFilters, *model.ApiError) { - - excludeMap := make(map[string]struct{}) - for _, e := range queryParams.Exclude { - if e == constants.OperationRequest { - excludeMap[constants.OperationDB] = struct{}{} - continue - } - excludeMap[e] = struct{}{} - } - - var query string - args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))} - if len(queryParams.TraceID) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.TraceID, constants.TraceID, &query, args) - } - if len(queryParams.ServiceName) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ServiceName, constants.ServiceName, &query, args) - } - if len(queryParams.HttpRoute) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpRoute, constants.HttpRoute, &query, args) - } - if len(queryParams.HttpHost) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpHost, constants.HttpHost, &query, args) - } - if len(queryParams.HttpMethod) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpMethod, constants.HttpMethod, &query, args) - } - if len(queryParams.HttpUrl) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpUrl, constants.HttpUrl, &query, args) - } - if len(queryParams.Operation) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Operation, constants.OperationDB, &query, args) - } - if len(queryParams.RPCMethod) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.RPCMethod, constants.RPCMethod, &query, args) - } - if len(queryParams.ResponseStatusCode) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ResponseStatusCode, constants.ResponseStatusCode, &query, args) - } - if len(queryParams.MinDuration) != 0 { - query = query + " AND durationNano >= @durationNanoMin" - args = append(args, clickhouse.Named("durationNanoMin", queryParams.MinDuration)) - } - if len(queryParams.MaxDuration) != 0 { - query = query + " AND durationNano <= @durationNanoMax" - args = append(args, clickhouse.Named("durationNanoMax", queryParams.MaxDuration)) - } - if len(queryParams.SpanKind) != 0 { - query = query + " AND kind = @kind" - args = append(args, clickhouse.Named("kind", queryParams.SpanKind)) - } - - query = getStatusFilters(query, queryParams.Status, excludeMap) - - tagFilters := []model.TagFilters{} - - // Alternative finalQuery := fmt.Sprintf(`SELECT DISTINCT arrayJoin(tagMap.keys) as tagKeys FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable) - finalQuery := fmt.Sprintf(`SELECT groupUniqArrayArray(mapKeys(stringTagMap)) as stringTagKeys, groupUniqArrayArray(mapKeys(numberTagMap)) as numberTagKeys, groupUniqArrayArray(mapKeys(boolTagMap)) as boolTagKeys FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable) - finalQuery += query - err := r.db.Select(ctx, &tagFilters, finalQuery, args...) - - zap.L().Info(query) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")} - } - tagFiltersResult := model.TagFilters{ - StringTagKeys: make([]string, 0), - NumberTagKeys: make([]string, 0), - BoolTagKeys: make([]string, 0), - } - if len(tagFilters) != 0 { - tagFiltersResult.StringTagKeys = excludeTags(ctx, tagFilters[0].StringTagKeys) - tagFiltersResult.NumberTagKeys = excludeTags(ctx, tagFilters[0].NumberTagKeys) - tagFiltersResult.BoolTagKeys = excludeTags(ctx, tagFilters[0].BoolTagKeys) - } - return &tagFiltersResult, nil -} - func excludeTags(_ context.Context, tags []string) []string { excludedTagsMap := map[string]bool{ "http.code": true, @@ -1483,102 +967,6 @@ func excludeTags(_ context.Context, tags []string) []string { return newTags } -func (r *ClickHouseReader) GetTagValues(ctx context.Context, queryParams *model.TagFilterParams) (*model.TagValues, *model.ApiError) { - - if queryParams.TagKey.Type == model.TagTypeNumber { - return &model.TagValues{ - NumberTagValues: make([]float64, 0), - StringTagValues: make([]string, 0), - BoolTagValues: make([]bool, 0), - }, nil - } else if queryParams.TagKey.Type == model.TagTypeBool { - return &model.TagValues{ - NumberTagValues: make([]float64, 0), - StringTagValues: make([]string, 0), - BoolTagValues: []bool{true, false}, - }, nil - } - - excludeMap := make(map[string]struct{}) - for _, e := range queryParams.Exclude { - if e == constants.OperationRequest { - excludeMap[constants.OperationDB] = struct{}{} - continue - } - excludeMap[e] = struct{}{} - } - - var query string - args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))} - if len(queryParams.TraceID) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.TraceID, constants.TraceID, &query, args) - } - if len(queryParams.ServiceName) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ServiceName, constants.ServiceName, &query, args) - } - if len(queryParams.HttpRoute) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpRoute, constants.HttpRoute, &query, args) - } - if len(queryParams.HttpHost) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpHost, constants.HttpHost, &query, args) - } - if len(queryParams.HttpMethod) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpMethod, constants.HttpMethod, &query, args) - } - if len(queryParams.HttpUrl) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpUrl, constants.HttpUrl, &query, args) - } - if len(queryParams.Operation) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Operation, constants.OperationDB, &query, args) - } - if len(queryParams.MinDuration) != 0 { - query = query + " AND durationNano >= @durationNanoMin" - args = append(args, clickhouse.Named("durationNanoMin", queryParams.MinDuration)) - } - if len(queryParams.MaxDuration) != 0 { - query = query + " AND durationNano <= @durationNanoMax" - args = append(args, clickhouse.Named("durationNanoMax", queryParams.MaxDuration)) - } - if len(queryParams.SpanKind) != 0 { - query = query + " AND kind = @kind" - args = append(args, clickhouse.Named("kind", queryParams.SpanKind)) - } - - query = getStatusFilters(query, queryParams.Status, excludeMap) - - tagValues := []model.TagValues{} - - finalQuery := fmt.Sprintf(`SELECT groupArray(DISTINCT stringTagMap[@key]) as stringTagValues FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable) - finalQuery += query - finalQuery += " LIMIT @limit" - - args = append(args, clickhouse.Named("key", queryParams.TagKey.Key)) - args = append(args, clickhouse.Named("limit", queryParams.Limit)) - err := r.db.Select(ctx, &tagValues, finalQuery, args...) - - zap.L().Info(query) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")} - } - - cleanedTagValues := model.TagValues{ - StringTagValues: []string{}, - NumberTagValues: []float64{}, - BoolTagValues: []bool{}, - } - if len(tagValues) == 0 { - return &cleanedTagValues, nil - } - for _, e := range tagValues[0].StringTagValues { - if e != "" { - cleanedTagValues.StringTagValues = append(cleanedTagValues.StringTagValues, e) - } - } - return &cleanedTagValues, nil -} - func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *model.GetTopOperationsParams) (*[]model.TopOperationsItem, *model.ApiError) { namedArgs := []interface{}{ @@ -1823,185 +1211,6 @@ func (r *ClickHouseReader) GetDependencyGraph(ctx context.Context, queryParams * return &response, nil } -func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, queryParams *model.GetFilteredSpanAggregatesParams) (*model.GetFilteredSpansAggregatesResponse, *model.ApiError) { - - excludeMap := make(map[string]struct{}) - for _, e := range queryParams.Exclude { - if e == constants.OperationRequest { - excludeMap[constants.OperationDB] = struct{}{} - continue - } - excludeMap[e] = struct{}{} - } - - SpanAggregatesDBResponseItems := []model.SpanAggregatesDBResponseItem{} - - aggregation_query := "" - if queryParams.Dimension == "duration" { - switch queryParams.AggregationOption { - case "p50": - aggregation_query = " quantile(0.50)(durationNano) as float64Value " - case "p95": - aggregation_query = " quantile(0.95)(durationNano) as float64Value " - case "p90": - aggregation_query = " quantile(0.90)(durationNano) as float64Value " - case "p99": - aggregation_query = " quantile(0.99)(durationNano) as float64Value " - case "max": - aggregation_query = " max(durationNano) as value " - case "min": - aggregation_query = " min(durationNano) as value " - case "avg": - aggregation_query = " avg(durationNano) as float64Value " - case "sum": - aggregation_query = " sum(durationNano) as value " - default: - return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("aggregate type: %s not supported", queryParams.AggregationOption)} - } - } else if queryParams.Dimension == "calls" { - aggregation_query = " count(*) as value " - } - - args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))} - - var query string - var customStr []string - _, columnExists := constants.GroupByColMap[queryParams.GroupBy] - // Using %s for groupBy params as it can be a custom column and custom columns are not supported by clickhouse-go yet: - // issue link: https://github.com/ClickHouse/clickhouse-go/issues/870 - if queryParams.GroupBy != "" && columnExists { - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, queryParams.GroupBy, aggregation_query, r.TraceDB, r.indexTable) - args = append(args, clickhouse.Named("groupByVar", queryParams.GroupBy)) - } else if queryParams.GroupBy != "" { - customStr = strings.Split(queryParams.GroupBy, ".(") - if len(customStr) < 2 { - return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("GroupBy: %s not supported", queryParams.GroupBy)} - } - if customStr[1] == string(model.TagTypeString)+")" { - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, stringTagMap['%s'] as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.indexTable) - } else if customStr[1] == string(model.TagTypeNumber)+")" { - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, toString(numberTagMap['%s']) as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.indexTable) - } else if customStr[1] == string(model.TagTypeBool)+")" { - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, toString(boolTagMap['%s']) as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.indexTable) - } else { - // return error for unsupported group by - return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("GroupBy: %s not supported", queryParams.GroupBy)} - } - } else { - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) - } - - if len(queryParams.TraceID) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.TraceID, constants.TraceID, &query, args) - } - if len(queryParams.ServiceName) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ServiceName, constants.ServiceName, &query, args) - } - if len(queryParams.HttpRoute) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpRoute, constants.HttpRoute, &query, args) - } - if len(queryParams.HttpHost) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpHost, constants.HttpHost, &query, args) - } - if len(queryParams.HttpMethod) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpMethod, constants.HttpMethod, &query, args) - } - if len(queryParams.HttpUrl) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpUrl, constants.HttpUrl, &query, args) - } - if len(queryParams.Operation) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Operation, constants.OperationDB, &query, args) - } - if len(queryParams.RPCMethod) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.RPCMethod, constants.RPCMethod, &query, args) - } - if len(queryParams.ResponseStatusCode) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ResponseStatusCode, constants.ResponseStatusCode, &query, args) - } - if len(queryParams.MinDuration) != 0 { - query = query + " AND durationNano >= @durationNanoMin" - args = append(args, clickhouse.Named("durationNanoMin", queryParams.MinDuration)) - } - if len(queryParams.MaxDuration) != 0 { - query = query + " AND durationNano <= @durationNanoMax" - args = append(args, clickhouse.Named("durationNanoMax", queryParams.MaxDuration)) - } - query = getStatusFilters(query, queryParams.Status, excludeMap) - - if len(queryParams.SpanKind) != 0 { - query = query + " AND kind = @kind" - args = append(args, clickhouse.Named("kind", queryParams.SpanKind)) - } - // create TagQuery from TagQueryParams - tags := createTagQueryFromTagQueryParams(queryParams.Tags) - subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags) - query += subQuery - args = append(args, argsSubQuery...) - - if errStatus != nil { - return nil, errStatus - } - - if queryParams.GroupBy != "" && columnExists { - query = query + fmt.Sprintf(" GROUP BY time, %s as groupBy ORDER BY time", queryParams.GroupBy) - } else if queryParams.GroupBy != "" { - if customStr[1] == string(model.TagTypeString)+")" { - query = query + fmt.Sprintf(" GROUP BY time, stringTagMap['%s'] as groupBy ORDER BY time", customStr[0]) - } else if customStr[1] == string(model.TagTypeNumber)+")" { - query = query + fmt.Sprintf(" GROUP BY time, toString(numberTagMap['%s']) as groupBy ORDER BY time", customStr[0]) - } else if customStr[1] == string(model.TagTypeBool)+")" { - query = query + fmt.Sprintf(" GROUP BY time, toString(boolTagMap['%s']) as groupBy ORDER BY time", customStr[0]) - } - } else { - query = query + " GROUP BY time ORDER BY time" - } - - err := r.db.Select(ctx, &SpanAggregatesDBResponseItems, query, args...) - - zap.L().Info(query) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")} - } - - GetFilteredSpansAggregatesResponse := model.GetFilteredSpansAggregatesResponse{ - Items: map[int64]model.SpanAggregatesResponseItem{}, - } - - for i := range SpanAggregatesDBResponseItems { - if SpanAggregatesDBResponseItems[i].Value == 0 { - SpanAggregatesDBResponseItems[i].Value = uint64(SpanAggregatesDBResponseItems[i].Float64Value) - } - SpanAggregatesDBResponseItems[i].Timestamp = int64(SpanAggregatesDBResponseItems[i].Time.UnixNano()) - SpanAggregatesDBResponseItems[i].FloatValue = float32(SpanAggregatesDBResponseItems[i].Value) - if queryParams.AggregationOption == "rate_per_sec" { - SpanAggregatesDBResponseItems[i].FloatValue = float32(SpanAggregatesDBResponseItems[i].Value) / float32(queryParams.StepSeconds) - } - if responseElement, ok := GetFilteredSpansAggregatesResponse.Items[SpanAggregatesDBResponseItems[i].Timestamp]; !ok { - if queryParams.GroupBy != "" && SpanAggregatesDBResponseItems[i].GroupBy != "" { - GetFilteredSpansAggregatesResponse.Items[SpanAggregatesDBResponseItems[i].Timestamp] = model.SpanAggregatesResponseItem{ - Timestamp: SpanAggregatesDBResponseItems[i].Timestamp, - GroupBy: map[string]float32{SpanAggregatesDBResponseItems[i].GroupBy: SpanAggregatesDBResponseItems[i].FloatValue}, - } - } else if queryParams.GroupBy == "" { - GetFilteredSpansAggregatesResponse.Items[SpanAggregatesDBResponseItems[i].Timestamp] = model.SpanAggregatesResponseItem{ - Timestamp: SpanAggregatesDBResponseItems[i].Timestamp, - Value: SpanAggregatesDBResponseItems[i].FloatValue, - } - } - - } else { - if queryParams.GroupBy != "" && SpanAggregatesDBResponseItems[i].GroupBy != "" { - responseElement.GroupBy[SpanAggregatesDBResponseItems[i].GroupBy] = SpanAggregatesDBResponseItems[i].FloatValue - } - GetFilteredSpansAggregatesResponse.Items[SpanAggregatesDBResponseItems[i].Timestamp] = responseElement - } - } - - return &GetFilteredSpansAggregatesResponse, nil -} - func getLocalTableName(tableName string) string { tableNameSplit := strings.Split(tableName, ".") diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 6586e21d98..2be68ede43 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -526,12 +526,6 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) { router.HandleFunc("/api/v1/configs", am.OpenAccess(aH.getConfigs)).Methods(http.MethodGet) router.HandleFunc("/api/v1/health", am.OpenAccess(aH.getHealth)).Methods(http.MethodGet) - router.HandleFunc("/api/v1/getSpanFilters", am.ViewAccess(aH.getSpanFilters)).Methods(http.MethodPost) - router.HandleFunc("/api/v1/getTagFilters", am.ViewAccess(aH.getTagFilters)).Methods(http.MethodPost) - router.HandleFunc("/api/v1/getFilteredSpans", am.ViewAccess(aH.getFilteredSpans)).Methods(http.MethodPost) - router.HandleFunc("/api/v1/getFilteredSpans/aggregates", am.ViewAccess(aH.getFilteredSpanAggregates)).Methods(http.MethodPost) - router.HandleFunc("/api/v1/getTagValues", am.ViewAccess(aH.getTagValues)).Methods(http.MethodPost) - router.HandleFunc("/api/v1/listErrors", am.ViewAccess(aH.listErrors)).Methods(http.MethodPost) router.HandleFunc("/api/v1/countErrors", am.ViewAccess(aH.countErrors)).Methods(http.MethodPost) router.HandleFunc("/api/v1/errorFromErrorID", am.ViewAccess(aH.getErrorFromErrorID)).Methods(http.MethodGet) @@ -1847,86 +1841,6 @@ func (aH *APIHandler) getErrorFromGroupID(w http.ResponseWriter, r *http.Request aH.WriteJSON(w, r, result) } -func (aH *APIHandler) getSpanFilters(w http.ResponseWriter, r *http.Request) { - - query, err := parseSpanFilterRequestBody(r) - if aH.HandleError(w, err, http.StatusBadRequest) { - return - } - - result, apiErr := aH.reader.GetSpanFilters(r.Context(), query) - - if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) { - return - } - - aH.WriteJSON(w, r, result) -} - -func (aH *APIHandler) getFilteredSpans(w http.ResponseWriter, r *http.Request) { - - query, err := parseFilteredSpansRequest(r, aH) - if aH.HandleError(w, err, http.StatusBadRequest) { - return - } - - result, apiErr := aH.reader.GetFilteredSpans(r.Context(), query) - - if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) { - return - } - - aH.WriteJSON(w, r, result) -} - -func (aH *APIHandler) getFilteredSpanAggregates(w http.ResponseWriter, r *http.Request) { - - query, err := parseFilteredSpanAggregatesRequest(r) - if aH.HandleError(w, err, http.StatusBadRequest) { - return - } - - result, apiErr := aH.reader.GetFilteredSpansAggregates(r.Context(), query) - - if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) { - return - } - - aH.WriteJSON(w, r, result) -} - -func (aH *APIHandler) getTagFilters(w http.ResponseWriter, r *http.Request) { - - query, err := parseTagFilterRequest(r) - if aH.HandleError(w, err, http.StatusBadRequest) { - return - } - - result, apiErr := aH.reader.GetTagFilters(r.Context(), query) - - if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) { - return - } - - aH.WriteJSON(w, r, result) -} - -func (aH *APIHandler) getTagValues(w http.ResponseWriter, r *http.Request) { - - query, err := parseTagValueRequest(r) - if aH.HandleError(w, err, http.StatusBadRequest) { - return - } - - result, apiErr := aH.reader.GetTagValues(r.Context(), query) - - if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) { - return - } - - aH.WriteJSON(w, r, result) -} - func (aH *APIHandler) setTTL(w http.ResponseWriter, r *http.Request) { ttlParams, err := parseTTLParams(r) if aH.HandleError(w, err, http.StatusBadRequest) { diff --git a/pkg/query-service/app/logs/v3/enrich_query.go b/pkg/query-service/app/logs/v3/enrich_query.go index cd5c2d6a0c..2f853a12b9 100644 --- a/pkg/query-service/app/logs/v3/enrich_query.go +++ b/pkg/query-service/app/logs/v3/enrich_query.go @@ -142,7 +142,7 @@ func enrichFieldWithMetadata(field v3.AttributeKey, fields map[string]v3.Attribu } // check if the field is present in the fields map - for _, key := range utils.GenerateLogEnrichmentKeys(field) { + for _, key := range utils.GenerateEnrichmentKeys(field) { if val, ok := fields[key]; ok { return val } diff --git a/pkg/query-service/app/querier/querier.go b/pkg/query-service/app/querier/querier.go index fd7198b334..fc9b0d9431 100644 --- a/pkg/query-service/app/querier/querier.go +++ b/pkg/query-service/app/querier/querier.go @@ -12,6 +12,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/app/queryBuilder" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" "go.signoz.io/signoz/pkg/query-service/common" + "go.signoz.io/signoz/pkg/query-service/constants" chErrors "go.signoz.io/signoz/pkg/query-service/errors" "go.signoz.io/signoz/pkg/query-service/querycache" "go.signoz.io/signoz/pkg/query-service/utils" @@ -52,7 +53,8 @@ type querier struct { returnedSeries []*v3.Series returnedErr error - UseLogsNewSchema bool + UseLogsNewSchema bool + UseTraceNewSchema bool } type QuerierOptions struct { @@ -308,56 +310,121 @@ func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRang return results, errQueriesByName, err } -func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, tsRanges []utils.LogsListTsRange) ([]*v3.Result, map[string]error, error) { +func (q *querier) runWindowBasedListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, tsRanges []utils.LogsListTsRange) ([]*v3.Result, map[string]error, error) { res := make([]*v3.Result, 0) qName := "" pageSize := uint64(0) + limit := uint64(0) + offset := uint64(0) // se we are considering only one query for name, v := range params.CompositeQuery.BuilderQueries { qName = name pageSize = v.PageSize + + // for traces specifically + limit = v.Limit + offset = v.Offset } data := []*v3.Row{} + tracesLimit := limit + offset + for _, v := range tsRanges { params.Start = v.Start params.End = v.End - params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data)) - queries, err := q.builder.PrepareQueries(params) - if err != nil { - return nil, nil, err - } - + length := uint64(0) // this will to run only once - for name, query := range queries { - rowList, err := q.reader.GetListResultV3(ctx, query) + + // appending the filter to get the next set of data + if params.CompositeQuery.BuilderQueries[qName].DataSource == v3.DataSourceLogs { + params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data)) + queries, err := q.builder.PrepareQueries(params) if err != nil { - errs := []error{err} - errQuriesByName := map[string]error{ - name: err, + return nil, nil, err + } + for name, query := range queries { + rowList, err := q.reader.GetListResultV3(ctx, query) + if err != nil { + errs := []error{err} + errQueriesByName := map[string]error{ + name: err, + } + return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)) } - return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)) + length += uint64(len(rowList)) + data = append(data, rowList...) } - data = append(data, rowList...) - } - // append a filter to the params - if len(data) > 0 { - params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{ - Key: v3.AttributeKey{ - Key: "id", - IsColumn: true, - DataType: "string", - }, - Operator: v3.FilterOperatorLessThan, - Value: data[len(data)-1].Data["id"], - }) - } + if length > 0 { + params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "id", + IsColumn: true, + DataType: "string", + }, + Operator: v3.FilterOperatorLessThan, + Value: data[len(data)-1].Data["id"], + }) + } - if uint64(len(data)) >= pageSize { - break + if uint64(len(data)) >= pageSize { + break + } + } else { + // TRACE + // we are updating the offset and limit based on the number of traces we have found in the current timerange + // eg - + // 1)offset = 0, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30] + // + // if 100 traces are there in [t1, t10] then 100 will return immediately. + // if 10 traces are there in [t1, t10] then we get 10, set offset to 0 and limit to 90, search in the next timerange of [t10, 20] + // if we don't find any trace in [t1, t10], then we search in [t10, 20] with offset=0, limit=100 + + // + // 2) offset = 50, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30] + // + // If we find 150 traces with limit=150 and offset=0 in [t1, t10] then we return immediately 100 traces + // If we find 50 in [t1, t10] with limit=150 and offset=0 then it will set limit = 100 and offset=0 and search in the next timerange of [t10, 20] + // if we don't find any trace in [t1, t10], then we search in [t10, 20] with limit=150 and offset=0 + + // max limit + offset is 10k for pagination + if tracesLimit > constants.TRACE_V4_MAX_PAGINATION_LIMIT { + return nil, nil, fmt.Errorf("maximum traces that can be paginated is 10000") + } + + params.CompositeQuery.BuilderQueries[qName].Offset = 0 + params.CompositeQuery.BuilderQueries[qName].Limit = tracesLimit + queries, err := q.builder.PrepareQueries(params) + if err != nil { + return nil, nil, err + } + for name, query := range queries { + rowList, err := q.reader.GetListResultV3(ctx, query) + if err != nil { + errs := []error{err} + errQueriesByName := map[string]error{ + name: err, + } + return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)) + } + length += uint64(len(rowList)) + + // skip the traces unless offset is 0 + for _, row := range rowList { + if offset == 0 { + data = append(data, row) + } else { + offset-- + } + } + } + tracesLimit = tracesLimit - length + + if uint64(len(data)) >= limit { + break + } } } res = append(res, &v3.Result{ @@ -368,15 +435,25 @@ func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangePar } func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) { - // List query has support for only one query. - if q.UseLogsNewSchema && params.CompositeQuery != nil && len(params.CompositeQuery.BuilderQueries) == 1 { + // List query has support for only one query + // we are skipping for PanelTypeTrace as it has a custom order by regardless of what's in the payload + if params.CompositeQuery != nil && + len(params.CompositeQuery.BuilderQueries) == 1 && + params.CompositeQuery.PanelType != v3.PanelTypeTrace { for _, v := range params.CompositeQuery.BuilderQueries { + if (v.DataSource == v3.DataSourceLogs && !q.UseLogsNewSchema) || + (v.DataSource == v3.DataSourceTraces && !q.UseTraceNewSchema) { + break + } + // only allow of logs queries with timestamp ordering desc - if v.DataSource == v3.DataSourceLogs && len(v.OrderBy) == 1 && v.OrderBy[0].ColumnName == "timestamp" && v.OrderBy[0].Order == "desc" { - startEndArr := utils.GetLogsListTsRanges(params.Start, params.End) - if len(startEndArr) > 0 { - return q.runLogsListQuery(ctx, params, startEndArr) - } + // TODO(nitya): allow for timestamp asc + if (v.DataSource == v3.DataSourceLogs || v.DataSource == v3.DataSourceTraces) && + len(v.OrderBy) == 1 && + v.OrderBy[0].ColumnName == "timestamp" && + v.OrderBy[0].Order == "desc" { + startEndArr := utils.GetListTsRanges(params.Start, params.End) + return q.runWindowBasedListQuery(ctx, params, startEndArr) } } } @@ -408,13 +485,13 @@ func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRan close(ch) var errs []error - errQuriesByName := make(map[string]error) + errQueriesByName := make(map[string]error) res := make([]*v3.Result, 0) // read values from the channel for r := range ch { if r.Err != nil { errs = append(errs, r.Err) - errQuriesByName[r.Name] = r.Err + errQueriesByName[r.Name] = r.Err continue } res = append(res, &v3.Result{ @@ -423,7 +500,7 @@ func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRan }) } if len(errs) != 0 { - return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)) + return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)) } return res, nil, nil } diff --git a/pkg/query-service/app/querier/querier_test.go b/pkg/query-service/app/querier/querier_test.go index 6a28c2c267..62e7e97c8a 100644 --- a/pkg/query-service/app/querier/querier_test.go +++ b/pkg/query-service/app/querier/querier_test.go @@ -5,15 +5,21 @@ import ( "encoding/json" "fmt" "math" + "regexp" "strings" "testing" "time" + cmock "github.com/srikanthccv/ClickHouse-go-mock" + "github.com/stretchr/testify/require" + "go.signoz.io/signoz/pkg/query-service/app/clickhouseReader" "go.signoz.io/signoz/pkg/query-service/app/queryBuilder" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" "go.signoz.io/signoz/pkg/query-service/cache/inmemory" + "go.signoz.io/signoz/pkg/query-service/featureManager" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.signoz.io/signoz/pkg/query-service/querycache" + "go.signoz.io/signoz/pkg/query-service/utils" ) func minTimestamp(series []*v3.Series) int64 { @@ -1124,3 +1130,304 @@ func TestQueryRangeValueTypePromQL(t *testing.T) { } } } + +type regexMatcher struct { +} + +func (m *regexMatcher) Match(expectedSQL, actualSQL string) error { + re, err := regexp.Compile(expectedSQL) + if err != nil { + return err + } + if !re.MatchString(actualSQL) { + return fmt.Errorf("expected query to contain %s, got %s", expectedSQL, actualSQL) + } + return nil +} + +func Test_querier_runWindowBasedListQuery(t *testing.T) { + params := &v3.QueryRangeParamsV3{ + Start: 1722171576000000000, // July 28, 2024 6:29:36 PM + End: 1722262800000000000, // July 29, 2024 7:50:00 PM + CompositeQuery: &v3.CompositeQuery{ + PanelType: v3.PanelTypeList, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + Expression: "A", + DataSource: v3.DataSourceTraces, + PageSize: 10, + Limit: 100, + StepInterval: 60, + AggregateOperator: v3.AggregateOperatorNoOp, + SelectColumns: []v3.AttributeKey{{Key: "serviceName"}}, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + }, + }, + }, + } + + tsRanges := []utils.LogsListTsRange{ + { + Start: 1722259200000000000, // July 29, 2024 6:50:00 PM + End: 1722262800000000000, // July 29, 2024 7:50:00 PM + }, + { + Start: 1722252000000000000, // July 29, 2024 4:50:00 PM + End: 1722259200000000000, // July 29, 2024 6:50:00 PM + }, + { + Start: 1722237600000000000, // July 29, 2024 12:50:00 PM + End: 1722252000000000000, // July 29, 2024 4:50:00 PM + }, + { + Start: 1722208800000000000, // July 29, 2024 4:50:00 AM + End: 1722237600000000000, // July 29, 2024 12:50:00 PM + }, + { + Start: 1722171576000000000, // July 28, 2024 6:29:36 PM + End: 1722208800000000000, // July 29, 2024 4:50:00 AM + }, + } + + type queryParams struct { + start int64 + end int64 + limit uint64 + offset uint64 + } + + type queryResponse struct { + expectedQuery string + timestamps []uint64 + } + + // create test struct with moc data i.e array of timestamps, limit, offset and expected results + testCases := []struct { + name string + queryResponses []queryResponse + queryParams queryParams + expectedTimestamps []int64 + expectedError bool + }{ + { + name: "should return correct timestamps when querying within time window", + queryResponses: []queryResponse{ + { + expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 2", + timestamps: []uint64{1722259300000000000, 1722259400000000000}, + }, + }, + queryParams: queryParams{ + start: 1722171576000000000, + end: 1722262800000000000, + limit: 2, + offset: 0, + }, + expectedTimestamps: []int64{1722259300000000000, 1722259400000000000}, + }, + { + name: "all data not in first windows", + queryResponses: []queryResponse{ + { + expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 3", + timestamps: []uint64{1722259300000000000, 1722259400000000000}, + }, + { + expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 1", + timestamps: []uint64{1722253000000000000}, + }, + }, + queryParams: queryParams{ + start: 1722171576000000000, + end: 1722262800000000000, + limit: 3, + offset: 0, + }, + expectedTimestamps: []int64{1722259300000000000, 1722259400000000000, 1722253000000000000}, + }, + { + name: "data in multiple windows", + queryResponses: []queryResponse{ + { + expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 5", + timestamps: []uint64{1722259300000000000, 1722259400000000000}, + }, + { + expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 3", + timestamps: []uint64{1722253000000000000}, + }, + { + expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 2", + timestamps: []uint64{1722237700000000000}, + }, + { + expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* DESC LIMIT 1", + timestamps: []uint64{}, + }, + { + expectedQuery: ".*(timestamp >= '1722171576000000000' AND timestamp <= '1722208800000000000').* DESC LIMIT 1", + timestamps: []uint64{}, + }, + }, + queryParams: queryParams{ + start: 1722171576000000000, + end: 1722262800000000000, + limit: 5, + offset: 0, + }, + expectedTimestamps: []int64{1722259300000000000, 1722259400000000000, 1722253000000000000, 1722237700000000000}, + }, + { + name: "query with offset", + queryResponses: []queryResponse{ + { + expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 7", + timestamps: []uint64{1722259210000000000, 1722259220000000000, 1722259230000000000}, + }, + { + expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 4", + timestamps: []uint64{1722253000000000000, 1722254000000000000, 1722255000000000000}, + }, + { + expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 1", + timestamps: []uint64{1722237700000000000}, + }, + }, + queryParams: queryParams{ + start: 1722171576000000000, + end: 1722262800000000000, + limit: 4, + offset: 3, + }, + expectedTimestamps: []int64{1722253000000000000, 1722254000000000000, 1722255000000000000, 1722237700000000000}, + }, + { + name: "query with offset and limit- data spread across multiple windows", + queryResponses: []queryResponse{ + { + expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 11", + timestamps: []uint64{}, + }, + { + expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 11", + timestamps: []uint64{1722253000000000000, 1722254000000000000, 1722255000000000000}, + }, + { + expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 8", + timestamps: []uint64{1722237700000000000, 1722237800000000000, 1722237900000000000, 1722237910000000000, 1722237920000000000}, + }, + { + expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* DESC LIMIT 3", + timestamps: []uint64{1722208810000000000, 1722208820000000000, 1722208830000000000}, + }, + }, + queryParams: queryParams{ + start: 1722171576000000000, + end: 1722262800000000000, + limit: 5, + offset: 6, + }, + expectedTimestamps: []int64{1722237910000000000, 1722237920000000000, 1722208810000000000, 1722208820000000000, 1722208830000000000}, + }, + { + name: "don't allow pagination to get more than 10k spans", + queryResponses: []queryResponse{}, + queryParams: queryParams{ + start: 1722171576000000000, + end: 1722262800000000000, + limit: 10, + offset: 9991, + }, + expectedError: true, + }, + } + + cols := []cmock.ColumnType{ + {Name: "timestamp", Type: "UInt64"}, + {Name: "name", Type: "String"}, + } + testName := "name" + + options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") + + // iterate over test data, create reader and run test + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Setup mock + mock, err := cmock.NewClickHouseWithQueryMatcher(nil, ®exMatcher{}) + require.NoError(t, err, "Failed to create ClickHouse mock") + + // Configure mock responses + for _, response := range tc.queryResponses { + values := make([][]any, 0, len(response.timestamps)) + for _, ts := range response.timestamps { + values = append(values, []any{&ts, &testName}) + } + // if len(values) > 0 { + mock.ExpectQuery(response.expectedQuery).WillReturnRows( + cmock.NewRows(cols, values), + ) + // } + } + + // Create reader and querier + reader := clickhouseReader.NewReaderFromClickhouseConnection( + mock, + options, + nil, + "", + featureManager.StartManager(), + "", + true, + ) + + q := &querier{ + reader: reader, + builder: queryBuilder.NewQueryBuilder( + queryBuilder.QueryBuilderOptions{ + BuildTraceQuery: tracesV3.PrepareTracesQuery, + }, + featureManager.StartManager(), + ), + } + // Update query parameters + params.Start = tc.queryParams.start + params.End = tc.queryParams.end + params.CompositeQuery.BuilderQueries["A"].Limit = tc.queryParams.limit + params.CompositeQuery.BuilderQueries["A"].Offset = tc.queryParams.offset + + // Execute query + results, errMap, err := q.runWindowBasedListQuery(context.Background(), params, tsRanges) + + if tc.expectedError { + require.Error(t, err) + return + } + + // Assertions + require.NoError(t, err, "Query execution failed") + require.Nil(t, errMap, "Unexpected error map in results") + require.Len(t, results, 1, "Expected exactly one result set") + + result := results[0] + require.Equal(t, "A", result.QueryName, "Incorrect query name in results") + require.Len(t, result.List, len(tc.expectedTimestamps), + "Result count mismatch: got %d results, expected %d", + len(result.List), len(tc.expectedTimestamps)) + + for i, expected := range tc.expectedTimestamps { + require.Equal(t, expected, result.List[i].Timestamp.UnixNano(), + "Timestamp mismatch at index %d: got %d, expected %d", + i, result.List[i].Timestamp.UnixNano(), expected) + } + + // Verify mock expectations + err = mock.ExpectationsWereMet() + require.NoError(t, err, "Mock expectations were not met") + }) + } +} diff --git a/pkg/query-service/app/querier/v2/querier.go b/pkg/query-service/app/querier/v2/querier.go index 311d213656..883b1055f4 100644 --- a/pkg/query-service/app/querier/v2/querier.go +++ b/pkg/query-service/app/querier/v2/querier.go @@ -12,6 +12,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/app/queryBuilder" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" "go.signoz.io/signoz/pkg/query-service/common" + "go.signoz.io/signoz/pkg/query-service/constants" chErrors "go.signoz.io/signoz/pkg/query-service/errors" "go.signoz.io/signoz/pkg/query-service/querycache" "go.signoz.io/signoz/pkg/query-service/utils" @@ -48,10 +49,11 @@ type querier struct { testingMode bool queriesExecuted []string // tuple of start and end time in milliseconds - timeRanges [][]int - returnedSeries []*v3.Series - returnedErr error - UseLogsNewSchema bool + timeRanges [][]int + returnedSeries []*v3.Series + returnedErr error + UseLogsNewSchema bool + UseTraceNewSchema bool } type QuerierOptions struct { @@ -308,56 +310,121 @@ func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRang return results, errQueriesByName, err } -func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, tsRanges []utils.LogsListTsRange) ([]*v3.Result, map[string]error, error) { +func (q *querier) runWindowBasedListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, tsRanges []utils.LogsListTsRange) ([]*v3.Result, map[string]error, error) { res := make([]*v3.Result, 0) qName := "" pageSize := uint64(0) + limit := uint64(0) + offset := uint64(0) // se we are considering only one query for name, v := range params.CompositeQuery.BuilderQueries { qName = name pageSize = v.PageSize + + // for traces specifically + limit = v.Limit + offset = v.Offset } data := []*v3.Row{} + tracesLimit := limit + offset + for _, v := range tsRanges { params.Start = v.Start params.End = v.End - params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data)) - queries, err := q.builder.PrepareQueries(params) - if err != nil { - return nil, nil, err - } - + length := uint64(0) // this will to run only once - for name, query := range queries { - rowList, err := q.reader.GetListResultV3(ctx, query) + + // appending the filter to get the next set of data + if params.CompositeQuery.BuilderQueries[qName].DataSource == v3.DataSourceLogs { + params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data)) + queries, err := q.builder.PrepareQueries(params) if err != nil { - errs := []error{err} - errQuriesByName := map[string]error{ - name: err, + return nil, nil, err + } + for name, query := range queries { + rowList, err := q.reader.GetListResultV3(ctx, query) + if err != nil { + errs := []error{err} + errQueriesByName := map[string]error{ + name: err, + } + return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)) } - return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)) + length += uint64(len(rowList)) + data = append(data, rowList...) } - data = append(data, rowList...) - } - // append a filter to the params - if len(data) > 0 { - params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{ - Key: v3.AttributeKey{ - Key: "id", - IsColumn: true, - DataType: "string", - }, - Operator: v3.FilterOperatorLessThan, - Value: data[len(data)-1].Data["id"], - }) - } + if length > 0 { + params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "id", + IsColumn: true, + DataType: "string", + }, + Operator: v3.FilterOperatorLessThan, + Value: data[len(data)-1].Data["id"], + }) + } + + if uint64(len(data)) >= pageSize { + break + } + } else { + // TRACE + // we are updating the offset and limit based on the number of traces we have found in the current timerange + // eg - + // 1)offset = 0, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30] + // + // if 100 traces are there in [t1, t10] then 100 will return immediately. + // if 10 traces are there in [t1, t10] then we get 10, set offset to 0 and limit to 90, search in the next timerange of [t10, 20] + // if we don't find any trace in [t1, t10], then we search in [t10, 20] with offset=0, limit=100 + + // + // 2) offset = 50, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30] + // + // If we find 150 traces with limit=150 and offset=0 in [t1, t10] then we return immediately 100 traces + // If we find 50 in [t1, t10] with limit=150 and offset=0 then it will set limit = 100 and offset=0 and search in the next timerange of [t10, 20] + // if we don't find any trace in [t1, t10], then we search in [t10, 20] with limit=150 and offset=0 + + // max limit + offset is 10k for pagination + if tracesLimit > constants.TRACE_V4_MAX_PAGINATION_LIMIT { + return nil, nil, fmt.Errorf("maximum traces that can be paginated is 10000") + } + + params.CompositeQuery.BuilderQueries[qName].Offset = 0 + params.CompositeQuery.BuilderQueries[qName].Limit = tracesLimit + queries, err := q.builder.PrepareQueries(params) + if err != nil { + return nil, nil, err + } + for name, query := range queries { + rowList, err := q.reader.GetListResultV3(ctx, query) + if err != nil { + errs := []error{err} + errQueriesByName := map[string]error{ + name: err, + } + return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)) + } + length += uint64(len(rowList)) + + // skip the traces unless offset is 0 + for _, row := range rowList { + if offset == 0 { + data = append(data, row) + } else { + offset-- + } + } + } + tracesLimit = tracesLimit - length - if uint64(len(data)) >= pageSize { - break + if uint64(len(data)) >= limit { + break + } } } res = append(res, &v3.Result{ @@ -369,14 +436,24 @@ func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangePar func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) { // List query has support for only one query. - if q.UseLogsNewSchema && params.CompositeQuery != nil && len(params.CompositeQuery.BuilderQueries) == 1 { + // we are skipping for PanelTypeTrace as it has a custom order by regardless of what's in the payload + if params.CompositeQuery != nil && + len(params.CompositeQuery.BuilderQueries) == 1 && + params.CompositeQuery.PanelType != v3.PanelTypeTrace { for _, v := range params.CompositeQuery.BuilderQueries { + if (v.DataSource == v3.DataSourceLogs && !q.UseLogsNewSchema) || + (v.DataSource == v3.DataSourceTraces && !q.UseTraceNewSchema) { + break + } + // only allow of logs queries with timestamp ordering desc - if v.DataSource == v3.DataSourceLogs && len(v.OrderBy) == 1 && v.OrderBy[0].ColumnName == "timestamp" && v.OrderBy[0].Order == "desc" { - startEndArr := utils.GetLogsListTsRanges(params.Start, params.End) - if len(startEndArr) > 0 { - return q.runLogsListQuery(ctx, params, startEndArr) - } + // TODO(nitya): allow for timestamp asc + if (v.DataSource == v3.DataSourceLogs || v.DataSource == v3.DataSourceTraces) && + len(v.OrderBy) == 1 && + v.OrderBy[0].ColumnName == "timestamp" && + v.OrderBy[0].Order == "desc" { + startEndArr := utils.GetListTsRanges(params.Start, params.End) + return q.runWindowBasedListQuery(ctx, params, startEndArr) } } } @@ -416,13 +493,13 @@ func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRan close(ch) var errs []error - errQuriesByName := make(map[string]error) + errQueriesByName := make(map[string]error) res := make([]*v3.Result, 0) // read values from the channel for r := range ch { if r.Err != nil { errs = append(errs, r.Err) - errQuriesByName[r.Name] = r.Err + errQueriesByName[r.Name] = r.Err continue } res = append(res, &v3.Result{ @@ -431,7 +508,7 @@ func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRan }) } if len(errs) != 0 { - return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)) + return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)) } return res, nil, nil } diff --git a/pkg/query-service/app/querier/v2/querier_test.go b/pkg/query-service/app/querier/v2/querier_test.go index 62e18d244d..e52bb53823 100644 --- a/pkg/query-service/app/querier/v2/querier_test.go +++ b/pkg/query-service/app/querier/v2/querier_test.go @@ -5,15 +5,21 @@ import ( "encoding/json" "fmt" "math" + "regexp" "strings" "testing" "time" + cmock "github.com/srikanthccv/ClickHouse-go-mock" + "github.com/stretchr/testify/require" + "go.signoz.io/signoz/pkg/query-service/app/clickhouseReader" "go.signoz.io/signoz/pkg/query-service/app/queryBuilder" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" "go.signoz.io/signoz/pkg/query-service/cache/inmemory" + "go.signoz.io/signoz/pkg/query-service/featureManager" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.signoz.io/signoz/pkg/query-service/querycache" + "go.signoz.io/signoz/pkg/query-service/utils" ) func minTimestamp(series []*v3.Series) int64 { @@ -798,8 +804,8 @@ func TestV2QueryRangeValueType(t *testing.T) { } q := NewQuerier(opts) expectedTimeRangeInQueryString := []string{ - fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", 1675115520000, 1675115580000+120*60*1000), // 31st Jan, 03:23:00 to 31st Jan, 05:23:00 - fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", 1675115580000+120*60*1000, 1675115580000+180*60*1000), // 31st Jan, 05:23:00 to 31st Jan, 06:23:00 + fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", 1675115520000, 1675115580000+120*60*1000), // 31st Jan, 03:23:00 to 31st Jan, 05:23:00 + fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", 1675115580000+120*60*1000, 1675115580000+180*60*1000), // 31st Jan, 05:23:00 to 31st Jan, 06:23:00 fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", (1675119196722)*int64(1000000), (1675126396722)*int64(1000000)), // 31st Jan, 05:23:00 to 31st Jan, 06:23:00 } @@ -1178,3 +1184,304 @@ func TestV2QueryRangeValueTypePromQL(t *testing.T) { } } } + +type regexMatcher struct { +} + +func (m *regexMatcher) Match(expectedSQL, actualSQL string) error { + re, err := regexp.Compile(expectedSQL) + if err != nil { + return err + } + if !re.MatchString(actualSQL) { + return fmt.Errorf("expected query to contain %s, got %s", expectedSQL, actualSQL) + } + return nil +} + +func Test_querier_runWindowBasedListQuery(t *testing.T) { + params := &v3.QueryRangeParamsV3{ + Start: 1722171576000000000, // July 28, 2024 6:29:36 PM + End: 1722262800000000000, // July 29, 2024 7:50:00 PM + CompositeQuery: &v3.CompositeQuery{ + PanelType: v3.PanelTypeList, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + Expression: "A", + DataSource: v3.DataSourceTraces, + PageSize: 10, + Limit: 100, + StepInterval: 60, + AggregateOperator: v3.AggregateOperatorNoOp, + SelectColumns: []v3.AttributeKey{{Key: "serviceName"}}, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + }, + }, + }, + } + + tsRanges := []utils.LogsListTsRange{ + { + Start: 1722259200000000000, // July 29, 2024 6:50:00 PM + End: 1722262800000000000, // July 29, 2024 7:50:00 PM + }, + { + Start: 1722252000000000000, // July 29, 2024 4:50:00 PM + End: 1722259200000000000, // July 29, 2024 6:50:00 PM + }, + { + Start: 1722237600000000000, // July 29, 2024 12:50:00 PM + End: 1722252000000000000, // July 29, 2024 4:50:00 PM + }, + { + Start: 1722208800000000000, // July 29, 2024 4:50:00 AM + End: 1722237600000000000, // July 29, 2024 12:50:00 PM + }, + { + Start: 1722171576000000000, // July 28, 2024 6:29:36 PM + End: 1722208800000000000, // July 29, 2024 4:50:00 AM + }, + } + + type queryParams struct { + start int64 + end int64 + limit uint64 + offset uint64 + } + + type queryResponse struct { + expectedQuery string + timestamps []uint64 + } + + // create test struct with moc data i.e array of timestamps, limit, offset and expected results + testCases := []struct { + name string + queryResponses []queryResponse + queryParams queryParams + expectedTimestamps []int64 + expectedError bool + }{ + { + name: "should return correct timestamps when querying within time window", + queryResponses: []queryResponse{ + { + expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 2", + timestamps: []uint64{1722259300000000000, 1722259400000000000}, + }, + }, + queryParams: queryParams{ + start: 1722171576000000000, + end: 1722262800000000000, + limit: 2, + offset: 0, + }, + expectedTimestamps: []int64{1722259300000000000, 1722259400000000000}, + }, + { + name: "all data not in first windows", + queryResponses: []queryResponse{ + { + expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 3", + timestamps: []uint64{1722259300000000000, 1722259400000000000}, + }, + { + expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 1", + timestamps: []uint64{1722253000000000000}, + }, + }, + queryParams: queryParams{ + start: 1722171576000000000, + end: 1722262800000000000, + limit: 3, + offset: 0, + }, + expectedTimestamps: []int64{1722259300000000000, 1722259400000000000, 1722253000000000000}, + }, + { + name: "data in multiple windows", + queryResponses: []queryResponse{ + { + expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 5", + timestamps: []uint64{1722259300000000000, 1722259400000000000}, + }, + { + expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 3", + timestamps: []uint64{1722253000000000000}, + }, + { + expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 2", + timestamps: []uint64{1722237700000000000}, + }, + { + expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* DESC LIMIT 1", + timestamps: []uint64{}, + }, + { + expectedQuery: ".*(timestamp >= '1722171576000000000' AND timestamp <= '1722208800000000000').* DESC LIMIT 1", + timestamps: []uint64{}, + }, + }, + queryParams: queryParams{ + start: 1722171576000000000, + end: 1722262800000000000, + limit: 5, + offset: 0, + }, + expectedTimestamps: []int64{1722259300000000000, 1722259400000000000, 1722253000000000000, 1722237700000000000}, + }, + { + name: "query with offset", + queryResponses: []queryResponse{ + { + expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 7", + timestamps: []uint64{1722259210000000000, 1722259220000000000, 1722259230000000000}, + }, + { + expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 4", + timestamps: []uint64{1722253000000000000, 1722254000000000000, 1722255000000000000}, + }, + { + expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 1", + timestamps: []uint64{1722237700000000000}, + }, + }, + queryParams: queryParams{ + start: 1722171576000000000, + end: 1722262800000000000, + limit: 4, + offset: 3, + }, + expectedTimestamps: []int64{1722253000000000000, 1722254000000000000, 1722255000000000000, 1722237700000000000}, + }, + { + name: "query with offset and limit- data spread across multiple windows", + queryResponses: []queryResponse{ + { + expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 11", + timestamps: []uint64{}, + }, + { + expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 11", + timestamps: []uint64{1722253000000000000, 1722254000000000000, 1722255000000000000}, + }, + { + expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 8", + timestamps: []uint64{1722237700000000000, 1722237800000000000, 1722237900000000000, 1722237910000000000, 1722237920000000000}, + }, + { + expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* DESC LIMIT 3", + timestamps: []uint64{1722208810000000000, 1722208820000000000, 1722208830000000000}, + }, + }, + queryParams: queryParams{ + start: 1722171576000000000, + end: 1722262800000000000, + limit: 5, + offset: 6, + }, + expectedTimestamps: []int64{1722237910000000000, 1722237920000000000, 1722208810000000000, 1722208820000000000, 1722208830000000000}, + }, + { + name: "don't allow pagination to get more than 10k spans", + queryResponses: []queryResponse{}, + queryParams: queryParams{ + start: 1722171576000000000, + end: 1722262800000000000, + limit: 10, + offset: 9991, + }, + expectedError: true, + }, + } + + cols := []cmock.ColumnType{ + {Name: "timestamp", Type: "UInt64"}, + {Name: "name", Type: "String"}, + } + testName := "name" + + options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") + + // iterate over test data, create reader and run test + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Setup mock + mock, err := cmock.NewClickHouseWithQueryMatcher(nil, ®exMatcher{}) + require.NoError(t, err, "Failed to create ClickHouse mock") + + // Configure mock responses + for _, response := range tc.queryResponses { + values := make([][]any, 0, len(response.timestamps)) + for _, ts := range response.timestamps { + values = append(values, []any{&ts, &testName}) + } + // if len(values) > 0 { + mock.ExpectQuery(response.expectedQuery).WillReturnRows( + cmock.NewRows(cols, values), + ) + // } + } + + // Create reader and querier + reader := clickhouseReader.NewReaderFromClickhouseConnection( + mock, + options, + nil, + "", + featureManager.StartManager(), + "", + true, + ) + + q := &querier{ + reader: reader, + builder: queryBuilder.NewQueryBuilder( + queryBuilder.QueryBuilderOptions{ + BuildTraceQuery: tracesV3.PrepareTracesQuery, + }, + featureManager.StartManager(), + ), + } + // Update query parameters + params.Start = tc.queryParams.start + params.End = tc.queryParams.end + params.CompositeQuery.BuilderQueries["A"].Limit = tc.queryParams.limit + params.CompositeQuery.BuilderQueries["A"].Offset = tc.queryParams.offset + + // Execute query + results, errMap, err := q.runWindowBasedListQuery(context.Background(), params, tsRanges) + + if tc.expectedError { + require.Error(t, err) + return + } + + // Assertions + require.NoError(t, err, "Query execution failed") + require.Nil(t, errMap, "Unexpected error map in results") + require.Len(t, results, 1, "Expected exactly one result set") + + result := results[0] + require.Equal(t, "A", result.QueryName, "Incorrect query name in results") + require.Len(t, result.List, len(tc.expectedTimestamps), + "Result count mismatch: got %d results, expected %d", + len(result.List), len(tc.expectedTimestamps)) + + for i, expected := range tc.expectedTimestamps { + require.Equal(t, expected, result.List[i].Timestamp.UnixNano(), + "Timestamp mismatch at index %d: got %d, expected %d", + i, result.List[i].Timestamp.UnixNano(), expected) + } + + // Verify mock expectations + err = mock.ExpectationsWereMet() + require.NoError(t, err, "Mock expectations were not met") + }) + } +} diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index dc6ac21e15..6a030e73cf 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -384,6 +384,11 @@ func LogCommentEnricher(next http.Handler) http.Handler { client = "api" } + email, err := auth.GetEmailFromJwt(r.Context()) + if err != nil { + zap.S().Errorf("error while getting email from jwt: %v", err) + } + kvs := map[string]string{ "path": path, "dashboardID": dashboardID, @@ -392,6 +397,7 @@ func LogCommentEnricher(next http.Handler) http.Handler { "client": client, "viewName": viewName, "servicesTab": tab, + "email": email, } r = r.WithContext(context.WithValue(r.Context(), common.LogCommentKey, kvs)) diff --git a/pkg/query-service/app/traces/v4/enrich.go b/pkg/query-service/app/traces/v4/enrich.go new file mode 100644 index 0000000000..848e489e86 --- /dev/null +++ b/pkg/query-service/app/traces/v4/enrich.go @@ -0,0 +1,118 @@ +package v4 + +import ( + "go.signoz.io/signoz/pkg/query-service/constants" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/utils" +) + +// if the field is timestamp/id/value we don't need to enrich +// if the field is static we don't need to enrich +// for all others we need to enrich +// an attribute/resource can be materialized/dematerialized +// but the query should work regardless and shouldn't fail +func isEnriched(field v3.AttributeKey) bool { + // if it is timestamp/id dont check + if field.Key == "timestamp" || field.Key == constants.SigNozOrderByValue { + return true + } + + // we need to check if the field is static and return false if isColumn is not set + if _, ok := constants.StaticFieldsTraces[field.Key]; ok && field.IsColumn { + return true + } + + return false +} + +func enrichKeyWithMetadata(key v3.AttributeKey, keys map[string]v3.AttributeKey) v3.AttributeKey { + if isEnriched(key) { + return key + } + + if v, ok := constants.StaticFieldsTraces[key.Key]; ok { + return v + } + + for _, key := range utils.GenerateEnrichmentKeys(key) { + if val, ok := keys[key]; ok { + return val + } + } + + // enrich with default values if metadata is not found + if key.Type == "" { + key.Type = v3.AttributeKeyTypeTag + } + if key.DataType == "" { + key.DataType = v3.AttributeKeyDataTypeString + } + return key +} + +func Enrich(params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) { + if params.CompositeQuery.QueryType != v3.QueryTypeBuilder { + return + } + + for _, query := range params.CompositeQuery.BuilderQueries { + if query.DataSource == v3.DataSourceTraces { + EnrichTracesQuery(query, keys) + } + } +} + +func EnrichTracesQuery(query *v3.BuilderQuery, keys map[string]v3.AttributeKey) { + // enrich aggregate attribute + query.AggregateAttribute = enrichKeyWithMetadata(query.AggregateAttribute, keys) + + // enrich filter items + if query.Filters != nil && len(query.Filters.Items) > 0 { + for idx, filter := range query.Filters.Items { + query.Filters.Items[idx].Key = enrichKeyWithMetadata(filter.Key, keys) + // if the serviceName column is used, use the corresponding resource attribute as well during filtering + // since there is only one of these resource attributes we are adding it here directly. + // move it somewhere else if this list is big + if filter.Key.Key == "serviceName" { + query.Filters.Items[idx].Key = v3.AttributeKey{ + Key: "service.name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + IsColumn: false, + } + } + } + } + + // enrich group by + for idx, groupBy := range query.GroupBy { + query.GroupBy[idx] = enrichKeyWithMetadata(groupBy, keys) + } + + // enrich order by + query.OrderBy = enrichOrderBy(query.OrderBy, keys) + + // enrich select columns + for idx, selectColumn := range query.SelectColumns { + query.SelectColumns[idx] = enrichKeyWithMetadata(selectColumn, keys) + } + +} + +func enrichOrderBy(items []v3.OrderBy, keys map[string]v3.AttributeKey) []v3.OrderBy { + enrichedItems := []v3.OrderBy{} + for i := 0; i < len(items); i++ { + attributeKey := enrichKeyWithMetadata(v3.AttributeKey{ + Key: items[i].ColumnName, + }, keys) + enrichedItems = append(enrichedItems, v3.OrderBy{ + ColumnName: items[i].ColumnName, + Order: items[i].Order, + Key: attributeKey.Key, + DataType: attributeKey.DataType, + Type: attributeKey.Type, + IsColumn: attributeKey.IsColumn, + }) + } + return enrichedItems +} diff --git a/pkg/query-service/app/traces/v4/enrich_test.go b/pkg/query-service/app/traces/v4/enrich_test.go new file mode 100644 index 0000000000..840b251fbf --- /dev/null +++ b/pkg/query-service/app/traces/v4/enrich_test.go @@ -0,0 +1,196 @@ +package v4 + +import ( + "reflect" + "testing" + + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +func TestEnrichTracesQuery(t *testing.T) { + type args struct { + query *v3.BuilderQuery + keys map[string]v3.AttributeKey + want *v3.BuilderQuery + } + tests := []struct { + name string + args args + }{ + { + name: "test 1", + args: args{ + query: &v3.BuilderQuery{ + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "bytes", Type: v3.AttributeKeyTypeTag}, Value: 100, Operator: ">"}, + }, + }, + OrderBy: []v3.OrderBy{}, + }, + keys: map[string]v3.AttributeKey{ + "bytes##tag##int64": {Key: "bytes", DataType: v3.AttributeKeyDataTypeInt64, Type: v3.AttributeKeyTypeTag}, + }, + want: &v3.BuilderQuery{ + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "bytes", Type: v3.AttributeKeyTypeTag, DataType: v3.AttributeKeyDataTypeInt64}, Value: 100, Operator: ">"}, + }, + }, + OrderBy: []v3.OrderBy{}, + }, + }, + }, + { + name: "test service name", + args: args{ + query: &v3.BuilderQuery{ + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "serviceName", DataType: v3.AttributeKeyDataTypeString, IsColumn: true}, Value: "myservice", Operator: "="}, + {Key: v3.AttributeKey{Key: "serviceName"}, Value: "myservice", Operator: "="}, + }, + }, + OrderBy: []v3.OrderBy{}, + }, + keys: map[string]v3.AttributeKey{}, + want: &v3.BuilderQuery{ + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "service.name", Type: v3.AttributeKeyTypeResource, DataType: v3.AttributeKeyDataTypeString}, Value: "myservice", Operator: "="}, + {Key: v3.AttributeKey{Key: "service.name", Type: v3.AttributeKeyTypeResource, DataType: v3.AttributeKeyDataTypeString}, Value: "myservice", Operator: "="}, + }, + }, + OrderBy: []v3.OrderBy{}, + }, + }, + }, + { + name: "test mat attrs", + args: args{ + query: &v3.BuilderQuery{ + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "http.route", DataType: v3.AttributeKeyDataTypeString, IsColumn: true}, Value: "/api", Operator: "="}, + {Key: v3.AttributeKey{Key: "msgSystem"}, Value: "name", Operator: "="}, + {Key: v3.AttributeKey{Key: "external_http_url"}, Value: "name", Operator: "="}, + }, + }, + OrderBy: []v3.OrderBy{}, + }, + keys: map[string]v3.AttributeKey{}, + want: &v3.BuilderQuery{ + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "http.route", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Value: "/api", Operator: "="}, + {Key: v3.AttributeKey{Key: "msgSystem", DataType: v3.AttributeKeyDataTypeString, IsColumn: true}, Value: "name", Operator: "="}, + {Key: v3.AttributeKey{Key: "external_http_url", DataType: v3.AttributeKeyDataTypeString, IsColumn: true}, Value: "name", Operator: "="}, + }, + }, + OrderBy: []v3.OrderBy{}, + }, + }, + }, + { + name: "test aggregateattr, filter, groupby, order by", + args: args{ + query: &v3.BuilderQuery{ + AggregateOperator: v3.AggregateOperatorCount, + AggregateAttribute: v3.AttributeKey{ + Key: "http.route", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "http.route", DataType: v3.AttributeKeyDataTypeString}, Value: "/api", Operator: "="}, + }, + }, + GroupBy: []v3.AttributeKey{ + {Key: "http.route", DataType: v3.AttributeKeyDataTypeString}, + {Key: "msgSystem", DataType: v3.AttributeKeyDataTypeString}, + }, + OrderBy: []v3.OrderBy{ + {ColumnName: "httpRoute", Order: v3.DirectionAsc}, + }, + }, + keys: map[string]v3.AttributeKey{ + "http.route##tag##string": {Key: "http.route", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, + }, + want: &v3.BuilderQuery{ + AggregateAttribute: v3.AttributeKey{ + Key: "http.route", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + IsColumn: true, + }, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "http.route", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Value: "/api", Operator: "="}, + }, + }, + GroupBy: []v3.AttributeKey{ + {Key: "http.route", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, + {Key: "msgSystem", DataType: v3.AttributeKeyDataTypeString, IsJSON: false, IsColumn: true}, + }, + OrderBy: []v3.OrderBy{ + {Key: "httpRoute", Order: v3.DirectionAsc, ColumnName: "httpRoute", DataType: v3.AttributeKeyDataTypeString, IsColumn: true}, + }, + }, + }, + }, + { + name: "enrich default values", + args: args{ + query: &v3.BuilderQuery{ + Filters: &v3.FilterSet{ + Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "testattr"}}, + }, + }, + OrderBy: []v3.OrderBy{{ColumnName: "timestamp", Order: v3.DirectionAsc}}, + }, + keys: map[string]v3.AttributeKey{}, + want: &v3.BuilderQuery{ + Filters: &v3.FilterSet{ + Items: []v3.FilterItem{{Key: v3.AttributeKey{Key: "testattr", Type: v3.AttributeKeyTypeTag, DataType: v3.AttributeKeyDataTypeString}}}, + }, + // isColumn won't matter in timestamp as it will always be a column + OrderBy: []v3.OrderBy{{Key: "timestamp", Order: v3.DirectionAsc, ColumnName: "timestamp"}}, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + EnrichTracesQuery(tt.args.query, tt.args.keys) + // Check AggregateAttribute + if tt.args.query.AggregateAttribute.Key != "" && !reflect.DeepEqual(tt.args.query.AggregateAttribute, tt.args.want.AggregateAttribute) { + t.Errorf("EnrichTracesQuery() AggregateAttribute = %v, want %v", tt.args.query.AggregateAttribute, tt.args.want.AggregateAttribute) + } + + // Check Filters + if tt.args.query.Filters != nil && !reflect.DeepEqual(tt.args.query.Filters, tt.args.want.Filters) { + t.Errorf("EnrichTracesQuery() Filters = %v, want %v", tt.args.query.Filters, tt.args.want.Filters) + } + + // Check GroupBy + if tt.args.query.GroupBy != nil && !reflect.DeepEqual(tt.args.query.GroupBy, tt.args.want.GroupBy) { + t.Errorf("EnrichTracesQuery() GroupBy = %v, want %v", tt.args.query.GroupBy, tt.args.want.GroupBy) + } + + // Check OrderBy + if tt.args.query.OrderBy != nil && !reflect.DeepEqual(tt.args.query.OrderBy, tt.args.want.OrderBy) { + t.Errorf("EnrichTracesQuery() OrderBy = %v, want %v", tt.args.query.OrderBy, tt.args.want.OrderBy) + } + }) + } +} diff --git a/pkg/query-service/app/traces/v4/query_builder_test.go b/pkg/query-service/app/traces/v4/query_builder_test.go index 11b5945557..76c9874409 100644 --- a/pkg/query-service/app/traces/v4/query_builder_test.go +++ b/pkg/query-service/app/traces/v4/query_builder_test.go @@ -124,6 +124,27 @@ func Test_getColumnName(t *testing.T) { }, want: "attributes_string['xyz']", }, + { + name: "new composite column", + args: args{ + key: v3.AttributeKey{Key: "response_status_code"}, + }, + want: "response_status_code", + }, + { + name: "new composite column with metadata", + args: args{ + key: v3.AttributeKey{Key: "response_status_code", DataType: v3.AttributeKeyDataTypeString, IsColumn: true}, + }, + want: "response_status_code", + }, + { + name: "new normal column with metadata", + args: args{ + key: v3.AttributeKey{Key: "http.route", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, + }, + want: "`attribute_string_http$$route`", + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index dc52f6fd88..bc0cbb17ec 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -589,4 +589,60 @@ var StaticFieldsTraces = map[string]v3.AttributeKey{ DataType: v3.AttributeKeyDataTypeString, IsColumn: true, }, + + // new support + "response_status_code": { + Key: "response_status_code", + DataType: v3.AttributeKeyDataTypeString, + IsColumn: true, + }, + "external_http_url": { + Key: "external_http_url", + DataType: v3.AttributeKeyDataTypeString, + IsColumn: true, + }, + "http_url": { + Key: "http_url", + DataType: v3.AttributeKeyDataTypeString, + IsColumn: true, + }, + "external_http_method": { + Key: "external_http_method", + DataType: v3.AttributeKeyDataTypeString, + IsColumn: true, + }, + "http_method": { + Key: "http_method", + DataType: v3.AttributeKeyDataTypeString, + IsColumn: true, + }, + "http_host": { + Key: "http_host", + DataType: v3.AttributeKeyDataTypeString, + IsColumn: true, + }, + "db_name": { + Key: "db_name", + DataType: v3.AttributeKeyDataTypeString, + IsColumn: true, + }, + "db_operation": { + Key: "db_operation", + DataType: v3.AttributeKeyDataTypeString, + IsColumn: true, + }, + "has_error": { + Key: "has_error", + DataType: v3.AttributeKeyDataTypeBool, + IsColumn: true, + }, + "is_remote": { + Key: "is_remote", + DataType: v3.AttributeKeyDataTypeString, + IsColumn: true, + }, + // the simple attributes are not present here as + // they are taken care by new format __'' } + +const TRACE_V4_MAX_PAGINATION_LIMIT = 10000 diff --git a/pkg/query-service/contextlinks/links.go b/pkg/query-service/contextlinks/links.go index 260745eda3..9e48dfb1a2 100644 --- a/pkg/query-service/contextlinks/links.go +++ b/pkg/query-service/contextlinks/links.go @@ -183,7 +183,7 @@ func PrepareFilters(labels map[string]string, whereClauseItems []v3.FilterItem, var attrFound bool // as of now this logic will only apply for logs - for _, tKey := range utils.GenerateLogEnrichmentKeys(v3.AttributeKey{Key: key}) { + for _, tKey := range utils.GenerateEnrichmentKeys(v3.AttributeKey{Key: key}) { if val, ok := keys[tKey]; ok { attributeKey = val attrFound = true diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index baad5f0a22..10c718aa28 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -29,15 +29,10 @@ type Reader interface { // GetDisks returns a list of disks configured in the underlying DB. It is supported by // clickhouse only. GetDisks(ctx context.Context) (*[]model.DiskItem, *model.ApiError) - GetSpanFilters(ctx context.Context, query *model.SpanFilterParams) (*model.SpanFiltersResponse, *model.ApiError) GetTraceAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) GetTraceAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) GetTraceAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) GetSpanAttributeKeys(ctx context.Context) (map[string]v3.AttributeKey, error) - GetTagFilters(ctx context.Context, query *model.TagFilterParams) (*model.TagFilters, *model.ApiError) - GetTagValues(ctx context.Context, query *model.TagFilterParams) (*model.TagValues, *model.ApiError) - GetFilteredSpans(ctx context.Context, query *model.GetFilteredSpansParams) (*model.GetFilterSpansResponse, *model.ApiError) - GetFilteredSpansAggregates(ctx context.Context, query *model.GetFilteredSpanAggregatesParams) (*model.GetFilteredSpansAggregatesResponse, *model.ApiError) ListErrors(ctx context.Context, params *model.ListErrorsParams) (*[]model.Error, *model.ApiError) CountErrors(ctx context.Context, params *model.CountErrorsParams) (uint64, *model.ApiError) diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index 61be36f170..4d7fb9d230 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -546,6 +546,9 @@ type SignozLogV2 struct { SeverityText string `json:"severity_text" ch:"severity_text"` SeverityNumber uint8 `json:"severity_number" ch:"severity_number"` Body string `json:"body" ch:"body"` + ScopeName string `json:"scope_name" ch:"scope_name"` + ScopeVersion string `json:"scope_version" ch:"scope_version"` + ScopeString map[string]string `json:"scope_string" ch:"scope_string"` Resources_string map[string]string `json:"resources_string" ch:"resources_string"` Attributes_string map[string]string `json:"attributes_string" ch:"attributes_string"` Attributes_number map[string]float64 `json:"attributes_float" ch:"attributes_number"` diff --git a/pkg/query-service/rules/base_rule.go b/pkg/query-service/rules/base_rule.go index b6d2db0a3c..466cba83fd 100644 --- a/pkg/query-service/rules/base_rule.go +++ b/pkg/query-service/rules/base_rule.go @@ -463,9 +463,9 @@ func (r *BaseRule) ShouldAlert(series v3.Series) (Sample, bool) { } } else if r.compareOp() == ValueOutsideBounds { for _, smpl := range series.Points { - if math.Abs(smpl.Value) >= r.targetVal() { + if math.Abs(smpl.Value) < r.targetVal() { alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls} - shouldAlert = true + shouldAlert = false break } } diff --git a/pkg/query-service/telemetry/telemetry.go b/pkg/query-service/telemetry/telemetry.go index 972caa4679..1e2a1a1998 100644 --- a/pkg/query-service/telemetry/telemetry.go +++ b/pkg/query-service/telemetry/telemetry.go @@ -347,6 +347,7 @@ func createTelemetry() { "alertsWithTSV2": alertsInfo.AlertsWithTSV2, "logsBasedAlerts": alertsInfo.LogsBasedAlerts, "metricBasedAlerts": alertsInfo.MetricBasedAlerts, + "anomalyBasedAlerts": alertsInfo.AnomalyBasedAlerts, "tracesBasedAlerts": alertsInfo.TracesBasedAlerts, "totalChannels": alertsInfo.TotalChannels, "totalSavedViews": savedViewsInfo.TotalSavedViews, diff --git a/pkg/query-service/utils/logs.go b/pkg/query-service/utils/logs.go index 8efa026b52..5d26df6fb0 100644 --- a/pkg/query-service/utils/logs.go +++ b/pkg/query-service/utils/logs.go @@ -9,7 +9,7 @@ type LogsListTsRange struct { End int64 } -func GetLogsListTsRanges(start, end int64) []LogsListTsRange { +func GetListTsRanges(start, end int64) []LogsListTsRange { startNano := GetEpochNanoSecs(start) endNano := GetEpochNanoSecs(end) result := []LogsListTsRange{} @@ -35,13 +35,15 @@ func GetLogsListTsRanges(start, end int64) []LogsListTsRange { tStartNano = startNano } } + } else { + result = append(result, LogsListTsRange{Start: start, End: end}) } return result } // This tries to see all possible fields that it can fall back to if some meta is missing -// check Test_GenerateLogEnrichmentKeys for example -func GenerateLogEnrichmentKeys(field v3.AttributeKey) []string { +// check Test_GenerateEnrichmentKeys for example +func GenerateEnrichmentKeys(field v3.AttributeKey) []string { names := []string{} if field.Type != v3.AttributeKeyTypeUnspecified && field.DataType != v3.AttributeKeyDataTypeUnspecified { names = append(names, field.Key+"##"+field.Type.String()+"##"+field.DataType.String()) diff --git a/pkg/query-service/utils/logs_test.go b/pkg/query-service/utils/logs_test.go index e1efd813d1..1e95f98fca 100644 --- a/pkg/query-service/utils/logs_test.go +++ b/pkg/query-service/utils/logs_test.go @@ -7,7 +7,7 @@ import ( v3 "go.signoz.io/signoz/pkg/query-service/model/v3" ) -func TestLogsListTsRange(t *testing.T) { +func TestListTsRange(t *testing.T) { startEndData := []struct { name string start int64 @@ -18,7 +18,7 @@ func TestLogsListTsRange(t *testing.T) { name: "testing for less then one hour", start: 1722262800000000000, // July 29, 2024 7:50:00 PM end: 1722263800000000000, // July 29, 2024 8:06:40 PM - res: []LogsListTsRange{}, + res: []LogsListTsRange{{1722262800000000000, 1722263800000000000}}, }, { name: "testing for more than one hour", @@ -44,7 +44,7 @@ func TestLogsListTsRange(t *testing.T) { } for _, test := range startEndData { - res := GetLogsListTsRanges(test.start, test.end) + res := GetListTsRanges(test.start, test.end) for i, v := range res { if test.res[i].Start != v.Start || test.res[i].End != v.End { t.Errorf("expected range was %v - %v, got %v - %v", v.Start, v.End, test.res[i].Start, test.res[i].End) @@ -53,7 +53,7 @@ func TestLogsListTsRange(t *testing.T) { } } -func Test_GenerateLogEnrichmentKeys(t *testing.T) { +func Test_GenerateEnrichmentKeys(t *testing.T) { type args struct { field v3.AttributeKey } @@ -96,8 +96,8 @@ func Test_GenerateLogEnrichmentKeys(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := GenerateLogEnrichmentKeys(tt.args.field); !reflect.DeepEqual(got, tt.want) { - t.Errorf("generateLogEnrichmentKeys() = %v, want %v", got, tt.want) + if got := GenerateEnrichmentKeys(tt.args.field); !reflect.DeepEqual(got, tt.want) { + t.Errorf("GenerateEnrichmentKeys() = %v, want %v", got, tt.want) } }) }