fix(eap): handle null attribute values #6837

Draft: wants to merge 12 commits into base: master
1 change: 1 addition & 0 deletions .vscode/settings.json
@@ -67,4 +67,5 @@
"python.testing.pytestArgs": ["tests"],
"python.analysis.autoImportCompletions": true,
"mypy-type-checker.args": ["--strict"]

}
2 changes: 1 addition & 1 deletion requirements.txt
@@ -29,7 +29,7 @@ python-rapidjson==1.8
redis==4.5.4
sentry-arroyo==2.19.12
sentry-kafka-schemas==0.1.129
sentry-protos==0.1.55
sentry-protos==0.1.58
sentry-redis-tools==0.3.0
sentry-relay==0.9.5
sentry-sdk==2.18.0
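
The only functional dependency change is the sentry-protos bump from 0.1.55 to 0.1.58, which is presumably what introduces the explicit null marker (`AttributeValue.is_null`) used by the resolver changes below. A minimal sketch of the accessor this PR relies on, assuming the field behaves as a plain boolean:

```python
# Hedged sketch: with sentry-protos >= 0.1.58 an attribute value can carry an
# explicit null marker, which the filter translation below checks first.
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeValue

null_value = AttributeValue(is_null=True)
assert null_value.is_null                       # the new guard used below
assert not AttributeValue(val_str="x").is_null  # unset for concrete values
```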
@@ -73,21 +73,6 @@ query_processors:
time_parse_columns:
- start_timestamp
- end_timestamp
- processor: OptionalAttributeAggregationTransformer
args:
attribute_column_names:
- attr_num
aggregation_names:
- sum
- count
- avg
- avgWeighted
- max
- min
- uniq
curried_aggregation_names:
- quantile
- quantileTDigestWeighted
- processor: HashBucketFunctionTransformer
args:
hash_bucket_names:
67 changes: 34 additions & 33 deletions snuba/web/rpc/common/common.py
@@ -297,39 +297,40 @@ def trace_item_filters_to_expression(item_filter: TraceItemFilter) -> Expression
"comparison does not have a right hand side"
)

match value_type:
case "val_bool":
v_expression: Expression = literal(v.val_bool)
case "val_str":
v_expression = literal(v.val_str)
case "val_float":
v_expression = literal(v.val_float)
case "val_double":
v_expression = literal(v.val_double)
case "val_int":
v_expression = literal(v.val_int)
case "val_null":
v_expression = literal(None)
case "val_str_array":
v_expression = literals_array(
None, list(map(lambda x: literal(x), v.val_str_array.values))
)
case "val_int_array":
v_expression = literals_array(
None, list(map(lambda x: literal(x), v.val_int_array.values))
)
case "val_float_array":
v_expression = literals_array(
None, list(map(lambda x: literal(x), v.val_float_array.values))
)
case "val_double_array":
v_expression = literals_array(
None, list(map(lambda x: literal(x), v.val_double_array.values))
)
case default:
raise NotImplementedError(
f"translation of AttributeValue type {default} is not implemented"
)
if v.is_null:
v_expression: Expression = literal(None)
else:
match value_type:
case "val_bool":
v_expression = literal(v.val_bool)
case "val_str":
v_expression = literal(v.val_str)
case "val_float":
v_expression = literal(v.val_float)
case "val_double":
v_expression = literal(v.val_double)
case "val_int":
v_expression = literal(v.val_int)
case "val_str_array":
v_expression = literals_array(
None, list(map(lambda x: literal(x), v.val_str_array.values))
)
case "val_int_array":
v_expression = literals_array(
None, list(map(lambda x: literal(x), v.val_int_array.values))
)
case "val_float_array":
v_expression = literals_array(
None, list(map(lambda x: literal(x), v.val_float_array.values))
)
case "val_double_array":
v_expression = literals_array(
None, list(map(lambda x: literal(x), v.val_double_array.values))
)
case default:
raise NotImplementedError(
f"translation of AttributeValue type {default} is not implemented"
)

if op == ComparisonFilter.OP_EQUALS:
_check_non_string_values_cannot_ignore_case(item_filter.comparison_filter)
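
For context, a request-side comparison against a null value would reach the new branch roughly like this. This is a hypothetical fragment: the `key`/`op`/`value` layout of `ComparisonFilter` and the `AttributeKey` fields are inferred from imports elsewhere in the codebase, not shown in this diff.

```python
# Hypothetical request fragment exercising the is_null short-circuit above.
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue
from sentry_protos.snuba.v1.trace_item_filter_pb2 import ComparisonFilter, TraceItemFilter

null_filter = TraceItemFilter(
    comparison_filter=ComparisonFilter(
        key=AttributeKey(type=AttributeKey.TYPE_STRING, name="my_attribute"),
        op=ComparisonFilter.OP_EQUALS,
        value=AttributeValue(is_null=True),
    )
)
# trace_item_filters_to_expression(null_filter) now builds the right-hand side as
# literal(None) via the is_null check instead of dispatching on the value oneof.
```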
@@ -38,7 +38,7 @@
setup_trace_query_settings,
)
from snuba.web.rpc.v1.resolvers import ResolverTimeSeries
from snuba.web.rpc.v1.resolvers.R_eap_spans.common.aggregation import (
from snuba.web.rpc.v1.resolvers.common.aggregation import (
ExtrapolationContext,
aggregation_to_expression,
get_average_sample_rate_column,
@@ -1,7 +1,6 @@
import uuid
from collections import defaultdict
from dataclasses import replace
from typing import Any, Callable, Dict, Iterable, Sequence
from typing import Sequence

from google.protobuf.json_format import MessageToDict
from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import (
@@ -12,11 +11,7 @@
TraceItemTableResponse,
)
from sentry_protos.snuba.v1.request_common_pb2 import PageToken, TraceItemType
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import (
AttributeKey,
AttributeValue,
ExtrapolationMode,
)
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import ExtrapolationMode

from snuba.attribution.appid import AppID
from snuba.attribution.attribution_info import AttributionInfo
@@ -45,13 +40,13 @@
)
from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException
from snuba.web.rpc.v1.resolvers import ResolverTraceItemTable
from snuba.web.rpc.v1.resolvers.R_eap_spans.common.aggregation import (
ExtrapolationContext,
from snuba.web.rpc.v1.resolvers.common.aggregation import (
aggregation_to_expression,
get_average_sample_rate_column,
get_confidence_interval_column,
get_count_column,
)
from snuba.web.rpc.v1.resolvers.common.trace_item_table import convert_results

_DEFAULT_ROW_LIMIT = 10_000

@@ -236,52 +231,6 @@ def _build_snuba_request(request: TraceItemTableRequest) -> SnubaRequest:
)


def _convert_results(
request: TraceItemTableRequest, data: Iterable[Dict[str, Any]]
) -> list[TraceItemColumnValues]:
converters: Dict[str, Callable[[Any], AttributeValue]] = {}

for column in request.columns:
if column.HasField("key"):
if column.key.type == AttributeKey.TYPE_BOOLEAN:
converters[column.label] = lambda x: AttributeValue(val_bool=bool(x))
elif column.key.type == AttributeKey.TYPE_STRING:
converters[column.label] = lambda x: AttributeValue(val_str=str(x))
elif column.key.type == AttributeKey.TYPE_INT:
converters[column.label] = lambda x: AttributeValue(val_int=int(x))
elif column.key.type == AttributeKey.TYPE_FLOAT:
converters[column.label] = lambda x: AttributeValue(val_float=float(x))
elif column.key.type == AttributeKey.TYPE_DOUBLE:
converters[column.label] = lambda x: AttributeValue(val_double=float(x))
elif column.HasField("aggregation"):
converters[column.label] = lambda x: AttributeValue(val_double=float(x))
else:
raise BadSnubaRPCRequestException(
"column is neither an attribute or aggregation"
)

res: defaultdict[str, TraceItemColumnValues] = defaultdict(TraceItemColumnValues)
for row in data:
for column_name, value in row.items():
if column_name in converters.keys():
res[column_name].results.append(converters[column_name](value))
res[column_name].attribute_name = column_name
extrapolation_context = ExtrapolationContext.from_row(column_name, row)
if extrapolation_context.is_extrapolated:
res[column_name].reliabilities.append(
extrapolation_context.reliability
)

column_ordering = {column.label: i for i, column in enumerate(request.columns)}

return list(
# we return the columns in the order they were requested
sorted(
res.values(), key=lambda c: column_ordering.__getitem__(c.attribute_name)
)
)


def _get_page_token(
request: TraceItemTableRequest, response: list[TraceItemColumnValues]
) -> PageToken:
@@ -303,7 +252,7 @@ def resolve(self, in_msg: TraceItemTableRequest) -> TraceItemTableResponse:
request=snuba_request,
timer=self._timer,
)
column_values = _convert_results(in_msg, res.result.get("data", []))
column_values = convert_results(in_msg, res.result.get("data", []))
response_meta = extract_response_meta(
in_msg.meta.request_id,
in_msg.meta.debug,
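
Both table resolvers now call `convert_results` from the new shared module `snuba.web.rpc.v1.resolvers.common.trace_item_table`, whose body is not included in this diff. Purely as a hedged sketch, a consolidated helper derived from the two removed `_convert_results` functions could look like the code below; the real shared implementation presumably also adds the null-value handling this PR is about, which is not reconstructed here.

```python
# Hedged sketch of a consolidated convert_results; the actual contents of
# snuba/web/rpc/v1/resolvers/common/trace_item_table.py are not shown in this diff.
from collections import defaultdict
from typing import Any, Callable, Dict, Iterable

from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import (
    TraceItemColumnValues,
    TraceItemTableRequest,
)
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue

from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException
from snuba.web.rpc.v1.resolvers.common.aggregation import ExtrapolationContext


def convert_results(
    request: TraceItemTableRequest, data: Iterable[Dict[str, Any]]
) -> list[TraceItemColumnValues]:
    # Build one converter per requested column, keyed by label.
    converters: Dict[str, Callable[[Any], AttributeValue]] = {}
    for column in request.columns:
        if column.HasField("key"):
            if column.key.type == AttributeKey.TYPE_BOOLEAN:
                converters[column.label] = lambda x: AttributeValue(val_bool=bool(x))
            elif column.key.type == AttributeKey.TYPE_STRING:
                converters[column.label] = lambda x: AttributeValue(val_str=str(x))
            elif column.key.type == AttributeKey.TYPE_INT:
                converters[column.label] = lambda x: AttributeValue(val_int=int(x))
            elif column.key.type == AttributeKey.TYPE_FLOAT:
                converters[column.label] = lambda x: AttributeValue(val_float=float(x))
            elif column.key.type == AttributeKey.TYPE_DOUBLE:
                converters[column.label] = lambda x: AttributeValue(val_double=float(x))
        elif column.HasField("aggregation"):
            converters[column.label] = lambda x: AttributeValue(val_double=float(x))
        else:
            raise BadSnubaRPCRequestException(
                "column is neither an attribute or aggregation"
            )

    # Accumulate column-oriented results, carrying reliability info when extrapolated.
    res: defaultdict[str, TraceItemColumnValues] = defaultdict(TraceItemColumnValues)
    for row in data:
        for column_name, value in row.items():
            if column_name in converters:
                res[column_name].results.append(converters[column_name](value))
                res[column_name].attribute_name = column_name
                extrapolation_context = ExtrapolationContext.from_row(column_name, row)
                if extrapolation_context.is_extrapolated:
                    res[column_name].reliabilities.append(
                        extrapolation_context.reliability
                    )

    # Columns are returned in the order they were requested.
    column_ordering = {column.label: i for i, column in enumerate(request.columns)}
    return sorted(res.values(), key=lambda c: column_ordering[c.attribute_name])
```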
@@ -59,33 +59,34 @@ def trace_item_filters_to_expression(item_filter: TraceItemFilter) -> Expression
"comparison does not have a right hand side"
)

match value_type:
case "val_bool":
v_expression: Expression = literal(v.val_bool)
case "val_str":
v_expression = literal(v.val_str)
case "val_float":
v_expression = literal(v.val_float)
case "val_int":
v_expression = literal(v.val_int)
case "val_null":
v_expression = literal(None)
case "val_str_array":
v_expression = literals_array(
None, list(map(lambda x: literal(x), v.val_str_array.values))
)
case "val_int_array":
v_expression = literals_array(
None, list(map(lambda x: literal(x), v.val_int_array.values))
)
case "val_float_array":
v_expression = literals_array(
None, list(map(lambda x: literal(x), v.val_float_array.values))
)
case default:
raise NotImplementedError(
f"translation of AttributeValue type {default} is not implemented"
)
if v.is_null:
v_expression: Expression = literal(None)
else:
match value_type:
case "val_bool":
v_expression = literal(v.val_bool)
case "val_str":
v_expression = literal(v.val_str)
case "val_float":
v_expression = literal(v.val_float)
case "val_int":
v_expression = literal(v.val_int)
case "val_str_array":
v_expression = literals_array(
None, list(map(lambda x: literal(x), v.val_str_array.values))
)
case "val_int_array":
v_expression = literals_array(
None, list(map(lambda x: literal(x), v.val_int_array.values))
)
case "val_float_array":
v_expression = literals_array(
None, list(map(lambda x: literal(x), v.val_float_array.values))
)
case default:
raise NotImplementedError(
f"translation of AttributeValue type {default} is not implemented"
)

if op == ComparisonFilter.OP_EQUALS:
_check_non_string_values_cannot_ignore_case(item_filter.comparison_filter)
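
The ourlogs filter translation gets the same guard: an explicit null takes precedence over the oneof dispatch. A small standalone illustration of that ordering (not the snuba implementation; the oneof name `"value"` is an assumption based on the `value_type` match above):

```python
# Standalone illustration of the guard order used in both resolvers:
# check the explicit null marker first, then dispatch on the value oneof.
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeValue


def describe(v: AttributeValue) -> str:
    if v.is_null:
        return "NULL literal"
    match v.WhichOneof("value"):  # oneof name assumed to be "value"
        case "val_str":
            return f"string literal {v.val_str!r}"
        case "val_int":
            return f"int literal {v.val_int}"
        case other:
            return f"unhandled value type {other}"


print(describe(AttributeValue(is_null=True)))  # NULL literal
print(describe(AttributeValue(val_str="x")))   # string literal 'x'
print(describe(AttributeValue(val_int=7)))     # int literal 7
```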
@@ -1,6 +1,5 @@
import uuid
from collections import defaultdict
from typing import Any, Callable, Dict, Iterable, Sequence
from typing import Sequence

from google.protobuf.json_format import MessageToDict
from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import (
@@ -9,7 +8,6 @@
TraceItemTableResponse,
)
from sentry_protos.snuba.v1.request_common_pb2 import PageToken, TraceItemType
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue

from snuba.attribution.appid import AppID
from snuba.attribution.attribution_info import AttributionInfo
@@ -29,6 +27,7 @@
)
from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException
from snuba.web.rpc.v1.resolvers import ResolverTraceItemTable
from snuba.web.rpc.v1.resolvers.common.trace_item_table import convert_results
from snuba.web.rpc.v1.resolvers.R_ourlogs.common.attribute_key_to_expression import (
attribute_key_to_expression,
)
@@ -124,44 +123,6 @@ def _build_snuba_request(request: TraceItemTableRequest) -> SnubaRequest:
)


def _convert_results(
request: TraceItemTableRequest, data: Iterable[Dict[str, Any]]
) -> list[TraceItemColumnValues]:
converters: Dict[str, Callable[[Any], AttributeValue]] = {}

for column in request.columns:
if column.HasField("key"):
if column.key.type == AttributeKey.TYPE_BOOLEAN:
converters[column.label] = lambda x: AttributeValue(val_bool=bool(x))
elif column.key.type == AttributeKey.TYPE_STRING:
converters[column.label] = lambda x: AttributeValue(val_str=str(x))
elif column.key.type == AttributeKey.TYPE_INT:
converters[column.label] = lambda x: AttributeValue(val_int=int(x))
elif (
column.key.type == AttributeKey.TYPE_FLOAT
or column.key.type == AttributeKey.Type.TYPE_DOUBLE
):
converters[column.label] = lambda x: AttributeValue(val_float=float(x))
else:
raise BadSnubaRPCRequestException("column is not an attribute")

res: defaultdict[str, TraceItemColumnValues] = defaultdict(TraceItemColumnValues)
for row in data:
for column_name, value in row.items():
if column_name in converters.keys():
res[column_name].results.append(converters[column_name](value))
res[column_name].attribute_name = column_name

column_ordering = {column.label: i for i, column in enumerate(request.columns)}

return list(
# we return the columns in the order they were requested
sorted(
res.values(), key=lambda c: column_ordering.__getitem__(c.attribute_name)
)
)


def _get_page_token(
request: TraceItemTableRequest, response: list[TraceItemColumnValues]
) -> PageToken:
@@ -183,7 +144,7 @@ def resolve(self, in_msg: TraceItemTableRequest) -> TraceItemTableResponse:
request=snuba_request,
timer=self._timer,
)
column_values = _convert_results(in_msg, res.result.get("data", []))
column_values = convert_results(in_msg, res.result.get("data", []))
response_meta = extract_response_meta(
in_msg.meta.request_id,
in_msg.meta.debug,