Update test cases
Seefooo committed Sep 27, 2022
1 parent 82979a5 commit 1ce0712
Showing 4 changed files with 240 additions and 382 deletions.
@@ -34,13 +34,11 @@ classifiers =
Programming Language :: Python :: 3.8

[options]
python_requires = >=3.5
python_requires = >=3.8
package_dir=
=src
packages=find_namespace:
install_requires =
snappy >= 2.8
#protobuf >= 3.13.0
protobuf == 3.20.0
requests == 2.25.0
opentelemetry-api == 1.12.0rc2
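These pins are exact (==), so a drifted environment fails loudly rather than at runtime. A minimal sanity-check sketch, assuming only the stdlib importlib.metadata (available on Python 3.8+, matching the new python_requires):

from importlib.metadata import version

# Versions copied from the pins above; adjust if setup.cfg changes.
assert version("protobuf") == "3.20.0"
assert version("requests") == "2.25.0"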
@@ -71,19 +71,15 @@ def __init__(
timeout: int = 30,
tls_config: Dict = None,
proxies: Dict = None,
resources_as_labels: bool = True,
):
self.endpoint = endpoint
self.basic_auth = basic_auth
self.headers = headers
self.timeout = timeout
self.tls_config = tls_config
self.proxies = proxies

self.converter_map = {
Sum: self._convert_from_sum,
Histogram: self._convert_from_histogram,
Gauge: self._convert_from_gauge,
}
self.resources_as_labels = resources_as_labels
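A usage sketch for the new flag, assuming this package's public import path and reusing the endpoint from the tests below; with resources_as_labels=False, Resource attributes are kept off the series labels:

from opentelemetry.exporter.prometheus_remote_write import (
    PrometheusRemoteWriteMetricsExporter,
)

exporter = PrometheusRemoteWriteMetricsExporter(
    endpoint="http://victoria:8428/api/v1/write",
    resources_as_labels=True,  # copy Resource attributes onto every series
)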


@property
@@ -166,71 +162,73 @@ def headers(self, headers: Dict):
self._headers = headers

def export(
self, export_records
self, metrics_data: MetricsData
) -> MetricExportResult:
if not export_records:
return MetricsExportResult.SUCCESS
timeseries = self._convert_to_timeseries(export_records)
if not metrics_data:
return MetricExportResult.SUCCESS
timeseries = self._translate_data(metrics_data)
if not timeseries:
logger.error(
"All records contain unsupported aggregators, export aborted"
)
return MetricsExportResult.FAILURE
return MetricExportResult.FAILURE
message = self._build_message(timeseries)
headers = self._build_headers()
return self._send_message(message, headers)

def shutdown(self) -> None:
pass
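For context, export() is normally driven by the SDK rather than called directly; a sketch reusing the exporter constructed above and assuming opentelemetry-sdk 1.12's public API:

from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader

# The reader batches MetricsData and calls exporter.export() on an interval.
reader = PeriodicExportingMetricReader(exporter, export_interval_millis=15_000)
provider = MeterProvider(metric_readers=[reader])
counter = provider.get_meter("demo").create_counter("requests")
counter.add(1)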

def _translate_data(self, data: MetricsData):
def _translate_data(self, data: MetricsData) -> Sequence[TimeSeries]:
rw_timeseries = []

for resource_metrics in data.resource_metrics:
resource = resource_metrics.resource
# OTLP Data model suggests combining some attrs into job/instance
# Should we do that here?
resource_labels = self._get_resource_labels(resource.attributes)
if self.resources_as_labels:
resource_labels = [(n, str(v)) for n, v in resource.attributes.items()]
else:
resource_labels = []
# Scope name/version probably not too useful from a labeling perspective
for scope_metrics in resource_metrics.scope_metrics:
for metric in scope_metrics.metrics:
rw_timeseries.extend(self._parse_metric(metric, resource_labels))

def _get_resource_labels(self,attrs):
""" Converts Resource Attributes to Prometheus Labels based on
OTLP Metric Data Model's recommendations on Resource Attributes
"""
return [ (n,str(v)) for n,v in resource.attributes.items() ]
return rw_timeseries

def _parse_metric(self, metric: Metric, resource_labels: Sequence) -> Sequence[TimeSeries]:
"""
Parses the Metric & its nested data points, then converts the output into
OM TimeSeries. Returns a list of TimeSeries objects based on one Metric
"""


# Create the metric name; it becomes the __name__ label later
if metric.unit:
# Prometheus naming guidelines append the unit to the metric name
name = f"{metric.name}_{metric.unit}"
else:
name = metric.name

# Data points carry their own attributes; each distinct attribute set becomes
# a separate RW series, since name & labels identify a unique time series.
sample_sets = defaultdict(list)
if isinstance(metric.data, (Gauge, Sum)):
for dp in metric.data.data_points:
attrs,sample = self._parse_data_point(dp)
attrs, sample = self._parse_data_point(dp, name)
sample_sets[attrs].append(sample)
elif isinstance(metric.data,(HistogramType)):
raise NotImplementedError("Coming sooN!")
elif isinstance(metric.data, Histogram):
for dp in metric.data.data_points:
dp_result = self._parse_histogram_data_point(dp, name)
for attrs, sample in dp_result:
sample_sets[attrs].append(sample)
else:
logger.warning("Unsupported Metric Type: %s", type(metric.data))
return []

# Create the metric name, will be a label later
if metric.unit:
#Prom. naming guidelines add unit to the name
name =f"{metric.name}_{metric.unit}"
else:
name = metric.name

timeseries = []
for labels, samples in sample_sets.items():
ts = TimeSeries()
ts.labels.append(self._label("__name__", name))
for label_name, label_value in chain(resource_labels, labels):
# Previous implementation did not str() the names...
ts.labels.append(self._label(label_name, str(label_value)))
@@ -239,23 +237,61 @@ def _parse_metric(self, metric: Metric, resource_labels: Sequence) -> Sequence[TimeSeries]:
timeseries.append(ts)
return timeseries
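A worked example of the grouping above with hypothetical attributes and samples: two samples sharing one attribute tuple collapse into a single key, i.e. a single TimeSeries with two samples:

from collections import defaultdict

sample_sets = defaultdict(list)
sample_sets[(("env", "prod"),)].append((1.0, 1_641_946_016_139))
sample_sets[(("env", "prod"),)].append((2.0, 1_641_946_016_140))
assert len(sample_sets) == 1
assert len(sample_sets[(("env", "prod"),)]) == 2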

def _sample(self,value,timestamp :int):
def _sample(self, value: int, timestamp: int) -> Sample:
sample = Sample()
sample.value = value
sample.timestamp = timestamp
return sample

def _label(self,name:str,value:str):
def _label(self, name: str, value: str) -> Label:
label = Label()
label.name = name
label.value = value
return label

def _parse_data_point(self, data_point):
def _parse_histogram_data_point(self, data_point, name):

#if (len(data_point.explicit_bounds)+1) != len(data_point.bucket_counts):
# raise ValueError("Number of buckets must be 1 more than the explicit bounds!")

sample_attr_pairs = []

base_attrs = [(n, v) for n, v in data_point.attributes.items()]
timestamp = data_point.time_unix_nano // 1_000_000


def handle_bucket(value, bound=None, name_override=None):
# Metric-level attributes + the bucket boundary attribute + name
ts_attrs = base_attrs.copy()
ts_attrs.append(("__name__", name_override or name))
if bound:
ts_attrs.append(("le", str(bound)))
# Value is the count of observations in this bucket
ts_sample = (value, timestamp)
return tuple(ts_attrs), ts_sample

attrs = tuple(data_point.attributes.items())
# TODO: Optimize? Create Sample here
# Remote-write timestamps are in milliseconds
for bound_pos, bound in enumerate(data_point.explicit_bounds):
sample_attr_pairs.append(
handle_bucket(data_point.bucket_counts[bound_pos], bound)
)

# Add the series for the implicit +Inf bucket
sample_attr_pairs.append(
handle_bucket(data_point.bucket_counts[-1], bound="+Inf")
)
)

# Lastly, add series for sum & count
sample_attr_pairs.append(
handle_bucket(data_point.sum, name_override=f"{name}_sum")
)
sample_attr_pairs.append(
handle_bucket(data_point.count, name_override=f"{name}_count")
)
)
return sample_attr_pairs
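A self-contained check of the bucket expansion, reusing the fixture values from the tests below and the exporter constructed earlier; the import path assumes opentelemetry-sdk 1.12:

from opentelemetry.sdk.metrics.export import HistogramDataPoint

dp = HistogramDataPoint(
    attributes={"foo": "bar", "baz": 42},
    start_time_unix_nano=1641946016139533244,
    time_unix_nano=1641946016139533244,
    count=5,
    sum=420,
    bucket_counts=[1, 4],
    explicit_bounds=[10.0],
    min=8,
    max=80,
)
# Expect four series: the le="10.0" bucket, the implicit le="+Inf" bucket,
# plus the _sum and _count series.
pairs = exporter._parse_histogram_data_point(dp, "test_histogram_tu")
assert len(pairs) == 4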

def _parse_data_point(self, data_point, name=None):

attrs = tuple(data_point.attributes.items()) + (("__name__", name),)
sample = (data_point.value, data_point.time_unix_nano // 1_000_000)
return attrs, sample
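The floor division converts OTLP's nanosecond timestamps to the millisecond precision remote write expects; checked against the fixture timestamp used in the tests below:

assert 1641946016139533244 // 1_000_000 == 1641946016139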

Expand All @@ -275,27 +311,17 @@ def _convert_to_timeseries(
)
return timeseries

def _convert_from_sum(
self, sum_record
) -> Sequence[TimeSeries]:
return [
self._create_timeseries(
sum_record,
sum_record.instrument.name + "_sum",
sum_record.aggregator.checkpoint,
)
]

def _convert_from_gauge(self, gauge_record):
raise NotImplementedError("Do this")

def _convert_from_histogram(
self, histogram_record
self, histogram: Histogram,
) -> Sequence[TimeSeries]:
timeseries = []
for bound in histogram_record.aggregator.checkpoint.keys():
sample_sets = defaultdict(list)

base_attrs = [self._label(n,v) for n,v in histogram.attributes]
for bound in histogram.explicit_bounds:
bound_str = "+Inf" if bound == float("inf") else str(bound)
value = histogram_record.aggregator.checkpoint[bound]
# General attributes apply
ts_attrs = base_attrs.copy()
ts_attrs.append(self._label("le", bound_str))
sample_sets[attrs].append(sample)
timeseries.append(
self._create_timeseries(
histogram_record,
@@ -411,5 +437,5 @@ def _send_message(
response.raise_for_status()
except requests.exceptions.RequestException as err:
logger.error("Export POST request failed with reason: %s", err)
return MetricsExportResult.FAILURE
return MetricsExportResult.SUCCESS
return MetricExportResult.FAILURE
return MetricExportResult.SUCCESS
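For reference, _send_message POSTs a snappy-compressed, serialized protobuf WriteRequest; a hedged sketch of that wire format per the Prometheus remote-write spec (the helper name and exact header set here are illustrative, not necessarily this repo's code):

import requests
import snappy  # python-snappy

def send(message: bytes, headers: dict, endpoint: str, timeout: int = 30) -> requests.Response:
    headers = {
        "Content-Encoding": "snappy",
        "Content-Type": "application/x-protobuf",
        "X-Prometheus-Remote-Write-Version": "0.1.0",
        **headers,
    }
    # Remote-write bodies are snappy-compressed serialized WriteRequests.
    return requests.post(endpoint, data=snappy.compress(message), headers=headers, timeout=timeout)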
@@ -26,41 +26,39 @@ def prom_rw():
return PrometheusRemoteWriteMetricsExporter("http://victoria:8428/api/v1/write")



@pytest.fixture
def generate_metrics_data(data):
pass


def metric(request):
if hasattr(request, "param"):
type_ = request.param
else:
type_ = random.choice(["gauge", "sum"])
if type_ == "gauge":
return metric_util._generate_gauge("test_gauge", random.randint(0, 100))
elif type_ == "sum":
return metric_util._generate_sum("test_sum", random.randint(0, 9_999_999_999))
elif type_ == "histogram":
return _generate_histogram("test_histogram")

@pytest.fixture
def metric_histogram():
def _generate_histogram(name):
dp = HistogramDataPoint(
attributes={"foo": "bar", "baz": 42},
start_time_unix_nano=1641946016139533244,
time_unix_nano=1641946016139533244,
count=random.randint(1,10),
sum=random.randint(42,420),
count=5,
sum=420,
bucket_counts=[1, 4],
explicit_bounds=[10.0, 20.0],
explicit_bounds=[10.0],
min=8,
max=18,
max=80,
)
data = Histogram(
[dp],
AggregationTemporality.CUMULATIVE,
)
return Metric(
"test_histogram",
name,
"foo",
"tu",
data=data,
)

@pytest.fixture
def metric(request):
if request.param == "gauge":
return metric_util._generate_gauge("test_gauge", random.randint(0, 100))
elif request.param == "sum":
return metric_util._generate_sum("test_sum", random.randint(0, 9_999_999_999))
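A hypothetical test using the fixture above through pytest's indirect parametrization, so request.param picks the generator:

@pytest.mark.parametrize("metric", ["gauge", "sum"], indirect=True)
def test_parse_metric(prom_rw, metric):
    series = prom_rw._parse_metric(metric, [])
    assert series, "expected at least one TimeSeries"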

