diff --git a/.vscode/settings.json b/.vscode/settings.json index 17f7235bdf6..db76e738b10 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -67,4 +67,5 @@ "python.testing.pytestArgs": ["tests"], "python.analysis.autoImportCompletions": true, "mypy-type-checker.args": ["--strict"] + } diff --git a/snuba/datasets/configuration/events_analytics_platform/entities/eap_spans.yaml b/snuba/datasets/configuration/events_analytics_platform/entities/eap_spans.yaml index 3a3e3e59752..a4658578358 100644 --- a/snuba/datasets/configuration/events_analytics_platform/entities/eap_spans.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/entities/eap_spans.yaml @@ -73,21 +73,6 @@ query_processors: time_parse_columns: - start_timestamp - end_timestamp - - processor: OptionalAttributeAggregationTransformer - args: - attribute_column_names: - - attr_num - aggregation_names: - - sum - - count - - avg - - avgWeighted - - max - - min - - uniq - curried_aggregation_names: - - quantile - - quantileTDigestWeighted - processor: HashBucketFunctionTransformer args: hash_bucket_names: diff --git a/snuba/web/rpc/common/common.py b/snuba/web/rpc/common/common.py index 3e4c80afcb7..6f7c79bc35e 100644 --- a/snuba/web/rpc/common/common.py +++ b/snuba/web/rpc/common/common.py @@ -1,11 +1,8 @@ from datetime import datetime, timedelta -from typing import Final, Mapping, Sequence, Set +from typing import Callable from sentry_protos.snuba.v1.request_common_pb2 import RequestMeta -from sentry_protos.snuba.v1.trace_item_attribute_pb2 import ( - AttributeKey, - VirtualColumnContext, -) +from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey from sentry_protos.snuba.v1.trace_item_filter_pb2 import ( ComparisonFilter, TraceItemFilter, @@ -82,185 +79,28 @@ def transform(exp: Expression) -> Expression: query.transform_expressions(transform) -# These are the columns which aren't stored in attr_str_ nor attr_num_ in clickhouse -NORMALIZED_COLUMNS: Final[Mapping[str, AttributeKey.Type.ValueType]] = { - "sentry.organization_id": AttributeKey.Type.TYPE_INT, - "sentry.project_id": AttributeKey.Type.TYPE_INT, - "sentry.service": AttributeKey.Type.TYPE_STRING, - "sentry.span_id": AttributeKey.Type.TYPE_STRING, # this is converted by a processor on the storage - "sentry.parent_span_id": AttributeKey.Type.TYPE_STRING, # this is converted by a processor on the storage - "sentry.segment_id": AttributeKey.Type.TYPE_STRING, # this is converted by a processor on the storage - "sentry.segment_name": AttributeKey.Type.TYPE_STRING, - "sentry.is_segment": AttributeKey.Type.TYPE_BOOLEAN, - "sentry.duration_ms": AttributeKey.Type.TYPE_DOUBLE, - "sentry.exclusive_time_ms": AttributeKey.Type.TYPE_DOUBLE, - "sentry.retention_days": AttributeKey.Type.TYPE_INT, - "sentry.name": AttributeKey.Type.TYPE_STRING, - "sentry.sampling_weight": AttributeKey.Type.TYPE_DOUBLE, - "sentry.sampling_factor": AttributeKey.Type.TYPE_DOUBLE, - "sentry.timestamp": AttributeKey.Type.TYPE_UNSPECIFIED, - "sentry.start_timestamp": AttributeKey.Type.TYPE_UNSPECIFIED, - "sentry.end_timestamp": AttributeKey.Type.TYPE_UNSPECIFIED, -} - -TIMESTAMP_COLUMNS: Final[Set[str]] = { - "sentry.timestamp", - "sentry.start_timestamp", - "sentry.end_timestamp", -} - - -def attribute_key_to_expression(attr_key: AttributeKey) -> Expression: - def _build_label_mapping_key(attr_key: AttributeKey) -> str: - return attr_key.name + "_" + AttributeKey.Type.Name(attr_key.type) - - if attr_key.type == AttributeKey.Type.TYPE_UNSPECIFIED: - raise 
BadSnubaRPCRequestException( - f"attribute key {attr_key.name} must have a type specified" - ) - alias = _build_label_mapping_key(attr_key) - - if attr_key.name == "sentry.trace_id": - if attr_key.type == AttributeKey.Type.TYPE_STRING: - return f.CAST(column("trace_id"), "String", alias=alias) - raise BadSnubaRPCRequestException( - f"Attribute {attr_key.name} must be requested as a string, got {attr_key.type}" - ) - - if attr_key.name in TIMESTAMP_COLUMNS: - if attr_key.type == AttributeKey.Type.TYPE_STRING: - return f.CAST( - column(attr_key.name[len("sentry.") :]), "String", alias=alias - ) - if attr_key.type == AttributeKey.Type.TYPE_INT: - return f.CAST(column(attr_key.name[len("sentry.") :]), "Int64", alias=alias) - if ( - attr_key.type == AttributeKey.Type.TYPE_FLOAT - or attr_key.type == AttributeKey.Type.TYPE_DOUBLE - ): - return f.CAST( - column(attr_key.name[len("sentry.") :]), "Float64", alias=alias - ) - raise BadSnubaRPCRequestException( - f"Attribute {attr_key.name} must be requested as a string, float, or integer, got {attr_key.type}" - ) - - if attr_key.name in NORMALIZED_COLUMNS: - # the second if statement allows Sentry to send TYPE_FLOAT to Snuba when Snuba still has to be backward compatible with TYPE_FLOATS - if NORMALIZED_COLUMNS[attr_key.name] == attr_key.type or ( - attr_key.type == AttributeKey.Type.TYPE_FLOAT - and NORMALIZED_COLUMNS[attr_key.name] == AttributeKey.Type.TYPE_DOUBLE - ): - return column(attr_key.name[len("sentry.") :], alias=attr_key.name) - raise BadSnubaRPCRequestException( - f"Attribute {attr_key.name} must be requested as {NORMALIZED_COLUMNS[attr_key.name]}, got {attr_key.type}" - ) +def add_existence_check_to_subscriptable_references(query: Query) -> None: + def transform(exp: Expression) -> Expression: + if not isinstance(exp, SubscriptableReference): + return exp - # End of special handling, just send to the appropriate bucket - if attr_key.type == AttributeKey.Type.TYPE_STRING: - return SubscriptableReference( - alias=alias, column=column("attr_str"), key=literal(attr_key.name) - ) - if ( - attr_key.type == AttributeKey.Type.TYPE_FLOAT - or attr_key.type == AttributeKey.Type.TYPE_DOUBLE - ): - return SubscriptableReference( - alias=alias, column=column("attr_num"), key=literal(attr_key.name) - ) - if attr_key.type == AttributeKey.Type.TYPE_INT: - return f.CAST( - SubscriptableReference( - alias=None, column=column("attr_num"), key=literal(attr_key.name) - ), - "Int64", - alias=alias, - ) - if attr_key.type == AttributeKey.Type.TYPE_BOOLEAN: - return f.CAST( - SubscriptableReference( - alias=None, - column=column("attr_num"), - key=literal(attr_key.name), + return FunctionCall( + alias=exp.alias, + function_name="if", + parameters=( + f.mapContains(exp.column, exp.key), + SubscriptableReference(None, exp.column, exp.key), + literal(None), ), - "Boolean", - alias=alias, ) - raise BadSnubaRPCRequestException( - f"Attribute {attr_key.name} had an unknown or unset type: {attr_key.type}" - ) - - -def apply_virtual_columns( - query: Query, virtual_column_contexts: Sequence[VirtualColumnContext] -) -> None: - """Injects virtual column mappings into the clickhouse query. Works with NORMALIZED_COLUMNS on the table or - dynamic columns in attr_str - - attr_num not supported because mapping on floats is a bad idea - - Example: - - SELECT - project_name AS `project_name`, - attr_str['release'] AS `release`, - attr_str['sentry.sdk.name'] AS `sentry.sdk.name`, - ... 
rest of query
-
-    contexts:
-    [ {from_column_name: project_id, to_column_name: project_name, value_map: {1: "sentry", 2: "snuba"}} ]
-
-
-    Query will be transformed into:
-
-    SELECT
-      -- see the project name column transformed and the value mapping injected
-      transform( CAST( project_id, 'String'), array( '1', '2'), array( 'sentry', 'snuba'), 'unknown') AS `project_name`,
-      --
-      attr_str['release'] AS `release`,
-      attr_str['sentry.sdk.name'] AS `sentry.sdk.name`,
-      ... rest of query
-    """
-
-    if not virtual_column_contexts:
-        return
-
-    mapped_column_to_context = {c.to_column_name: c for c in virtual_column_contexts}
-
-    def transform_expressions(expression: Expression) -> Expression:
-        # virtual columns will show up as `attr_str[virtual_column_name]` or `attr_num[virtual_column_name]`
-        if not isinstance(expression, SubscriptableReference):
-            return expression
-
-        if expression.column.column_name != "attr_str":
-            return expression
-        context = mapped_column_to_context.get(str(expression.key.value))
-        if context:
-            attribute_expression = attribute_key_to_expression(
-                AttributeKey(
-                    name=context.from_column_name,
-                    type=NORMALIZED_COLUMNS.get(
-                        context.from_column_name, AttributeKey.TYPE_STRING
-                    ),
-                )
-            )
-            return f.transform(
-                f.CAST(attribute_expression, "String"),
-                literals_array(None, [literal(k) for k in context.value_map.keys()]),
-                literals_array(None, [literal(v) for v in context.value_map.values()]),
-                literal(
-                    context.default_value if context.default_value != "" else "unknown"
-                ),
-                alias=context.to_column_name,
-            )
-
-        return expression
-
-    query.transform_expressions(transform_expressions)
+    query.transform_expressions(transform)
 
 
-def trace_item_filters_to_expression(item_filter: TraceItemFilter) -> Expression:
+def trace_item_filters_to_expression(
+    item_filter: TraceItemFilter,
+    attribute_key_to_expression: Callable[[AttributeKey], Expression],
+) -> Expression:
     """
     Trace Item Filters are things like (span.id=12345 AND start_timestamp >= "june 4th, 2024")
     This maps those filters into an expression which can be used in a WHERE clause
@@ -271,9 +111,16 @@ def trace_item_filters_to_expression(item_filter: TraceItemFilter) -> Expression
         filters = item_filter.and_filter.filters
         if len(filters) == 0:
             return literal(True)
-        if len(filters) == 1:
-            return trace_item_filters_to_expression(filters[0])
-        return and_cond(*(trace_item_filters_to_expression(x) for x in filters))
+        elif len(filters) == 1:
+            return trace_item_filters_to_expression(
+                filters[0], attribute_key_to_expression
+            )
+        return and_cond(
+            *(
+                trace_item_filters_to_expression(x, attribute_key_to_expression)
+                for x in filters
+            )
+        )
 
     if item_filter.HasField("or_filter"):
         filters = item_filter.or_filter.filters
@@ -281,9 +128,16 @@ def trace_item_filters_to_expression(item_filter: TraceItemFilter) -> Expression
            raise BadSnubaRPCRequestException(
                 "Invalid trace item filter, empty 'or' clause"
             )
-        if len(filters) == 1:
-            return trace_item_filters_to_expression(filters[0])
-        return or_cond(*(trace_item_filters_to_expression(x) for x in filters))
+        elif len(filters) == 1:
+            return trace_item_filters_to_expression(
+                filters[0], attribute_key_to_expression
+            )
+        return or_cond(
+            *(
+                trace_item_filters_to_expression(x, attribute_key_to_expression)
+                for x in filters
+            )
+        )
 
     if item_filter.HasField("not_filter"):
         filters = item_filter.not_filter.filters
@@ -292,9 +146,18 @@ def trace_item_filters_to_expression(item_filter: TraceItemFilter) -> Expression
                 "Invalid trace item filter, empty 'not' clause"
             )
         elif len(filters) == 1:
-            return not_cond(trace_item_filters_to_expression(filters[0]))
+            return not_cond(
+                trace_item_filters_to_expression(
+                    filters[0], attribute_key_to_expression
+                )
+            )
         return not_cond(
-            and_cond(*(trace_item_filters_to_expression(x) for x in filters))
+            and_cond(
+                *(
+                    trace_item_filters_to_expression(x, attribute_key_to_expression)
+                    for x in filters
+                )
+            )
         )
 
     if item_filter.HasField("comparison_filter"):
@@ -309,39 +172,40 @@ def trace_item_filters_to_expression(item_filter: TraceItemFilter) -> Expression
                 "comparison does not have a right hand side"
             )
 
-        match value_type:
-            case "val_bool":
-                v_expression: Expression = literal(v.val_bool)
-            case "val_str":
-                v_expression = literal(v.val_str)
-            case "val_float":
-                v_expression = literal(v.val_float)
-            case "val_double":
-                v_expression = literal(v.val_double)
-            case "val_int":
-                v_expression = literal(v.val_int)
-            case "val_null":
-                v_expression = literal(None)
-            case "val_str_array":
-                v_expression = literals_array(
-                    None, list(map(lambda x: literal(x), v.val_str_array.values))
-                )
-            case "val_int_array":
-                v_expression = literals_array(
-                    None, list(map(lambda x: literal(x), v.val_int_array.values))
-                )
-            case "val_float_array":
-                v_expression = literals_array(
-                    None, list(map(lambda x: literal(x), v.val_float_array.values))
-                )
-            case "val_double_array":
-                v_expression = literals_array(
-                    None, list(map(lambda x: literal(x), v.val_double_array.values))
-                )
-            case default:
-                raise NotImplementedError(
-                    f"translation of AttributeValue type {default} is not implemented"
-                )
+        if v.is_null:
+            v_expression: Expression = literal(None)
+        else:
+            match value_type:
+                case "val_bool":
+                    v_expression = literal(v.val_bool)
+                case "val_str":
+                    v_expression = literal(v.val_str)
+                case "val_float":
+                    v_expression = literal(v.val_float)
+                case "val_double":
+                    v_expression = literal(v.val_double)
+                case "val_int":
+                    v_expression = literal(v.val_int)
+                case "val_str_array":
+                    v_expression = literals_array(
+                        None, list(map(lambda x: literal(x), v.val_str_array.values))
+                    )
+                case "val_int_array":
+                    v_expression = literals_array(
+                        None, list(map(lambda x: literal(x), v.val_int_array.values))
+                    )
+                case "val_float_array":
+                    v_expression = literals_array(
+                        None, list(map(lambda x: literal(x), v.val_float_array.values))
+                    )
+                case "val_double_array":
+                    v_expression = literals_array(
+                        None, list(map(lambda x: literal(x), v.val_double_array.values))
+                    )
+                case default:
+                    raise NotImplementedError(
+                        f"translation of AttributeValue type {default} is not implemented"
+                    )
 
         if op == ComparisonFilter.OP_EQUALS:
             _check_non_string_values_cannot_ignore_case(item_filter.comparison_filter)
@@ -401,13 +265,10 @@ def trace_item_filters_to_expression(item_filter: TraceItemFilter) -> Expression
         )
 
     if item_filter.HasField("exists_filter"):
-        k = item_filter.exists_filter.key
-        if k.name in NORMALIZED_COLUMNS.keys():
-            return f.isNotNull(attribute_key_to_expression(k))
-        if k.type == AttributeKey.Type.TYPE_STRING:
-            return f.mapContains(column("attr_str"), literal(k.name))
-        else:
-            return f.mapContains(column("attr_num"), literal(k.name))
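+        # get_field_existence_expression picks mapContains or isNotNull based on how the dataset-specific attribute_key_to_expression resolves the key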
f.mapContains(column("attr_num"), literal(k.name)) + return get_field_existence_expression( + attribute_key_to_expression(item_filter.exists_filter.key) + ) return literal(True) @@ -467,3 +348,29 @@ def convert_filter_offset(filter_offset: TraceItemFilter) -> Expression: raise BadSnubaRPCRequestException("please provide a string for filter offset") return f.greater(k_expression, literal(v.val_str)) + + +def get_field_existence_expression(field: Expression) -> Expression: + def get_subscriptable_field(field: Expression) -> SubscriptableReference | None: + """ + Check if the field is a subscriptable reference or a function call with a subscriptable reference as the first parameter to handle the case + where the field is casting a subscriptable reference (e.g. for integers). If so, return the subscriptable reference. + """ + if isinstance(field, SubscriptableReference): + return field + elif isinstance(field, FunctionCall) and len(field.parameters) > 0: + if len(field.parameters) > 0 and isinstance( + field.parameters[0], SubscriptableReference + ): + return field.parameters[0] + + return None + + subscriptable_field = get_subscriptable_field(field) + if subscriptable_field is not None: + return f.mapContains(subscriptable_field.column, subscriptable_field.key) + + if isinstance(field, FunctionCall) and field.function_name == "arrayElement": + return f.mapContains(field.parameters[0], field.parameters[1]) + + return f.isNotNull(field) diff --git a/snuba/web/rpc/v1/endpoint_get_traces.py b/snuba/web/rpc/v1/endpoint_get_traces.py index 2caf298789e..e6811893334 100644 --- a/snuba/web/rpc/v1/endpoint_get_traces.py +++ b/snuba/web/rpc/v1/endpoint_get_traces.py @@ -29,7 +29,6 @@ from snuba.web.query import run_query from snuba.web.rpc import RPCEndpoint from snuba.web.rpc.common.common import ( - attribute_key_to_expression, base_conditions_and, project_id_and_org_conditions, timestamp_in_range_condition, @@ -41,6 +40,9 @@ setup_trace_query_settings, ) from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException +from snuba.web.rpc.v1.resolvers.R_eap_spans.common.common import ( + attribute_key_to_expression, +) _DEFAULT_ROW_LIMIT = 10_000 _BUFFER_WINDOW = 2 * 3600 # 2 hours @@ -350,6 +352,7 @@ def _list_trace_ids( ) -> dict[str, int]: trace_item_filters_expression = trace_item_filters_to_expression( _select_supported_filters(request.filters), + attribute_key_to_expression, ) selected_columns: list[SelectedExpression] = [ SelectedExpression( @@ -410,6 +413,7 @@ def _get_metadata_for_traces( ) -> list[GetTracesResponse.Trace]: trace_item_filters_expression = trace_item_filters_to_expression( _select_supported_filters(request.filters), + attribute_key_to_expression, ) selected_columns: list[SelectedExpression] = [] diff --git a/snuba/web/rpc/v1/resolvers/R_eap_spans/common/common.py b/snuba/web/rpc/v1/resolvers/R_eap_spans/common/common.py new file mode 100644 index 00000000000..702d803daf8 --- /dev/null +++ b/snuba/web/rpc/v1/resolvers/R_eap_spans/common/common.py @@ -0,0 +1,189 @@ +from typing import Final, Mapping, Sequence, Set + +from sentry_protos.snuba.v1.trace_item_attribute_pb2 import ( + AttributeKey, + VirtualColumnContext, +) + +from snuba.query import Query +from snuba.query.dsl import Functions as f +from snuba.query.dsl import column, literal, literals_array +from snuba.query.expressions import Expression, SubscriptableReference +from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException + +# These are the columns which aren't stored in attr_str_ nor attr_num_ 
in clickhouse +NORMALIZED_COLUMNS: Final[Mapping[str, AttributeKey.Type.ValueType]] = { + "sentry.organization_id": AttributeKey.Type.TYPE_INT, + "sentry.project_id": AttributeKey.Type.TYPE_INT, + "sentry.service": AttributeKey.Type.TYPE_STRING, + "sentry.span_id": AttributeKey.Type.TYPE_STRING, # this is converted by a processor on the storage + "sentry.parent_span_id": AttributeKey.Type.TYPE_STRING, # this is converted by a processor on the storage + "sentry.segment_id": AttributeKey.Type.TYPE_STRING, # this is converted by a processor on the storage + "sentry.segment_name": AttributeKey.Type.TYPE_STRING, + "sentry.is_segment": AttributeKey.Type.TYPE_BOOLEAN, + "sentry.duration_ms": AttributeKey.Type.TYPE_DOUBLE, + "sentry.exclusive_time_ms": AttributeKey.Type.TYPE_DOUBLE, + "sentry.retention_days": AttributeKey.Type.TYPE_INT, + "sentry.name": AttributeKey.Type.TYPE_STRING, + "sentry.sampling_weight": AttributeKey.Type.TYPE_DOUBLE, + "sentry.sampling_factor": AttributeKey.Type.TYPE_DOUBLE, + "sentry.timestamp": AttributeKey.Type.TYPE_UNSPECIFIED, + "sentry.start_timestamp": AttributeKey.Type.TYPE_UNSPECIFIED, + "sentry.end_timestamp": AttributeKey.Type.TYPE_UNSPECIFIED, +} + +TIMESTAMP_COLUMNS: Final[Set[str]] = { + "sentry.timestamp", + "sentry.start_timestamp", + "sentry.end_timestamp", +} + + +def attribute_key_to_expression(attr_key: AttributeKey) -> Expression: + def _build_label_mapping_key(attr_key: AttributeKey) -> str: + return attr_key.name + "_" + AttributeKey.Type.Name(attr_key.type) + + if attr_key.type == AttributeKey.Type.TYPE_UNSPECIFIED: + raise BadSnubaRPCRequestException( + f"attribute key {attr_key.name} must have a type specified" + ) + alias = _build_label_mapping_key(attr_key) + + if attr_key.name == "sentry.trace_id": + if attr_key.type == AttributeKey.Type.TYPE_STRING: + return f.CAST(column("trace_id"), "String", alias=alias) + raise BadSnubaRPCRequestException( + f"Attribute {attr_key.name} must be requested as a string, got {attr_key.type}" + ) + + if attr_key.name in TIMESTAMP_COLUMNS: + if attr_key.type == AttributeKey.Type.TYPE_STRING: + return f.CAST( + column(attr_key.name[len("sentry.") :]), "String", alias=alias + ) + if attr_key.type == AttributeKey.Type.TYPE_INT: + return f.CAST(column(attr_key.name[len("sentry.") :]), "Int64", alias=alias) + if ( + attr_key.type == AttributeKey.Type.TYPE_FLOAT + or attr_key.type == AttributeKey.Type.TYPE_DOUBLE + ): + return f.CAST( + column(attr_key.name[len("sentry.") :]), "Float64", alias=alias + ) + raise BadSnubaRPCRequestException( + f"Attribute {attr_key.name} must be requested as a string, float, or integer, got {attr_key.type}" + ) + + if attr_key.name in NORMALIZED_COLUMNS: + # the second if statement allows Sentry to send TYPE_FLOAT to Snuba when Snuba still has to be backward compatible with TYPE_FLOATS + if NORMALIZED_COLUMNS[attr_key.name] == attr_key.type or ( + attr_key.type == AttributeKey.Type.TYPE_FLOAT + and NORMALIZED_COLUMNS[attr_key.name] == AttributeKey.Type.TYPE_DOUBLE + ): + return column(attr_key.name[len("sentry.") :], alias=attr_key.name) + raise BadSnubaRPCRequestException( + f"Attribute {attr_key.name} must be requested as {NORMALIZED_COLUMNS[attr_key.name]}, got {attr_key.type}" + ) + + # End of special handling, just send to the appropriate bucket + if attr_key.type == AttributeKey.Type.TYPE_STRING: + return SubscriptableReference( + alias=alias, column=column("attr_str"), key=literal(attr_key.name) + ) + if ( + attr_key.type == AttributeKey.Type.TYPE_FLOAT + or attr_key.type == 
AttributeKey.Type.TYPE_DOUBLE + ): + return SubscriptableReference( + alias=alias, column=column("attr_num"), key=literal(attr_key.name) + ) + if attr_key.type == AttributeKey.Type.TYPE_INT: + return f.CAST( + SubscriptableReference( + alias=None, column=column("attr_num"), key=literal(attr_key.name) + ), + "Nullable(Int64)", + alias=alias, + ) + if attr_key.type == AttributeKey.Type.TYPE_BOOLEAN: + return f.CAST( + SubscriptableReference( + alias=None, + column=column("attr_num"), + key=literal(attr_key.name), + ), + "Nullable(Boolean)", + alias=alias, + ) + raise BadSnubaRPCRequestException( + f"Attribute {attr_key.name} had an unknown or unset type: {attr_key.type}" + ) + + +def apply_virtual_columns( + query: Query, virtual_column_contexts: Sequence[VirtualColumnContext] +) -> None: + """Injects virtual column mappings into the clickhouse query. Works with NORMALIZED_COLUMNS on the table or + dynamic columns in attr_str + + attr_num not supported because mapping on floats is a bad idea + + Example: + + SELECT + project_name AS `project_name`, + attr_str['release'] AS `release`, + attr_str['sentry.sdk.name'] AS `sentry.sdk.name`, + ... rest of query + + contexts: + [ {from_column_name: project_id, to_column_name: project_name, value_map: {1: "sentry", 2: "snuba"}} ] + + + Query will be transformed into: + + SELECT + -- see the project name column transformed and the value mapping injected + transform( CAST( project_id, 'String'), array( '1', '2'), array( 'sentry', 'snuba'), 'unknown') AS `project_name`, + -- + attr_str['release'] AS `release`, + attr_str['sentry.sdk.name'] AS `sentry.sdk.name`, + ... rest of query + + """ + + if not virtual_column_contexts: + return + + mapped_column_to_context = {c.to_column_name: c for c in virtual_column_contexts} + + def transform_expressions(expression: Expression) -> Expression: + # virtual columns will show up as `attr_str[virtual_column_name]` or `attr_num[virtual_column_name]` + if not isinstance(expression, SubscriptableReference): + return expression + + if expression.column.column_name != "attr_str": + return expression + context = mapped_column_to_context.get(str(expression.key.value)) + if context: + attribute_expression = attribute_key_to_expression( + AttributeKey( + name=context.from_column_name, + type=NORMALIZED_COLUMNS.get( + context.from_column_name, AttributeKey.TYPE_STRING + ), + ) + ) + return f.transform( + f.CAST(attribute_expression, "String"), + literals_array(None, [literal(k) for k in context.value_map.keys()]), + literals_array(None, [literal(v) for v in context.value_map.values()]), + literal( + context.default_value if context.default_value != "" else "unknown" + ), + alias=context.to_column_name, + ) + + return expression + + query.transform_expressions(transform_expressions) diff --git a/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_get_trace.py b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_get_trace.py index ac1b38ce298..b8c05a41f41 100644 --- a/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_get_trace.py +++ b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_get_trace.py @@ -27,7 +27,6 @@ from snuba.request import Request as SnubaRequest from snuba.web.query import run_query from snuba.web.rpc.common.common import ( - attribute_key_to_expression, project_id_and_org_conditions, timestamp_in_range_condition, treeify_or_and_conditions, @@ -38,6 +37,9 @@ ) from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException from snuba.web.rpc.v1.resolvers import ResolverGetTrace +from 
snuba.web.rpc.v1.resolvers.R_eap_spans.common.common import ( + attribute_key_to_expression, +) _BUCKET_COUNT = 20 diff --git a/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_time_series.py b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_time_series.py index 88a01321d4c..eaf0139a7ae 100644 --- a/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_time_series.py +++ b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_time_series.py @@ -28,7 +28,6 @@ from snuba.request import Request as SnubaRequest from snuba.web.query import run_query from snuba.web.rpc.common.common import ( - attribute_key_to_expression, base_conditions_and, trace_item_filters_to_expression, treeify_or_and_conditions, @@ -37,15 +36,17 @@ extract_response_meta, setup_trace_query_settings, ) -from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException from snuba.web.rpc.v1.resolvers import ResolverTimeSeries -from snuba.web.rpc.v1.resolvers.R_eap_spans.common.aggregation import ( +from snuba.web.rpc.v1.resolvers.common.aggregation import ( ExtrapolationContext, aggregation_to_expression, get_average_sample_rate_column, get_confidence_interval_column, get_count_column, ) +from snuba.web.rpc.v1.resolvers.R_eap_spans.common.common import ( + attribute_key_to_expression, +) def _convert_result_timeseries( @@ -254,7 +255,10 @@ def _build_query(request: TimeSeriesRequest) -> Query: ], granularity=request.granularity_secs, condition=base_conditions_and( - request.meta, trace_item_filters_to_expression(request.filter) + request.meta, + trace_item_filters_to_expression( + request.filter, attribute_key_to_expression + ), ), groupby=[ column("time_slot"), @@ -298,9 +302,6 @@ def trace_item_type(cls) -> TraceItemType.ValueType: return TraceItemType.TRACE_ITEM_TYPE_SPAN def resolve(self, in_msg: TimeSeriesRequest) -> TimeSeriesResponse: - if len(in_msg.expressions) > 0: - raise BadSnubaRPCRequestException("expressions field not yet implemented") - snuba_request = _build_snuba_request(in_msg) res = run_query( dataset=PluggableDataset(name="eap", all_entities=[]), diff --git a/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_stats.py b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_stats.py index 5305d850bb1..04356c935e0 100644 --- a/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_stats.py +++ b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_stats.py @@ -39,6 +39,9 @@ from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException from snuba.web.rpc.v1.endpoint_get_traces import _DEFAULT_ROW_LIMIT from snuba.web.rpc.v1.resolvers import ResolverTraceItemStats +from snuba.web.rpc.v1.resolvers.R_eap_spans.common.common import ( + attribute_key_to_expression, +) MAX_BUCKETS = 100 DEFAULT_BUCKETS = 10 @@ -131,7 +134,9 @@ def _build_attr_distribution_query( ), ] - trace_item_filters_expression = trace_item_filters_to_expression(in_msg.filter) + trace_item_filters_expression = trace_item_filters_to_expression( + in_msg.filter, attribute_key_to_expression + ) query = Query( from_clause=entity, diff --git a/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py index 87635b7c249..46a31a335af 100644 --- a/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py +++ b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py @@ -1,7 +1,6 @@ import uuid -from collections import defaultdict from dataclasses import replace -from typing import Any, Callable, Dict, Iterable, Sequence +from 
typing import Sequence from google.protobuf.json_format import MessageToDict from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import ( @@ -13,11 +12,7 @@ TraceItemTableResponse, ) from sentry_protos.snuba.v1.request_common_pb2 import PageToken, TraceItemType -from sentry_protos.snuba.v1.trace_item_attribute_pb2 import ( - AttributeKey, - AttributeValue, - ExtrapolationMode, -) +from sentry_protos.snuba.v1.trace_item_attribute_pb2 import ExtrapolationMode from snuba.attribution.appid import AppID from snuba.attribution.attribution_info import AttributionInfo @@ -34,8 +29,7 @@ from snuba.request import Request as SnubaRequest from snuba.web.query import run_query from snuba.web.rpc.common.common import ( - apply_virtual_columns, - attribute_key_to_expression, + add_existence_check_to_subscriptable_references, base_conditions_and, trace_item_filters_to_expression, treeify_or_and_conditions, @@ -46,13 +40,17 @@ ) from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException from snuba.web.rpc.v1.resolvers import ResolverTraceItemTable -from snuba.web.rpc.v1.resolvers.R_eap_spans.common.aggregation import ( - ExtrapolationContext, +from snuba.web.rpc.v1.resolvers.common.aggregation import ( aggregation_to_expression, get_average_sample_rate_column, get_confidence_interval_column, get_count_column, ) +from snuba.web.rpc.v1.resolvers.common.trace_item_table import convert_results +from snuba.web.rpc.v1.resolvers.R_eap_spans.common.common import ( + apply_virtual_columns, + attribute_key_to_expression, +) _DEFAULT_ROW_LIMIT = 10_000 @@ -233,7 +231,9 @@ def _build_query(request: TraceItemTableRequest) -> Query: selected_columns=selected_columns, condition=base_conditions_and( request.meta, - trace_item_filters_to_expression(request.filter), + trace_item_filters_to_expression( + request.filter, attribute_key_to_expression + ), ), order_by=_convert_order_by(request.order_by), groupby=[ @@ -250,6 +250,7 @@ def _build_query(request: TraceItemTableRequest) -> Query: ) treeify_or_and_conditions(res) apply_virtual_columns(res, request.virtual_column_contexts) + add_existence_check_to_subscriptable_references(res) return res @@ -277,54 +278,6 @@ def _build_snuba_request(request: TraceItemTableRequest) -> SnubaRequest: ) -def _convert_results( - request: TraceItemTableRequest, data: Iterable[Dict[str, Any]] -) -> list[TraceItemColumnValues]: - converters: Dict[str, Callable[[Any], AttributeValue]] = {} - - for column in request.columns: - if column.HasField("key"): - if column.key.type == AttributeKey.TYPE_BOOLEAN: - converters[column.label] = lambda x: AttributeValue(val_bool=bool(x)) - elif column.key.type == AttributeKey.TYPE_STRING: - converters[column.label] = lambda x: AttributeValue(val_str=str(x)) - elif column.key.type == AttributeKey.TYPE_INT: - converters[column.label] = lambda x: AttributeValue(val_int=int(x)) - elif column.key.type == AttributeKey.TYPE_FLOAT: - converters[column.label] = lambda x: AttributeValue(val_float=float(x)) - elif column.key.type == AttributeKey.TYPE_DOUBLE: - converters[column.label] = lambda x: AttributeValue(val_double=float(x)) - elif column.HasField("aggregation"): - converters[column.label] = lambda x: AttributeValue(val_double=float(x)) - elif column.HasField("formula"): - converters[column.label] = lambda x: AttributeValue(val_double=float(x)) - else: - raise BadSnubaRPCRequestException( - "column is not one of: attribute, aggregation, or formula" - ) - - res: defaultdict[str, TraceItemColumnValues] = defaultdict(TraceItemColumnValues) - for 
row in data: - for column_name, value in row.items(): - if column_name in converters.keys(): - res[column_name].results.append(converters[column_name](value)) - res[column_name].attribute_name = column_name - extrapolation_context = ExtrapolationContext.from_row(column_name, row) - if extrapolation_context.is_extrapolated: - res[column_name].reliabilities.append( - extrapolation_context.reliability - ) - - column_ordering = {column.label: i for i, column in enumerate(request.columns)} - - return list( - # we return the columns in the order they were requested - sorted( - res.values(), key=lambda c: column_ordering.__getitem__(c.attribute_name) - ) - ) - - def _get_page_token( request: TraceItemTableRequest, response: list[TraceItemColumnValues] ) -> PageToken: @@ -346,7 +299,7 @@ def resolve(self, in_msg: TraceItemTableRequest) -> TraceItemTableResponse: request=snuba_request, timer=self._timer, ) - column_values = _convert_results(in_msg, res.result.get("data", [])) + column_values = convert_results(in_msg, res.result.get("data", [])) response_meta = extract_response_meta( in_msg.meta.request_id, in_msg.meta.debug, diff --git a/snuba/web/rpc/v1/resolvers/R_ourlogs/common/trace_item_filters_to_expression.py b/snuba/web/rpc/v1/resolvers/R_ourlogs/common/trace_item_filters_to_expression.py deleted file mode 100644 index e6e1ba4bae7..00000000000 --- a/snuba/web/rpc/v1/resolvers/R_ourlogs/common/trace_item_filters_to_expression.py +++ /dev/null @@ -1,158 +0,0 @@ -from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey -from sentry_protos.snuba.v1.trace_item_filter_pb2 import ( - ComparisonFilter, - TraceItemFilter, -) - -from snuba.query.dsl import Functions as f -from snuba.query.dsl import ( - and_cond, - column, - in_cond, - literal, - literals_array, - not_cond, - or_cond, -) -from snuba.query.expressions import Expression -from snuba.web.rpc.common.common import _check_non_string_values_cannot_ignore_case -from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException -from snuba.web.rpc.v1.resolvers.R_ourlogs.common.attribute_key_to_expression import ( - attribute_key_to_expression, -) - - -def trace_item_filters_to_expression(item_filter: TraceItemFilter) -> Expression: - """ - Trace Item Filters are things like (span.id=12345 AND start_timestamp >= "june 4th, 2024") - This maps those filters into an expression which can be used in a WHERE clause - :param item_filter: - :return: - """ - if item_filter.HasField("and_filter"): - filters = item_filter.and_filter.filters - if len(filters) == 0: - return literal(True) - if len(filters) == 1: - return trace_item_filters_to_expression(filters[0]) - return and_cond(*(trace_item_filters_to_expression(x) for x in filters)) - - if item_filter.HasField("or_filter"): - filters = item_filter.or_filter.filters - if len(filters) == 0: - raise BadSnubaRPCRequestException( - "Invalid trace item filter, empty 'or' clause" - ) - if len(filters) == 1: - return trace_item_filters_to_expression(filters[0]) - return or_cond(*(trace_item_filters_to_expression(x) for x in filters)) - - if item_filter.HasField("comparison_filter"): - k = item_filter.comparison_filter.key - k_expression = attribute_key_to_expression(k) - op = item_filter.comparison_filter.op - v = item_filter.comparison_filter.value - - value_type = v.WhichOneof("value") - if value_type is None: - raise BadSnubaRPCRequestException( - "comparison does not have a right hand side" - ) - - match value_type: - case "val_bool": - v_expression: Expression = literal(v.val_bool) - 
case "val_str": - v_expression = literal(v.val_str) - case "val_float": - v_expression = literal(v.val_float) - case "val_int": - v_expression = literal(v.val_int) - case "val_null": - v_expression = literal(None) - case "val_str_array": - v_expression = literals_array( - None, list(map(lambda x: literal(x), v.val_str_array.values)) - ) - case "val_int_array": - v_expression = literals_array( - None, list(map(lambda x: literal(x), v.val_int_array.values)) - ) - case "val_float_array": - v_expression = literals_array( - None, list(map(lambda x: literal(x), v.val_float_array.values)) - ) - case default: - raise NotImplementedError( - f"translation of AttributeValue type {default} is not implemented" - ) - - if op == ComparisonFilter.OP_EQUALS: - _check_non_string_values_cannot_ignore_case(item_filter.comparison_filter) - return ( - f.equals(f.lower(k_expression), f.lower(v_expression)) - if item_filter.comparison_filter.ignore_case - else f.equals(k_expression, v_expression) - ) - if op == ComparisonFilter.OP_NOT_EQUALS: - _check_non_string_values_cannot_ignore_case(item_filter.comparison_filter) - return ( - f.notEquals(f.lower(k_expression), f.lower(v_expression)) - if item_filter.comparison_filter.ignore_case - else f.notEquals(k_expression, v_expression) - ) - if op == ComparisonFilter.OP_LIKE: - if k.type != AttributeKey.Type.TYPE_STRING: - raise BadSnubaRPCRequestException( - "the LIKE comparison is only supported on string keys" - ) - return f.like(k_expression, v_expression) - if op == ComparisonFilter.OP_NOT_LIKE: - if k.type != AttributeKey.Type.TYPE_STRING: - raise BadSnubaRPCRequestException( - "the NOT LIKE comparison is only supported on string keys" - ) - return f.notLike(k_expression, v_expression) - if op == ComparisonFilter.OP_LESS_THAN: - return f.less(k_expression, v_expression) - if op == ComparisonFilter.OP_LESS_THAN_OR_EQUALS: - return f.lessOrEquals(k_expression, v_expression) - if op == ComparisonFilter.OP_GREATER_THAN: - return f.greater(k_expression, v_expression) - if op == ComparisonFilter.OP_GREATER_THAN_OR_EQUALS: - return f.greaterOrEquals(k_expression, v_expression) - if op == ComparisonFilter.OP_IN: - _check_non_string_values_cannot_ignore_case(item_filter.comparison_filter) - if item_filter.comparison_filter.ignore_case: - k_expression = f.lower(k_expression) - v_expression = literals_array( - None, - list(map(lambda x: literal(x.lower()), v.val_str_array.values)), - ) - return in_cond(k_expression, v_expression) - if op == ComparisonFilter.OP_NOT_IN: - _check_non_string_values_cannot_ignore_case(item_filter.comparison_filter) - if item_filter.comparison_filter.ignore_case: - k_expression = f.lower(k_expression) - v_expression = literals_array( - None, - list(map(lambda x: literal(x.lower()), v.val_str_array.values)), - ) - return not_cond(in_cond(k_expression, v_expression)) - - raise BadSnubaRPCRequestException( - f"Invalid string comparison, unknown op: {item_filter.comparison_filter}" - ) - - if item_filter.HasField("exists_filter"): - k = item_filter.exists_filter.key - if k.type == AttributeKey.Type.TYPE_STRING: - return f.mapContains(column("attr_string"), literal(k.name)) - elif k.type == AttributeKey.Type.TYPE_INT: - return f.mapContains(column("attr_int"), literal(k.name)) - elif k.type == AttributeKey.Type.TYPE_BOOLEAN: - return f.mapContains(column("attr_bool"), literal(k.name)) - elif k.type == AttributeKey.Type.TYPE_FLOAT: - return f.mapContains(column("attr_double"), literal(k.name)) - - return literal(True) diff --git 
a/snuba/web/rpc/v1/resolvers/R_ourlogs/resolver_trace_item_table.py b/snuba/web/rpc/v1/resolvers/R_ourlogs/resolver_trace_item_table.py index 1f8d32049d0..6a2e4ff0d83 100644 --- a/snuba/web/rpc/v1/resolvers/R_ourlogs/resolver_trace_item_table.py +++ b/snuba/web/rpc/v1/resolvers/R_ourlogs/resolver_trace_item_table.py @@ -1,6 +1,5 @@ import uuid -from collections import defaultdict -from typing import Any, Callable, Dict, Iterable, Sequence +from typing import Sequence from google.protobuf.json_format import MessageToDict from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import ( @@ -9,7 +8,6 @@ TraceItemTableResponse, ) from sentry_protos.snuba.v1.request_common_pb2 import PageToken, TraceItemType -from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue from snuba.attribution.appid import AppID from snuba.attribution.attribution_info import AttributionInfo @@ -22,19 +20,22 @@ from snuba.query.query_settings import HTTPQuerySettings from snuba.request import Request as SnubaRequest from snuba.web.query import run_query -from snuba.web.rpc.common.common import base_conditions_and, treeify_or_and_conditions +from snuba.web.rpc.common.common import ( + add_existence_check_to_subscriptable_references, + base_conditions_and, + trace_item_filters_to_expression, + treeify_or_and_conditions, +) from snuba.web.rpc.common.debug_info import ( extract_response_meta, setup_trace_query_settings, ) from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException from snuba.web.rpc.v1.resolvers import ResolverTraceItemTable +from snuba.web.rpc.v1.resolvers.common.trace_item_table import convert_results from snuba.web.rpc.v1.resolvers.R_ourlogs.common.attribute_key_to_expression import ( attribute_key_to_expression, ) -from snuba.web.rpc.v1.resolvers.R_ourlogs.common.trace_item_filters_to_expression import ( - trace_item_filters_to_expression, -) _DEFAULT_ROW_LIMIT = 10_000 @@ -86,7 +87,9 @@ def _build_query(request: TraceItemTableRequest) -> Query: selected_columns=selected_columns, condition=base_conditions_and( request.meta, - trace_item_filters_to_expression(request.filter), + trace_item_filters_to_expression( + request.filter, attribute_key_to_expression + ), ), order_by=_convert_order_by(request.order_by), groupby=[ @@ -97,6 +100,7 @@ def _build_query(request: TraceItemTableRequest) -> Query: limit=request.limit if request.limit > 0 else _DEFAULT_ROW_LIMIT, ) treeify_or_and_conditions(res) + add_existence_check_to_subscriptable_references(res) return res @@ -124,44 +128,6 @@ def _build_snuba_request(request: TraceItemTableRequest) -> SnubaRequest: ) -def _convert_results( - request: TraceItemTableRequest, data: Iterable[Dict[str, Any]] -) -> list[TraceItemColumnValues]: - converters: Dict[str, Callable[[Any], AttributeValue]] = {} - - for column in request.columns: - if column.HasField("key"): - if column.key.type == AttributeKey.TYPE_BOOLEAN: - converters[column.label] = lambda x: AttributeValue(val_bool=bool(x)) - elif column.key.type == AttributeKey.TYPE_STRING: - converters[column.label] = lambda x: AttributeValue(val_str=str(x)) - elif column.key.type == AttributeKey.TYPE_INT: - converters[column.label] = lambda x: AttributeValue(val_int=int(x)) - elif ( - column.key.type == AttributeKey.TYPE_FLOAT - or column.key.type == AttributeKey.Type.TYPE_DOUBLE - ): - converters[column.label] = lambda x: AttributeValue(val_float=float(x)) - else: - raise BadSnubaRPCRequestException("column is not an attribute") - - res: defaultdict[str, TraceItemColumnValues] 
= defaultdict(TraceItemColumnValues) - for row in data: - for column_name, value in row.items(): - if column_name in converters.keys(): - res[column_name].results.append(converters[column_name](value)) - res[column_name].attribute_name = column_name - - column_ordering = {column.label: i for i, column in enumerate(request.columns)} - - return list( - # we return the columns in the order they were requested - sorted( - res.values(), key=lambda c: column_ordering.__getitem__(c.attribute_name) - ) - ) - - def _get_page_token( request: TraceItemTableRequest, response: list[TraceItemColumnValues] ) -> PageToken: @@ -183,7 +149,7 @@ def resolve(self, in_msg: TraceItemTableRequest) -> TraceItemTableResponse: request=snuba_request, timer=self._timer, ) - column_values = _convert_results(in_msg, res.result.get("data", [])) + column_values = convert_results(in_msg, res.result.get("data", [])) response_meta = extract_response_meta( in_msg.meta.request_id, in_msg.meta.debug, diff --git a/snuba/web/rpc/v1/resolvers/R_uptime_checks/common/aggregation.py b/snuba/web/rpc/v1/resolvers/R_uptime_checks/common/aggregation.py deleted file mode 100644 index 72606a5e097..00000000000 --- a/snuba/web/rpc/v1/resolvers/R_uptime_checks/common/aggregation.py +++ /dev/null @@ -1,107 +0,0 @@ -from __future__ import annotations - -from sentry_protos.snuba.v1.trace_item_attribute_pb2 import ( - AttributeAggregation, - Function, -) - -from snuba.query.dsl import CurriedFunctions as cf -from snuba.query.dsl import Functions as f -from snuba.query.expressions import ( - CurriedFunctionCall, - Expression, - FunctionCall, - SubscriptableReference, -) -from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException -from snuba.web.rpc.v1.resolvers.R_uptime_checks.common.common import ( - attribute_key_to_expression, -) - -_FLOATING_POINT_PRECISION = 9 - - -def get_field_existence_expression(aggregation: AttributeAggregation) -> Expression: - def get_subscriptable_field(field: Expression) -> SubscriptableReference | None: - """ - Check if the field is a subscriptable reference or a function call with a subscriptable reference as the first parameter to handle the case - where the field is casting a subscriptable reference (e.g. for integers). If so, return the subscriptable reference. 
- """ - if isinstance(field, SubscriptableReference): - return field - if isinstance(field, FunctionCall) and len(field.parameters) > 0: - if len(field.parameters) > 0 and isinstance( - field.parameters[0], SubscriptableReference - ): - return field.parameters[0] - - return None - - field = attribute_key_to_expression(aggregation.key) - subscriptable_field = get_subscriptable_field(field) - if subscriptable_field is not None: - return f.mapContains(subscriptable_field.column, subscriptable_field.key) - - return f.isNotNull(field) - - -def get_count_column(aggregation: AttributeAggregation) -> Expression: - return f.sum( - get_field_existence_expression(aggregation), - alias="count", - ) - - -def aggregation_to_expression(aggregation: AttributeAggregation) -> Expression: - field = attribute_key_to_expression(aggregation.key) - alias = aggregation.label if aggregation.label else None - alias_dict = {"alias": alias} if alias else {} - function_map: dict[Function.ValueType, CurriedFunctionCall | FunctionCall] = { - Function.FUNCTION_SUM: f.round( - f.sum(field), - _FLOATING_POINT_PRECISION, - **alias_dict, - ), - Function.FUNCTION_AVERAGE: f.round( - f.divide( - f.sum(field), - f.sum(get_field_existence_expression(aggregation)), - ), - _FLOATING_POINT_PRECISION, - **alias_dict, - ), - Function.FUNCTION_COUNT: f.sum( - get_field_existence_expression(aggregation), - **alias_dict, - ), - Function.FUNCTION_P50: f.round( - cf.quantile(0.5)(field), _FLOATING_POINT_PRECISION, **alias_dict - ), - Function.FUNCTION_P75: f.round( - cf.quantile(0.75)(field), _FLOATING_POINT_PRECISION, **alias_dict - ), - Function.FUNCTION_P90: f.round( - cf.quantile(0.9)(field), _FLOATING_POINT_PRECISION, **alias_dict - ), - Function.FUNCTION_P95: f.round( - cf.quantile(0.95)(field), _FLOATING_POINT_PRECISION, **alias_dict - ), - Function.FUNCTION_P99: f.round( - cf.quantile(0.99)(field), _FLOATING_POINT_PRECISION, **alias_dict - ), - Function.FUNCTION_AVG: f.round( - f.avg(field), _FLOATING_POINT_PRECISION, **alias_dict - ), - Function.FUNCTION_MAX: f.max(field, **alias_dict), - Function.FUNCTION_MIN: f.min(field, **alias_dict), - Function.FUNCTION_UNIQ: f.uniq(field, **alias_dict), - } - - agg_func_expr = function_map.get(aggregation.aggregate) - - if agg_func_expr is None: - raise BadSnubaRPCRequestException( - f"Aggregation not specified for {aggregation.key.name}" - ) - - return agg_func_expr diff --git a/snuba/web/rpc/v1/resolvers/R_uptime_checks/common/common.py b/snuba/web/rpc/v1/resolvers/R_uptime_checks/common/common.py index 34f7c29b2ee..90677c1cfaf 100644 --- a/snuba/web/rpc/v1/resolvers/R_uptime_checks/common/common.py +++ b/snuba/web/rpc/v1/resolvers/R_uptime_checks/common/common.py @@ -14,17 +14,8 @@ from snuba.query import Query from snuba.query.conditions import combine_and_conditions, combine_or_conditions from snuba.query.dsl import Functions as f -from snuba.query.dsl import ( - and_cond, - column, - in_cond, - literal, - literals_array, - not_cond, - or_cond, -) +from snuba.query.dsl import and_cond, column, in_cond, literal, literals_array from snuba.query.expressions import Expression, FunctionCall, SubscriptableReference -from snuba.web.rpc.common.common import _check_non_string_values_cannot_ignore_case from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException @@ -204,143 +195,6 @@ def transform_expressions(expression: Expression) -> Expression: query.transform_expressions(transform_expressions) -def trace_item_filters_to_expression(item_filter: TraceItemFilter) -> Expression: - """ 
- Trace Item Filters are things like (span.id=12345 AND start_timestamp >= "june 4th, 2024") - This maps those filters into an expression which can be used in a WHERE clause - :param item_filter: - :return: - """ - if item_filter.HasField("and_filter"): - filters = item_filter.and_filter.filters - if len(filters) == 0: - return literal(True) - if len(filters) == 1: - return trace_item_filters_to_expression(filters[0]) - return and_cond(*(trace_item_filters_to_expression(x) for x in filters)) - - if item_filter.HasField("or_filter"): - filters = item_filter.or_filter.filters - if len(filters) == 0: - raise BadSnubaRPCRequestException( - "Invalid trace item filter, empty 'or' clause" - ) - if len(filters) == 1: - return trace_item_filters_to_expression(filters[0]) - return or_cond(*(trace_item_filters_to_expression(x) for x in filters)) - - if item_filter.HasField("comparison_filter"): - k = item_filter.comparison_filter.key - k_expression = attribute_key_to_expression(k) - op = item_filter.comparison_filter.op - v = item_filter.comparison_filter.value - - value_type = v.WhichOneof("value") - if value_type is None: - raise BadSnubaRPCRequestException( - "comparison does not have a right hand side" - ) - - match value_type: - case "val_bool": - v_expression: Expression = literal(v.val_bool) - case "val_str": - v_expression = literal(v.val_str) - case "val_float": - v_expression = literal(v.val_float) - case "val_double": - v_expression = literal(v.val_double) - case "val_int": - v_expression = literal(v.val_int) - case "val_null": - v_expression = literal(None) - case "val_str_array": - v_expression = literals_array( - None, list(map(lambda x: literal(x), v.val_str_array.values)) - ) - case "val_int_array": - v_expression = literals_array( - None, list(map(lambda x: literal(x), v.val_int_array.values)) - ) - case "val_float_array": - v_expression = literals_array( - None, list(map(lambda x: literal(x), v.val_float_array.values)) - ) - case "val_double_array": - v_expression = literals_array( - None, list(map(lambda x: literal(x), v.val_double_array.values)) - ) - case default: - raise NotImplementedError( - f"translation of AttributeValue type {default} is not implemented" - ) - - if op == ComparisonFilter.OP_EQUALS: - _check_non_string_values_cannot_ignore_case(item_filter.comparison_filter) - return ( - f.equals(f.lower(k_expression), f.lower(v_expression)) - if item_filter.comparison_filter.ignore_case - else f.equals(k_expression, v_expression) - ) - if op == ComparisonFilter.OP_NOT_EQUALS: - _check_non_string_values_cannot_ignore_case(item_filter.comparison_filter) - return ( - f.notEquals(f.lower(k_expression), f.lower(v_expression)) - if item_filter.comparison_filter.ignore_case - else f.notEquals(k_expression, v_expression) - ) - if op == ComparisonFilter.OP_LIKE: - if k.type != AttributeKey.Type.TYPE_STRING: - raise BadSnubaRPCRequestException( - "the LIKE comparison is only supported on string keys" - ) - return f.like(k_expression, v_expression) - if op == ComparisonFilter.OP_NOT_LIKE: - if k.type != AttributeKey.Type.TYPE_STRING: - raise BadSnubaRPCRequestException( - "the NOT LIKE comparison is only supported on string keys" - ) - return f.notLike(k_expression, v_expression) - if op == ComparisonFilter.OP_LESS_THAN: - return f.less(k_expression, v_expression) - if op == ComparisonFilter.OP_LESS_THAN_OR_EQUALS: - return f.lessOrEquals(k_expression, v_expression) - if op == ComparisonFilter.OP_GREATER_THAN: - return f.greater(k_expression, v_expression) - if op == 
ComparisonFilter.OP_GREATER_THAN_OR_EQUALS: - return f.greaterOrEquals(k_expression, v_expression) - if op == ComparisonFilter.OP_IN: - _check_non_string_values_cannot_ignore_case(item_filter.comparison_filter) - if item_filter.comparison_filter.ignore_case: - k_expression = f.lower(k_expression) - v_expression = literals_array( - None, - list(map(lambda x: literal(x.lower()), v.val_str_array.values)), - ) - return in_cond(k_expression, v_expression) - if op == ComparisonFilter.OP_NOT_IN: - _check_non_string_values_cannot_ignore_case(item_filter.comparison_filter) - if item_filter.comparison_filter.ignore_case: - k_expression = f.lower(k_expression) - v_expression = literals_array( - None, - list(map(lambda x: literal(x.lower()), v.val_str_array.values)), - ) - return not_cond(in_cond(k_expression, v_expression)) - - raise BadSnubaRPCRequestException( - f"Invalid string comparison, unknown op: {item_filter.comparison_filter}" - ) - - print(item_filter) - - if item_filter.HasField("exists_filter"): - k = item_filter.exists_filter.key - return f.isNotNull(column(k.name)) - - return literal(True) - - def project_id_and_org_conditions(meta: RequestMeta) -> Expression: return and_cond( in_cond( diff --git a/snuba/web/rpc/v1/resolvers/R_uptime_checks/resolver_time_series.py b/snuba/web/rpc/v1/resolvers/R_uptime_checks/resolver_time_series.py index f71dd8aec5e..051b04355dc 100644 --- a/snuba/web/rpc/v1/resolvers/R_uptime_checks/resolver_time_series.py +++ b/snuba/web/rpc/v1/resolvers/R_uptime_checks/resolver_time_series.py @@ -26,19 +26,16 @@ from snuba.query.query_settings import HTTPQuerySettings from snuba.request import Request as SnubaRequest from snuba.web.query import run_query +from snuba.web.rpc.common.common import trace_item_filters_to_expression from snuba.web.rpc.common.debug_info import ( extract_response_meta, setup_trace_query_settings, ) from snuba.web.rpc.v1.resolvers import ResolverTimeSeries -from snuba.web.rpc.v1.resolvers.R_uptime_checks.common.aggregation import ( - aggregation_to_expression, - get_count_column, -) +from snuba.web.rpc.v1.resolvers.common.aggregation import aggregation_to_expression from snuba.web.rpc.v1.resolvers.R_uptime_checks.common.common import ( attribute_key_to_expression, base_conditions_and, - trace_item_filters_to_expression, treeify_or_and_conditions, ) @@ -163,18 +160,15 @@ def _build_query(request: TimeSeriesRequest) -> Query: aggregation_columns = [ SelectedExpression( - name=aggregation.label, expression=aggregation_to_expression(aggregation) + name=aggregation.label, + expression=aggregation_to_expression( + aggregation, + attribute_key_to_expression(aggregation.key), + ), ) for aggregation in request.aggregations ] - additional_context_columns = [] - for aggregation in request.aggregations: - count_column = get_count_column(aggregation) - additional_context_columns.append( - SelectedExpression(name=count_column.alias, expression=count_column) - ) - groupby_columns = [ SelectedExpression( name=attr_key.name, expression=attribute_key_to_expression(attr_key) @@ -214,11 +208,13 @@ def _build_query(request: TimeSeriesRequest) -> Query: ), *aggregation_columns, *groupby_columns, - *additional_context_columns, ], granularity=request.granularity_secs, condition=base_conditions_and( - request.meta, trace_item_filters_to_expression(request.filter) + request.meta, + trace_item_filters_to_expression( + request.filter, attribute_key_to_expression + ), ), groupby=[ column("time_slot"), diff --git 
a/snuba/web/rpc/v1/resolvers/R_uptime_checks/resolver_trace_item_table.py b/snuba/web/rpc/v1/resolvers/R_uptime_checks/resolver_trace_item_table.py index d8aa2f42d99..a2e57472381 100644 --- a/snuba/web/rpc/v1/resolvers/R_uptime_checks/resolver_trace_item_table.py +++ b/snuba/web/rpc/v1/resolvers/R_uptime_checks/resolver_trace_item_table.py @@ -1,7 +1,6 @@ import uuid -from collections import defaultdict from dataclasses import replace -from typing import Any, Callable, Dict, Iterable, Sequence +from typing import Sequence from google.protobuf.json_format import MessageToDict from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import ( @@ -12,7 +11,6 @@ TraceItemTableResponse, ) from sentry_protos.snuba.v1.request_common_pb2 import PageToken, TraceItemType -from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue from snuba.attribution.appid import AppID from snuba.attribution.attribution_info import AttributionInfo @@ -28,20 +26,22 @@ from snuba.query.query_settings import HTTPQuerySettings from snuba.request import Request as SnubaRequest from snuba.web.query import run_query +from snuba.web.rpc.common.common import ( + add_existence_check_to_subscriptable_references, + trace_item_filters_to_expression, +) from snuba.web.rpc.common.debug_info import ( extract_response_meta, setup_trace_query_settings, ) from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException from snuba.web.rpc.v1.resolvers import ResolverTraceItemTable -from snuba.web.rpc.v1.resolvers.R_uptime_checks.common.aggregation import ( - aggregation_to_expression, -) +from snuba.web.rpc.v1.resolvers.common.aggregation import aggregation_to_expression +from snuba.web.rpc.v1.resolvers.common.trace_item_table import convert_results from snuba.web.rpc.v1.resolvers.R_uptime_checks.common.common import ( apply_virtual_columns, attribute_key_to_expression, base_conditions_and, - trace_item_filters_to_expression, treeify_or_and_conditions, ) @@ -66,7 +66,12 @@ def aggregation_filter_to_expression(agg_filter: AggregationFilter) -> Expressio f"Unsupported aggregation filter op: {AggregationComparisonFilter.Op.Name(agg_filter.comparison_filter.op)}" ) return op_expr( - aggregation_to_expression(agg_filter.comparison_filter.aggregation), + aggregation_to_expression( + agg_filter.comparison_filter.aggregation, + attribute_key_to_expression( + agg_filter.comparison_filter.aggregation.key + ), + ), agg_filter.comparison_filter.val, ) case "and_filter": @@ -114,7 +119,10 @@ def _convert_order_by( res.append( OrderBy( direction=direction, - expression=aggregation_to_expression(x.column.aggregation), + expression=aggregation_to_expression( + x.column.aggregation, + attribute_key_to_expression(x.column.aggregation.key), + ), ) ) return res @@ -138,7 +146,10 @@ def _build_query(request: TraceItemTableRequest) -> Query: SelectedExpression(name=column.label, expression=key_col) ) elif column.HasField("aggregation"): - function_expr = aggregation_to_expression(column.aggregation) + function_expr = aggregation_to_expression( + column.aggregation, + attribute_key_to_expression(column.aggregation.key), + ) # aggregation label may not be set and the column label takes priority anyways. 
@@ -138,7 +146,10 @@ def _build_query(request: TraceItemTableRequest) -> Query:
                 SelectedExpression(name=column.label, expression=key_col)
             )
         elif column.HasField("aggregation"):
-            function_expr = aggregation_to_expression(column.aggregation)
+            function_expr = aggregation_to_expression(
+                column.aggregation,
+                attribute_key_to_expression(column.aggregation.key),
+            )
             # aggregation label may not be set and the column label takes priority anyways.
             function_expr = replace(function_expr, alias=column.label)
             selected_columns.append(
@@ -157,7 +168,9 @@ def _build_query(request: TraceItemTableRequest) -> Query:
         selected_columns=selected_columns,
         condition=base_conditions_and(
             request.meta,
-            trace_item_filters_to_expression(request.filter),
+            trace_item_filters_to_expression(
+                request.filter, attribute_key_to_expression
+            ),
         ),
         order_by=_convert_order_by(request.order_by),
         groupby=[
@@ -175,6 +188,7 @@ def _build_query(request: TraceItemTableRequest) -> Query:
     )
     treeify_or_and_conditions(res)
     apply_virtual_columns(res, request.virtual_column_contexts)
+    add_existence_check_to_subscriptable_references(res)
     return res

@@ -202,47 +216,6 @@ def _build_snuba_request(request: TraceItemTableRequest) -> SnubaRequest:
     )


-def _convert_results(
-    request: TraceItemTableRequest, data: Iterable[Dict[str, Any]]
-) -> list[TraceItemColumnValues]:
-    converters: Dict[str, Callable[[Any], AttributeValue]] = {}
-
-    for column in request.columns:
-        if column.HasField("key"):
-            if column.key.type == AttributeKey.TYPE_BOOLEAN:
-                converters[column.label] = lambda x: AttributeValue(val_bool=bool(x))
-            elif column.key.type == AttributeKey.TYPE_STRING:
-                converters[column.label] = lambda x: AttributeValue(val_str=str(x))
-            elif column.key.type == AttributeKey.TYPE_INT:
-                converters[column.label] = lambda x: AttributeValue(val_int=int(x))
-            elif column.key.type == AttributeKey.TYPE_FLOAT:
-                converters[column.label] = lambda x: AttributeValue(val_float=float(x))
-            elif column.key.type == AttributeKey.TYPE_DOUBLE:
-                converters[column.label] = lambda x: AttributeValue(val_double=float(x))
-        elif column.HasField("aggregation"):
-            converters[column.label] = lambda x: AttributeValue(val_double=float(x))
-        else:
-            raise BadSnubaRPCRequestException(
-                "column is neither an attribute or aggregation"
-            )
-
-    res: defaultdict[str, TraceItemColumnValues] = defaultdict(TraceItemColumnValues)
-    for row in data:
-        for column_name, value in row.items():
-            if column_name in converters.keys():
-                res[column_name].results.append(converters[column_name](value))
-                res[column_name].attribute_name = column_name
-
-    column_ordering = {column.label: i for i, column in enumerate(request.columns)}
-
-    return list(
-        # we return the columns in the order they were requested
-        sorted(
-            res.values(), key=lambda c: column_ordering.__getitem__(c.attribute_name)
-        )
-    )
-
-
 def _get_page_token(
     request: TraceItemTableRequest, response: list[TraceItemColumnValues]
 ) -> PageToken:
@@ -264,7 +237,7 @@ def resolve(self, in_msg: TraceItemTableRequest) -> TraceItemTableResponse:
             request=snuba_request,
             timer=self._timer,
         )
-        column_values = _convert_results(in_msg, res.result.get("data", []))
+        column_values = convert_results(in_msg, res.result.get("data", []))
         response_meta = extract_response_meta(
             in_msg.meta.request_id,
             in_msg.meta.debug,
diff --git a/snuba/web/rpc/v1/resolvers/common/__init__.py b/snuba/web/rpc/v1/resolvers/common/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
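With `add_existence_check_to_subscriptable_references(res)` applied to the final query, map lookups only contribute when the row actually carries the attribute. The building blocks are ClickHouse's mapContains plus the -If aggregate combinators, both of which appear throughout this PR; a sketch in the snuba DSL (column and key names are illustrative, not the resolver's real output):

    from snuba.query.dsl import Functions as f
    from snuba.query.dsl import column, literal

    # Rows missing the key are excluded instead of contributing zeros.
    field = column("duration_ms")  # stand-in for the resolved attribute expression
    exists = f.mapContains(column("attr_num"), literal("my_attribute"))
    count_expr = f.countIf(field, exists)
    weight_sum = f.sumIf(column("sampling_weight"), exists)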
diff --git a/snuba/web/rpc/v1/resolvers/R_eap_spans/common/aggregation.py b/snuba/web/rpc/v1/resolvers/common/aggregation.py
similarity index 80%
rename from snuba/web/rpc/v1/resolvers/R_eap_spans/common/aggregation.py
rename to snuba/web/rpc/v1/resolvers/common/aggregation.py
index 351cd8e020f..ec67bda617c 100644
--- a/snuba/web/rpc/v1/resolvers/R_eap_spans/common/aggregation.py
+++ b/snuba/web/rpc/v1/resolvers/common/aggregation.py
@@ -17,17 +17,14 @@
 from snuba.query.dsl import CurriedFunctions as cf
 from snuba.query.dsl import Functions as f
 from snuba.query.dsl import column
-from snuba.query.expressions import (
-    CurriedFunctionCall,
-    Expression,
-    FunctionCall,
-    SubscriptableReference,
-)
-from snuba.web.rpc.common.common import attribute_key_to_expression
+from snuba.query.expressions import CurriedFunctionCall, Expression, FunctionCall
+from snuba.web.rpc.common.common import get_field_existence_expression
 from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException
+from snuba.web.rpc.v1.resolvers.R_eap_spans.common.common import (
+    attribute_key_to_expression,
+)

 sampling_weight_column = column("sampling_weight")
-sign_column = column("sign")

 # Z value for 95% confidence interval is 1.96 which comes from the normal distribution z score.
 z_value = 1.96

@@ -277,41 +274,18 @@ def get_attribute_confidence_interval_alias(
     return None


-def get_field_existence_expression(aggregation: AttributeAggregation) -> Expression:
-    def get_subscriptable_field(field: Expression) -> SubscriptableReference | None:
-        """
-        Check if the field is a subscriptable reference or a function call with a subscriptable reference as the first parameter to handle the case
-        where the field is casting a subscriptable reference (e.g. for integers). If so, return the subscriptable reference.
-        """
-        if isinstance(field, SubscriptableReference):
-            return field
-        if isinstance(field, FunctionCall) and len(field.parameters) > 0:
-            if len(field.parameters) > 0 and isinstance(
-                field.parameters[0], SubscriptableReference
-            ):
-                return field.parameters[0]
-
-        return None
-
-    field = attribute_key_to_expression(aggregation.key)
-    subscriptable_field = get_subscriptable_field(field)
-    if subscriptable_field is not None:
-        return f.mapContains(subscriptable_field.column, subscriptable_field.key)
-
-    return f.isNotNull(field)
-
-
 def get_average_sample_rate_column(aggregation: AttributeAggregation) -> Expression:
     alias = CustomColumnInformation(
         custom_column_id="average_sample_rate",
         referenced_column=aggregation.label,
         metadata={},
     ).to_alias()
+    field = attribute_key_to_expression(aggregation.key)
     return f.divide(
-        f.sumIf(sign_column, get_field_existence_expression(aggregation)),
+        f.countIf(field, get_field_existence_expression(field)),
         f.sumIf(
-            f.multiply(sign_column, sampling_weight_column),
-            get_field_existence_expression(aggregation),
+            sampling_weight_column,
+            get_field_existence_expression(field),
         ),
         alias=alias,
     )
@@ -326,9 +300,10 @@ def _get_count_column_alias(aggregation: AttributeAggregation) -> str:


 def get_count_column(aggregation: AttributeAggregation) -> Expression:
-    return f.sumIf(
-        sign_column,
-        get_field_existence_expression(aggregation),
+    field = attribute_key_to_expression(aggregation.key)
+    return f.countIf(
+        field,
+        get_field_existence_expression(field),
         alias=_get_count_column_alias(aggregation),
     )
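The rewritten average sample rate drops the sign column: the numerator counts stored rows that carry the attribute, and the denominator sums their sampling weights. A back-of-envelope check of that ratio, assuming sampling_weight is the inverse of each row's sample rate:

    # Three stored rows, each standing in for 10 original events.
    weights = [10.0, 10.0, 10.0]
    observed = len(weights)   # what countIf(field, exists) counts
    estimated = sum(weights)  # what sumIf(sampling_weight, exists) sums
    assert observed / estimated == 0.1  # average sample rate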
@@ -382,57 +357,87 @@ def _get_possible_percentiles_expression(


 def get_extrapolated_function(
     aggregation: AttributeAggregation,
+    field: Expression,
 ) -> CurriedFunctionCall | FunctionCall | None:
     sampling_weight_column = column("sampling_weight")
-    field = attribute_key_to_expression(aggregation.key)
     alias = aggregation.label if aggregation.label else None
     alias_dict = {"alias": alias} if alias else {}
     function_map_sample_weighted: dict[
         Function.ValueType, CurriedFunctionCall | FunctionCall
     ] = {
-        Function.FUNCTION_SUM: f.sum(
-            f.multiply(field, f.multiply(sign_column, sampling_weight_column)),
+        Function.FUNCTION_SUM: f.sumIfOrNull(
+            f.multiply(field, sampling_weight_column),
+            get_field_existence_expression(field),
             **alias_dict,
         ),
         Function.FUNCTION_AVERAGE: f.divide(
-            f.sum(f.multiply(field, f.multiply(sign_column, sampling_weight_column))),
-            f.sumIf(
-                f.multiply(sign_column, sampling_weight_column),
-                get_field_existence_expression(aggregation),
+            f.sumIfOrNull(
+                f.multiply(field, sampling_weight_column),
+                get_field_existence_expression(field),
+            ),
+            f.sumIfOrNull(
+                sampling_weight_column,
+                get_field_existence_expression(field),
             ),
             **alias_dict,
         ),
         Function.FUNCTION_AVG: f.divide(
-            f.sum(f.multiply(field, f.multiply(sign_column, sampling_weight_column))),
-            f.sumIf(
-                f.multiply(sign_column, sampling_weight_column),
-                get_field_existence_expression(aggregation),
+            f.sumIfOrNull(
+                f.multiply(field, sampling_weight_column),
+                get_field_existence_expression(field),
+            ),
+            f.sumIfOrNull(
+                sampling_weight_column,
+                get_field_existence_expression(field),
             ),
             **alias_dict,
         ),
-        Function.FUNCTION_COUNT: f.sumIf(
-            f.multiply(sign_column, sampling_weight_column),
-            get_field_existence_expression(aggregation),
+        Function.FUNCTION_COUNT: f.sumIfOrNull(
+            sampling_weight_column,
+            get_field_existence_expression(field),
+            **alias_dict,
+        ),
+        Function.FUNCTION_P50: cf.quantileTDigestWeightedIfOrNull(0.5)(
+            field,
+            sampling_weight_column,
+            get_field_existence_expression(field),
+            **alias_dict,
+        ),
+        Function.FUNCTION_P75: cf.quantileTDigestWeightedIfOrNull(0.75)(
+            field,
+            sampling_weight_column,
+            get_field_existence_expression(field),
             **alias_dict,
         ),
-        Function.FUNCTION_P50: cf.quantileTDigestWeighted(0.5)(
-            field, sampling_weight_column, **alias_dict
+        Function.FUNCTION_P90: cf.quantileTDigestWeightedIfOrNull(0.9)(
+            field,
+            sampling_weight_column,
+            get_field_existence_expression(field),
+            **alias_dict,
         ),
-        Function.FUNCTION_P75: cf.quantileTDigestWeighted(0.75)(
-            field, sampling_weight_column, **alias_dict
+        Function.FUNCTION_P95: cf.quantileTDigestWeightedIfOrNull(0.95)(
+            field,
+            sampling_weight_column,
+            get_field_existence_expression(field),
+            **alias_dict,
        ),
-        Function.FUNCTION_P90: cf.quantileTDigestWeighted(0.9)(
-            field, sampling_weight_column, **alias_dict
+        Function.FUNCTION_P99: cf.quantileTDigestWeightedIfOrNull(0.99)(
+            field,
+            sampling_weight_column,
+            get_field_existence_expression(field),
+            **alias_dict,
         ),
-        Function.FUNCTION_P95: cf.quantileTDigestWeighted(0.95)(
-            field, sampling_weight_column, **alias_dict
+        Function.FUNCTION_MAX: f.maxIfOrNull(
+            field, get_field_existence_expression(field), **alias_dict
         ),
-        Function.FUNCTION_P99: cf.quantileTDigestWeighted(0.99)(
-            field, sampling_weight_column, **alias_dict
+        Function.FUNCTION_MIN: f.minIfOrNull(
+            field, get_field_existence_expression(field), **alias_dict
+        ),
+        Function.FUNCTION_UNIQ: f.uniqIfOrNull(
+            field,
+            get_field_existence_expression(field),
+            **alias_dict,
         ),
-        Function.FUNCTION_MAX: f.max(field, **alias_dict),
-        Function.FUNCTION_MIN: f.min(field, **alias_dict),
-        Function.FUNCTION_UNIQ: f.uniq(field, **alias_dict),
     }
     return function_map_sample_weighted.get(aggregation.aggregate)
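For the extrapolated aggregates each stored row is scaled by its sampling weight, and the IfOrNull suffix makes an aggregate over zero matching rows NULL rather than 0. A plain-Python model of the extrapolated sum (illustrative only, not the query code):

    def extrapolated_sum(rows: list[tuple[float | None, float]]) -> float | None:
        # rows are (attribute value, sampling_weight); None means the
        # attribute is absent, mirroring sumIfOrNull's behaviour.
        present = [value * weight for value, weight in rows if value is not None]
        return sum(present) if present else None

    assert extrapolated_sum([(2.0, 10.0), (3.0, 10.0)]) == 50.0
    assert extrapolated_sum([(None, 10.0)]) is None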
@@ -471,7 +476,7 @@ def get_confidence_interval_column(
                         f.multiply(sampling_weight_column, sampling_weight_column),
                         sampling_weight_column,
                     ),
-                    get_field_existence_expression(aggregation),
+                    get_field_existence_expression(field),
                 ),
             )
         ),
@@ -499,17 +504,17 @@ def get_confidence_interval_column(
                             sampling_weight_column,
                             f.multiply(field, field),
                         ),
-                        get_field_existence_expression(aggregation),
+                        get_field_existence_expression(field),
                     ),
                     f.divide(
                         f.multiply(
                             f.sumIf(
                                 f.multiply(sampling_weight_column, field),
-                                get_field_existence_expression(aggregation),
+                                get_field_existence_expression(field),
                             ),
                             f.sumIf(
                                 f.multiply(sampling_weight_column, field),
-                                get_field_existence_expression(aggregation),
+                                get_field_existence_expression(field),
                             ),
                         ),
                         column(f"{alias}_N"),
                     ),
                 f.multiply(
                     f.sumIf(
                         sampling_weight_column,
-                        get_field_existence_expression(aggregation),
+                        get_field_existence_expression(field),
                         alias=f"{alias}_N",
                     ),
-                    f.sumIf(
-                        sign_column, get_field_existence_expression(aggregation)
-                    ),
+                    f.countIf(field, get_field_existence_expression(field)),
                 ),
             )
         ),
@@ -550,17 +553,17 @@ def get_confidence_interval_column(
                             sampling_weight_column,
                             f.multiply(field, field),
                         ),
-                        get_field_existence_expression(aggregation),
+                        get_field_existence_expression(field),
                     ),
                     f.divide(
                         f.multiply(
                             f.sumIf(
                                 f.multiply(sampling_weight_column, field),
-                                get_field_existence_expression(aggregation),
+                                get_field_existence_expression(field),
                             ),
                             f.sumIf(
                                 f.multiply(sampling_weight_column, field),
-                                get_field_existence_expression(aggregation),
+                                get_field_existence_expression(field),
                             ),
                         ),
                         column(f"{alias}_N"),
                     ),
                 f.multiply(
                     f.sumIf(
                         sampling_weight_column,
-                        get_field_existence_expression(aggregation),
+                        get_field_existence_expression(field),
                         alias=f"{alias}_N",
                     ),
-                    f.sumIf(
-                        sign_column, get_field_existence_expression(aggregation)
-                    ),
+                    f.countIf(field, get_field_existence_expression(field)),
                 ),
             )
         ),
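In the confidence-interval expressions the sample count `n` is now `countIf(field, exists)` rather than a sum over the removed sign column. The overall shape is the usual z-interval; a toy version of the half-width arithmetic (numbers invented, and the real query computes the variance term inline from the weighted sums above):

    import math

    z = 1.96  # matches z_value defined earlier in this module
    variance, n = 4.0, 100
    half_width = z * math.sqrt(variance / n)
    assert round(half_width, 3) == 0.392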
@@ -631,59 +632,71 @@ def calculate_reliability(
     return relative_confidence <= confidence_interval_threshold


-def aggregation_to_expression(aggregation: AttributeAggregation) -> Expression:
-    field = attribute_key_to_expression(aggregation.key)
+def aggregation_to_expression(
+    aggregation: AttributeAggregation, field: Expression | None = None
+) -> Expression:
+    field = field or attribute_key_to_expression(aggregation.key)
     alias = aggregation.label if aggregation.label else None
     alias_dict = {"alias": alias} if alias else {}
     function_map: dict[Function.ValueType, CurriedFunctionCall | FunctionCall] = {
-        Function.FUNCTION_SUM: f.round(
-            f.sum(f.multiply(field, sign_column)),
-            _FLOATING_POINT_PRECISION,
-            **alias_dict,
+        Function.FUNCTION_SUM: f.sumIfOrNull(
+            field,
+            get_field_existence_expression(field),
         ),
-        Function.FUNCTION_AVERAGE: f.round(
-            f.divide(
-                f.sum(f.multiply(field, sign_column)),
-                f.sumIf(sign_column, get_field_existence_expression(aggregation)),
-            ),
-            _FLOATING_POINT_PRECISION,
-            **alias_dict,
+        Function.FUNCTION_AVERAGE: f.avgIfOrNull(
+            field, get_field_existence_expression(field)
         ),
-        Function.FUNCTION_COUNT: f.sumIf(
-            sign_column,
-            get_field_existence_expression(aggregation),
-            **alias_dict,
+        Function.FUNCTION_COUNT: f.countIfOrNull(
+            field, get_field_existence_expression(field)
         ),
-        Function.FUNCTION_P50: f.round(
-            cf.quantile(0.5)(field), _FLOATING_POINT_PRECISION, **alias_dict
+        Function.FUNCTION_P50: cf.quantileIfOrNull(0.5)(
+            field,
+            get_field_existence_expression(field),
         ),
-        Function.FUNCTION_P75: f.round(
-            cf.quantile(0.75)(field), _FLOATING_POINT_PRECISION, **alias_dict
+        Function.FUNCTION_P75: cf.quantileIfOrNull(0.75)(
+            field,
+            get_field_existence_expression(field),
         ),
-        Function.FUNCTION_P90: f.round(
-            cf.quantile(0.9)(field), _FLOATING_POINT_PRECISION, **alias_dict
+        Function.FUNCTION_P90: cf.quantileIfOrNull(0.9)(
+            field,
+            get_field_existence_expression(field),
         ),
-        Function.FUNCTION_P95: f.round(
-            cf.quantile(0.95)(field), _FLOATING_POINT_PRECISION, **alias_dict
+        Function.FUNCTION_P95: cf.quantileIfOrNull(0.95)(
+            field,
+            get_field_existence_expression(field),
        ),
-        Function.FUNCTION_P99: f.round(
-            cf.quantile(0.99)(field), _FLOATING_POINT_PRECISION, **alias_dict
+        Function.FUNCTION_P99: cf.quantileIfOrNull(0.99)(
+            field,
+            get_field_existence_expression(field),
         ),
-        Function.FUNCTION_AVG: f.round(
-            f.avg(field), _FLOATING_POINT_PRECISION, **alias_dict
+        Function.FUNCTION_AVG: f.avgIfOrNull(
+            field, get_field_existence_expression(field)
+        ),
+        Function.FUNCTION_MAX: f.maxIfOrNull(
+            field,
+            get_field_existence_expression(field),
+        ),
+        Function.FUNCTION_MIN: f.minIfOrNull(
+            field,
+            get_field_existence_expression(field),
+        ),
+        Function.FUNCTION_UNIQ: f.uniqIfOrNull(
+            field,
+            get_field_existence_expression(field),
         ),
-        Function.FUNCTION_MAX: f.max(field, **alias_dict),
-        Function.FUNCTION_MIN: f.min(field, **alias_dict),
-        Function.FUNCTION_UNIQ: f.uniq(field, **alias_dict),
     }

     if (
         aggregation.extrapolation_mode
         == ExtrapolationMode.EXTRAPOLATION_MODE_SAMPLE_WEIGHTED
     ):
-        agg_func_expr = get_extrapolated_function(aggregation)
+        agg_func_expr = get_extrapolated_function(aggregation, field)
     else:
         agg_func_expr = function_map.get(aggregation.aggregate)
+        if agg_func_expr is not None:
+            agg_func_expr = f.round(
+                agg_func_expr, _FLOATING_POINT_PRECISION, **alias_dict
+            )

     if agg_func_expr is None:
         raise BadSnubaRPCRequestException(
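Moving `f.round` out of the individual map entries works because the IfOrNull aggregates return NULL on empty input and ClickHouse's round() propagates NULL, so the rounding wrapper cannot turn an empty aggregate back into a number. The same rule, modelled in Python:

    def round_or_none(value: float | None, precision: int) -> float | None:
        # Mirrors round() over a Nullable value: NULL in, NULL out.
        return None if value is None else round(value, precision)

    assert round_or_none(None, 9) is None
    assert round_or_none(2.0 / 3.0, 9) == 0.666666667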
diff --git a/snuba/web/rpc/v1/resolvers/common/trace_item_table.py b/snuba/web/rpc/v1/resolvers/common/trace_item_table.py
new file mode 100644
index 00000000000..67103cbcd78
--- /dev/null
+++ b/snuba/web/rpc/v1/resolvers/common/trace_item_table.py
@@ -0,0 +1,63 @@
+from collections import defaultdict
+from typing import Any, Callable, Dict, Iterable
+
+from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import (
+    TraceItemColumnValues,
+    TraceItemTableRequest,
+)
+from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue
+
+from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException
+from snuba.web.rpc.v1.resolvers.common.aggregation import ExtrapolationContext
+
+
+def convert_results(
+    request: TraceItemTableRequest, data: Iterable[Dict[str, Any]]
+) -> list[TraceItemColumnValues]:
+    converters: Dict[str, Callable[[Any], AttributeValue]] = {}
+
+    for column in request.columns:
+        if column.HasField("key"):
+            if column.key.type == AttributeKey.TYPE_BOOLEAN:
+                converters[column.label] = lambda x: AttributeValue(val_bool=bool(x))
+            elif column.key.type == AttributeKey.TYPE_STRING:
+                converters[column.label] = lambda x: AttributeValue(val_str=str(x))
+            elif column.key.type == AttributeKey.TYPE_INT:
+                converters[column.label] = lambda x: AttributeValue(val_int=int(x))
+            elif column.key.type == AttributeKey.TYPE_FLOAT:
+                converters[column.label] = lambda x: AttributeValue(val_float=float(x))
+            elif column.key.type == AttributeKey.TYPE_DOUBLE:
+                converters[column.label] = lambda x: AttributeValue(val_double=float(x))
+        elif column.HasField("aggregation"):
+            converters[column.label] = lambda x: AttributeValue(val_double=float(x))
+        elif column.HasField("formula"):
+            converters[column.label] = lambda x: AttributeValue(val_double=float(x))
+        else:
+            raise BadSnubaRPCRequestException(
+                "column is not one of: attribute, aggregation, or formula"
+            )
+
+    res: defaultdict[str, TraceItemColumnValues] = defaultdict(TraceItemColumnValues)
+    for row in data:
+        for column_name, value in row.items():
+            if column_name in converters.keys():
+                extrapolation_context = ExtrapolationContext.from_row(column_name, row)
+                res[column_name].attribute_name = column_name
+                if value is None:
+                    res[column_name].results.append(AttributeValue(is_null=True))
+                else:
+                    res[column_name].results.append(converters[column_name](value))
+                if extrapolation_context.is_extrapolated:
+                    res[column_name].reliabilities.append(
+                        extrapolation_context.reliability
+                    )
+
+    column_ordering = {column.label: i for i, column in enumerate(request.columns)}
+
+    return list(
+        # we return the columns in the order they were requested
+        sorted(
+            res.values(), key=lambda c: column_ordering.__getitem__(c.attribute_name)
+        )
+    )
diff --git a/tests/web/rpc/test_aggregation.py b/tests/web/rpc/test_aggregation.py
index fd4efa4f386..41ca8bc344c 100644
--- a/tests/web/rpc/test_aggregation.py
+++ b/tests/web/rpc/test_aggregation.py
@@ -9,7 +9,7 @@
     Reliability,
 )

-from snuba.web.rpc.v1.resolvers.R_eap_spans.common.aggregation import (
+from snuba.web.rpc.v1.resolvers.common.aggregation import (
     CUSTOM_COLUMN_PREFIX,
     CustomColumnInformation,
     ExtrapolationContext,
diff --git a/tests/web/rpc/test_common.py b/tests/web/rpc/test_common.py
index 5bcc083fb28..8b37f87a202 100644
--- a/tests/web/rpc/test_common.py
+++ b/tests/web/rpc/test_common.py
@@ -3,7 +3,9 @@
 from snuba.query.dsl import Functions as f
 from snuba.query.dsl import column, literal
 from snuba.query.expressions import SubscriptableReference
-from snuba.web.rpc.common.common import attribute_key_to_expression
+from snuba.web.rpc.v1.resolvers.R_eap_spans.common.common import (
+    attribute_key_to_expression,
+)


 class TestCommon:
@@ -93,7 +95,7 @@ def test_attributes(self) -> None:
                 column=column("attr_num"),
                 key=literal("derp"),
             ),
-            "Int64",
+            "Nullable(Int64)",
             alias="derp_TYPE_INT",
         )

@@ -105,6 +107,6 @@ def test_attributes(self) -> None:
                 column=column("attr_num"),
                 key=literal("derp"),
             ),
-            "Boolean",
+            "Nullable(Boolean)",
             alias="derp_TYPE_BOOLEAN",
         )
diff --git a/tests/web/rpc/v1/test_R_ourlogs/test_logs_expression_converters.py b/tests/web/rpc/v1/test_R_ourlogs/test_logs_expression_converters.py
index 2f5ad2f477b..03664dabb23 100644
--- a/tests/web/rpc/v1/test_R_ourlogs/test_logs_expression_converters.py
+++ b/tests/web/rpc/v1/test_R_ourlogs/test_logs_expression_converters.py
@@ -13,12 +13,10 @@
 from snuba.query.dsl import Functions as f
 from snuba.query.dsl import column, literal
 from snuba.query.expressions import FunctionCall
+from snuba.web.rpc.common.common import trace_item_filters_to_expression
 from snuba.web.rpc.v1.resolvers.R_ourlogs.common.attribute_key_to_expression import (
     attribute_key_to_expression,
 )
-from snuba.web.rpc.v1.resolvers.R_ourlogs.common.trace_item_filters_to_expression import (
-    trace_item_filters_to_expression,
-)


 class TestOurlogsExpressionConverters:
@@ -111,7 +109,8 @@ def test_trace_item_filters_to_expression(self) -> None:
                         ),
                     ]
                 )
-            )
+            ),
+            attribute_key_to_expression,
         ) == FunctionCall(
             None,
             "and",
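The shared convert_results added above builds one converter per requested column and maps raw ClickHouse values onto the matching AttributeValue field, emitting is_null for missing attributes. A trimmed illustration of the converter-table idea (column names invented):

    from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeValue

    converters = {
        "duration": lambda x: AttributeValue(val_double=float(x)),
        "name": lambda x: AttributeValue(val_str=str(x)),
    }
    row = {"duration": "12.5", "name": "GET /health"}
    converted = {
        col: AttributeValue(is_null=True) if val is None else converters[col](val)
        for col, val in row.items()
    }
    assert converted["duration"].val_double == 12.5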
diff --git a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py
index a6cfcd34bbc..4defd9a84cc 100644
--- a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py
+++ b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py
@@ -2345,14 +2345,14 @@ def test_sparse_aggregate(self, setup_teardown: Any) -> None:
                 results=[
                     AttributeValue(val_double=20),
                     AttributeValue(val_double=10),
-                    AttributeValue(val_double=0),
+                    AttributeValue(is_null=True),
                 ],
             ),
             TraceItemColumnValues(
                 attribute_name="sum(bark.db)",
                 results=[
-                    AttributeValue(val_double=0),
-                    AttributeValue(val_double=0),
+                    AttributeValue(is_null=True),
+                    AttributeValue(is_null=True),
                     AttributeValue(val_double=200),
                 ],
             ),
@@ -2555,6 +2555,45 @@ def test_not_filter(setup_teardown: Any) -> None:
         ),
     ]
+
+def test_nonexistent_attribute(setup_teardown: Any) -> None:
+    span_ts = BASE_TIME - timedelta(minutes=1)
+    write_eap_span(span_ts, {"animal_type": "duck"}, 10)
+    ts = Timestamp(seconds=int(BASE_TIME.timestamp()))
+    hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp())
+    message = TraceItemTableRequest(
+        meta=RequestMeta(
+            project_ids=[1, 2, 3],
+            organization_id=1,
+            cogs_category="something",
+            referrer="something",
+            start_timestamp=Timestamp(seconds=hour_ago),
+            end_timestamp=ts,
+            trace_item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN,
+        ),
+        columns=[
+            Column(
+                key=AttributeKey(
+                    type=AttributeKey.TYPE_STRING, name="nonexistent_string"
+                )
+            ),
+            Column(
+                key=AttributeKey(type=AttributeKey.TYPE_INT, name="nonexistent_int")
+            ),
+        ],
+        limit=50,
+    )
+    response = EndpointTraceItemTable().execute(message)
+    assert response.column_values == [
+        TraceItemColumnValues(
+            attribute_name="nonexistent_string",
+            results=[AttributeValue(is_null=True) for _ in range(10)],
+        ),
+        TraceItemColumnValues(
+            attribute_name="nonexistent_int",
+            results=[AttributeValue(is_null=True) for _ in range(10)],
+        ),
+    ]
+

 class TestUtils:
     def test_apply_labels_to_columns_backward_compat(self) -> None:
diff --git a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table_uptime_checks.py b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table_uptime_checks.py
index 97621942e44..1954cccd1b2 100644
--- a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table_uptime_checks.py
+++ b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table_uptime_checks.py
@@ -32,6 +32,7 @@

 def gen_message(
     dt: datetime,
+    overrides: Mapping[str, Any] = {},
 ) -> Mapping[str, Any]:
     return {
         "organization_id": 1,
@@ -55,6 +56,7 @@ def gen_message(
             "request_type": "GET",
             "http_status_code": 200,
         },
+        **overrides,
     }
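The new overrides parameter lets a test blank out payload fields without duplicating the whole message; because it is unpacked last, an override wins over the default with the same key (and replaces a nested dict wholesale, which is why the test below restates all of request_info). A minimal illustration of the merge semantics:

    defaults = {
        "organization_id": 1,
        "http_status_code": 200,
        "request_info": {"request_type": "GET", "http_status_code": 200},
    }
    overrides = {
        "http_status_code": None,
        "request_info": {"request_type": "GET", "http_status_code": None},
    }
    merged = {**defaults, **overrides}
    assert merged["http_status_code"] is None
    assert merged["organization_id"] == 1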
@@ -253,3 +255,49 @@ def test_with_offset(self, setup_teardown: Any) -> None:
             meta=ResponseMeta(request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480"),
         )
         assert MessageToDict(response) == MessageToDict(expected_response)
+
+
+@pytest.mark.clickhouse_db
+@pytest.mark.redis_db
+def test_nonexistent_attribute() -> None:
+    _UPTIME_CHECKS = [
+        gen_message(
+            BASE_TIME - timedelta(minutes=30),
+            {
+                "http_status_code": None,
+                "request_info": {"request_type": "GET", "http_status_code": None},
+            },
+        )
+        for i in range(50)
+    ]
+    write_raw_unprocessed_events(get_storage(StorageKey("uptime_monitor_checks")), _UPTIME_CHECKS)  # type: ignore
+    ts = Timestamp(seconds=int(BASE_TIME.timestamp()))
+    hour_ago = Timestamp(seconds=int((BASE_TIME - timedelta(hours=1)).timestamp()))
+    message = TraceItemTableRequest(
+        meta=RequestMeta(
+            project_ids=[1, 2, 3],
+            organization_id=1,
+            cogs_category="something",
+            referrer="something",
+            start_timestamp=hour_ago,
+            end_timestamp=ts,
+            request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480",
+            trace_item_type=TraceItemType.TRACE_ITEM_TYPE_UPTIME_CHECK,
+        ),
+        columns=[
+            Column(
+                key=AttributeKey(
+                    type=AttributeKey.TYPE_INT,
+                    name="http_status_code",
+                ),
+            ),
+        ],
+        limit=50,
+    )
+    response = EndpointTraceItemTable().execute(message)
+    assert response.column_values == [
+        TraceItemColumnValues(
+            attribute_name="http_status_code",
+            results=[AttributeValue(is_null=True) for _ in range(50)],
+        ),
+    ]