fixes
cnnradams committed Aug 6, 2020
1 parent bf8b545 commit 97778a0
Showing 5 changed files with 86 additions and 85 deletions.
4 changes: 3 additions & 1 deletion docs/examples/exemplars/README.rst
@@ -1,6 +1,8 @@
OpenTelemetry Exemplars Example
===============================

.. _Exemplars:

Exemplars are example measurements for aggregations. While conceptually simple, exemplars can estimate any statistic about the input distribution, provide links to sample traces for high-latency requests, and much more.
For more information about exemplars and how they work in OpenTelemetry, see the `spec <https://github.com/open-telemetry/oteps/pull/113>`_.

@@ -24,7 +26,7 @@ Statistical exemplars
The OpenTelemetry SDK provides a way to sample exemplars statistically:

- Exemplars will be picked to represent the input distribution, without unquantifiable bias
- A "sample_count" attribute will be set on each exemplar to quantify how many measurements each exemplar represents
- A "sample_count" attribute will be set on each exemplar to quantify how many measurements each exemplar represents (for randomly sampled exemplars, this value will be N (total measurements) / num_samples. For histogram exemplars, this value will be specific to each bucket).

See 'statistical_exemplars.ipynb' for the example (TODO: how do I link this?)
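As a quick illustration, a statistically sampled exemplar can be scaled back up by its "sample_count" to estimate totals over the full input. A minimal sketch (the ``exemplars`` list here is hypothetical):

.. code-block:: python

    # 10,000 measurements were recorded and 10 exemplars kept at random,
    # so each exemplar carries sample_count = 10000 / 10 = 1000.
    estimated_total = sum(e.value * e.sample_count for e in exemplars)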

4 changes: 2 additions & 2 deletions docs/examples/exemplars/statistical_exemplars.ipynb
@@ -122,7 +122,7 @@
" random.seed(1)\n",
"\n",
" # customer 123 is a big user, and made 1000 requests in this timeframe\n",
" requests = np.random.normal(1000, 250, 1000) # 1000 requests with average 1000 bytes, covariance 100\n",
" requests = np.random.normal(1000, 250, 1000) # 1000 requests with average 1000 bytes, standard deviation 250\n",
"\n",
" for request in requests:\n",
" bytes_counter.add(int(request), {\"environment\": \"production\", \"method\": \"REST\", \"customer_id\": 123})\n",
@@ -205,7 +205,7 @@
" customer_bytes_map[exemplar.dropped_labels] += exemplar.value\n",
"\n",
"\n",
"customer_bytes_list = sorted(list(customer_bytes_map.items()), key=lambda t: t[1], reverse=True)\n",
"customer_bytes_list = sorted(customer_bytes_map.items(), key=lambda t: t[1], reverse=True)\n",
"\n",
"# Save our top 5 customers and sum all of the rest into \"Others\".\n",
"top_3_customers = [(\"Customer {}\".format(dict(val[0])[\"customer_id\"]), val[1]) for val in customer_bytes_list[:3]] + [(\"Other Customers\", sum([val[1] for val in customer_bytes_list[3:]]))]\n",
18 changes: 7 additions & 11 deletions docs/examples/exemplars/statistical_exemplars.py
@@ -61,8 +61,8 @@ def unknown_customer_calls():

# customer 123 is a big user, and made 1000 requests in this timeframe
requests = np.random.normal(
1000, 250, 1000
) # 1000 requests with average 1000 bytes, covariance 100
1000, 100, 1000
) # 1000 requests with average 1000 bytes, standard deviation 100

for request in requests:
bytes_counter.add(
@@ -123,7 +123,7 @@ def unknown_customer_calls():


customer_bytes_list = sorted(
list(customer_bytes_map.items()), key=lambda t: t[1], reverse=True
customer_bytes_map.items(), key=lambda t: t[1], reverse=True
)

# Save our top 3 customers and sum all of the rest into "Others".
@@ -146,7 +146,6 @@ def unknown_customer_calls():

# Since the exemplars were randomly sampled, all sample_counts will be the same
sample_count = exemplars[0].sample_count
print("sample count", sample_count, "custmer", customer_123_bytes)
full_customer_123_bytes = sample_count * customer_123_bytes

# With seed == 1 we get 1008612 - quite close to the statistical mean of 1000000! (more exemplars would make this estimation even more accurate)
@@ -160,13 +159,10 @@ def unknown_customer_calls():
top_25_customers = customer_bytes_list[:25]

# out of those 25 customers, determine how many used grpc, and come up with a ratio
percent_grpc = len(
list(
filter(
lambda customer_value: customer_value[0][1][1] == "gRPC",
top_25_customers,
)
)
percent_grpc = sum(
1
for customer_value in top_25_customers
if customer_value[0][1][1] == "gRPC"
) / len(top_25_customers)

print(
121 changes: 62 additions & 59 deletions opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/exemplars.py
@@ -19,7 +19,7 @@
1. A "trace" exemplar sampler, which only samples exemplars if they have a sampled trace context (and can pick exemplars with other biases, ie min + max).
2. A "statistical" exemplar sampler, which samples exemplars without bias (ie no preferenced for traced exemplars)
To use an exemplar recorder, pass in two arguments to the aggregator config in views (see the "Exemplars" example for an example):
To use an exemplar recorder, pass in two arguments to the aggregator config in views (see the :ref:`Exemplars` example for an example):
"num_exemplars": The number of exemplars to record (if applicable, in each bucket). Note that in non-statistical mode the recorder may not use "num_exemplars"
"statistical_exemplars": If exemplars should be recorded statistically
@@ -29,6 +29,7 @@
import abc
import itertools
import random
from typing import List, Optional, Tuple, Union

from opentelemetry.context import get_current
from opentelemetry.util import time_ns
@@ -41,12 +42,12 @@ class Exemplar:

def __init__(
self,
value,
timestamp,
dropped_labels=None,
span_id=None,
trace_id=None,
sample_count=None,
value: Union[int, float],
timestamp: int,
dropped_labels: Optional[Tuple[Tuple[str, str], ...]] = None,
span_id: Optional[bytes] = None,
trace_id: Optional[bytes] = None,
sample_count: Optional[float] = None,
):
self._value = value
self._timestamp = timestamp
@@ -94,22 +95,22 @@ def sample_count(self):
"""For statistical exemplars, how many measurements a single exemplar represents"""
return self._sample_count

def set_sample_count(self, count):
def set_sample_count(self, count: float):
self._sample_count = count


class ExemplarSampler:
class ExemplarSampler(abc.ABC):
"""
Abstract class to sample exemplars through a stream of incoming measurements
Abstract class to sample up to `k` exemplars, in some sampler-specific way, from a stream of incoming measurements
"""

def __init__(self, k, statistical=False):
def __init__(self, k: int, statistical: bool = False):
self._k = k
self._statistical = statistical
self._sample_set = list()
self._sample_set = []

@abc.abstractmethod
def sample(self, exemplar, **kwargs):
def sample(self, exemplar: Exemplar, **kwargs):
"""
Given an exemplar, determine if it should be sampled or not
"""
@@ -122,7 +123,7 @@ def sample_set(self):
"""

@abc.abstractmethod
def merge(self, set1, set2):
def merge(self, set1: List[Exemplar], set2: List[Exemplar]):
"""
Given two lists of sampled exemplars, merge them while maintaining the invariants specified by this sampler
"""
@@ -139,33 +140,35 @@ class RandomExemplarSampler(ExemplarSampler):
Randomly sample a set of k exemplars from a stream. Each measurement in the stream
will have an equal chance of being sampled.
If RandomExemplarSampler` is specified to be statistical, it will add a sample_count to every exemplar it records.
If `RandomExemplarSampler` is specified to be statistical, it will add a sample_count to every exemplar it records.
This value will be equal to the number of measurements each exemplar represents - all exemplars will have the same sample_count value.
"""

def __init__(self, k, statistical=False):
def __init__(self, k: int, statistical: bool = False):
super().__init__(k, statistical=statistical)
self.rand_count = 0

def sample(self, exemplar, **kwargs):
def sample(self, exemplar: Exemplar, **kwargs):
self.rand_count += 1

if len(self.sample_set) < self._k:
self.sample_set.append(exemplar)
if len(self._sample_set) < self._k:
self._sample_set.append(exemplar)
return

# We sample a random subset of a stream using "Algorithm R":
# https://en.wikipedia.org/wiki/Reservoir_sampling#Simple_algorithm
replace_index = random.randint(0, self.rand_count - 1)

if replace_index < self._k:
self.sample_set[replace_index] = exemplar
self._sample_set[replace_index] = exemplar
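For reference, a standalone sketch of "Algorithm R" outside the SDK (illustrative names only, assuming a finite stream):

    import random

    def reservoir_sample(stream, k):
        reservoir = []
        for i, item in enumerate(stream):
            if len(reservoir) < k:
                reservoir.append(item)  # fill the reservoir first
            else:
                j = random.randint(0, i)  # i + 1 items seen so far
                if j < k:
                    reservoir[j] = item  # replace with probability k / (i + 1)
        return reservoir

    # Every item ends up in the sample with equal probability k / n:
    print(reservoir_sample(range(1000), 5))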

def merge(self, set1, set2):
combined = set1 + set2
if len(combined) <= self._k:
return combined
return random.sample(combined, self._k)
def merge(self, set1: List[Exemplar], set2: List[Exemplar]):
"""
Assume that set2 is the latest set of exemplars.
For simplicity, we will just keep set2 and assume set1 has already been exported.
This may need to change with a different SDK implementation.
"""
return set2

@property
def sample_set(self):
@@ -186,12 +189,12 @@ class MinMaxExemplarSampler(ExemplarSampler):
Sample the minimum and maximum measurements recorded only
"""

def __init__(self, k, statistical=False):
def __init__(self, k: int, statistical: bool = False):
# K will always be 2 (min and max), and selecting min and max can never be statistical
super().__init__(2, statistical=False)
self._sample_set = []

def sample(self, exemplar, **kwargs):
def sample(self, exemplar: Exemplar, **kwargs):
self._sample_set = [
min(
self._sample_set + [exemplar],
@@ -209,12 +212,13 @@ def sample(self, exemplar, **kwargs):
def sample_set(self):
return self._sample_set

def merge(self, set1, set2):
merged_set = set1 + set2
if len(merged_set) <= 2:
return sorted(merged_set, key=lambda exemplar: exemplar.value)

return [min(merged_set), max(merged_set)]
def merge(self, set1: List[Exemplar], set2: List[Exemplar]):
"""
Assume that set2 is the latest set of exemplars.
For simplicity, we will just keep set2 and assume set1 has already been exported.
This may need to change with a different SDK implementation.
"""
return set2

def reset(self):
self._sample_set = []
@@ -228,15 +232,17 @@ class BucketedExemplarSampler(ExemplarSampler):
This value will be equal to `bucket.count / len(bucket.exemplars)`, that is, the number of measurements each exemplar represents (for example, a bucket with 100 measurements and 4 exemplars gives each exemplar a sample_count of 25).
"""

def __init__(self, k, statistical=False, boundaries=None):
def __init__(
self, k: int, statistical: bool = False, boundaries: Optional[list] = None
):
super().__init__(k)
self._boundaries = boundaries
self._sample_set = [
RandomExemplarSampler(k, statistical=statistical)
for _ in range(len(self._boundaries) + 1)
]

def sample(self, exemplar, **kwargs):
def sample(self, exemplar: Exemplar, **kwargs):
bucket_index = kwargs.get("bucket_index")
if bucket_index is None:
return
@@ -251,25 +257,13 @@ def sample_set(self):
)
)

def merge(self, set1, set2):
exemplar_set = [list() for _ in range(len(self._boundaries) + 1)]
# Sort both sets back into buckets
for setx in [set1, set2]:
bucket_idx = 0
for exemplar in setx:
if exemplar.value >= self._boundaries[-1]:
exemplar_set[-1].append(exemplar)
continue

while exemplar.value >= self._boundaries[bucket_idx]:
bucket_idx += 1
exemplar_set[bucket_idx].append(exemplar)

# Pick only k exemplars for every bucket
for index, inner_set in enumerate(exemplar_set):
if len(inner_set) > self._k:
exemplar_set[index] = random.sample(inner_set, self._k)
return list(itertools.chain.from_iterable(exemplar_set))
def merge(self, set1: List[Exemplar], set2: List[Exemplar]):
"""
Assume that set2 is the latest set of exemplars.
For simplicity, we will just keep set2 and assume set1 has already been exported.
This may need to change with a different SDK implementation.
"""
return set2

def reset(self):
for sampler in self._sample_set:
@@ -285,9 +279,9 @@ class ExemplarManager:

def __init__(
self,
config,
default_exemplar_sampler,
statistical_exemplar_sampler,
config: dict,
default_exemplar_sampler: ExemplarSampler,
statistical_exemplar_sampler: ExemplarSampler,
**kwargs
):
if config:
@@ -311,7 +305,12 @@
else:
self.record_exemplars = False

def sample(self, value, dropped_labels, **kwargs):
def sample(
self,
value: Union[int, float],
dropped_labels: Tuple[Tuple[str, str], ...],
**kwargs
):
context = get_current()

is_sampled = (
@@ -343,7 +342,11 @@ def take_checkpoint(self):
return ret
return []

def merge(self, checkpoint_exemplars, other_checkpoint_exemplars):
def merge(
self,
checkpoint_exemplars: List[Exemplar],
other_checkpoint_exemplars: List[Exemplar],
):
if self.record_exemplars:
return self.exemplar_sampler.merge(
checkpoint_exemplars, other_checkpoint_exemplars
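Putting the manager API together, a rough usage sketch (construction details are assumptions - the SDK's aggregators wire this up internally, and the sampler arguments may be classes or instances depending on the implementation):

    manager = ExemplarManager(
        {"num_exemplars": 2, "statistical_exemplars": True},
        MinMaxExemplarSampler,  # assumed default (non-statistical) sampler
        RandomExemplarSampler,  # assumed statistical sampler
    )
    manager.sample(100, dropped_labels=(("customer_id", "123"),))
    checkpoint = manager.take_checkpoint()
    merged = manager.merge(checkpoint, [])  # with this commit, merge keeps the latest set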
24 changes: 12 additions & 12 deletions opentelemetry-sdk/tests/metrics/export/test_exemplars.py
@@ -97,11 +97,11 @@ def test_merge(self):
set1 = [1, 2, 3]
set2 = [4, 5, 6]
sampler = RandomExemplarSampler(6)
self.assertEqual(set1 + set2, sampler.merge(set1, set2))
self.assertEqual(set2, sampler.merge(set1, set2))
sampler = RandomExemplarSampler(8)
self.assertEqual(set1 + set2, sampler.merge(set1, set2))
self.assertEqual(set2, sampler.merge(set1, set2))
sampler = RandomExemplarSampler(4)
self.assertEqual(4, len(sampler.merge(set1, set2)))
self.assertEqual(3, len(sampler.merge(set1, set2)))


class TestMinMaxExemplarSampler(unittest.TestCase):
@@ -140,10 +140,10 @@ def test_reset(self):
self.assertEqual(len(sampler.sample_set), 1)

def test_merge(self):
set1 = [1, 2, 3]
set2 = [4, 5, 6]
set1 = [1, 3]
set2 = [4, 6]
sampler = MinMaxExemplarSampler(2)
self.assertEqual([1, 6], sampler.merge(set1, set2))
self.assertEqual([4, 6], sampler.merge(set1, set2))


class TestBucketedExemplarSampler(unittest.TestCase):
@@ -195,7 +195,7 @@ def test_merge(self):
[Exemplar(2, time())],
)
),
2,
1,
)


@@ -339,7 +339,7 @@ def _merge_aggregators_test(self, aggregator):

agg1.merge(agg2)

self.assertEqual(len(agg1.checkpoint_exemplars), 2)
self.assertEqual(len(agg1.checkpoint_exemplars), 1)

def test_sum_aggregator(self):
self._no_exemplars_test(SumAggregator)
@@ -495,8 +495,8 @@ def test_histogram(self):
# Since this is using the HistogramAggregator, the bucket counts will be reflected
# with each record
requests_size.record(25, {"environment": "staging", "test": "value"})
requests_size.record(1, {"environment": "staging", "test": "value2"})
requests_size.record(200, {"environment": "staging", "test": "value3"})
requests_size.record(1, {"environment": "staging", "test": "value"})
requests_size.record(200, {"environment": "staging", "test": "value"})

controller.tick()
metrics_list = exporter.get_exported_metrics()
Expand All @@ -509,8 +509,8 @@ def test_histogram(self):
for exemplar in exemplars
],
[
(1, (("test", "value2"),)),
(1, (("test", "value"),)),
(25, (("test", "value"),)),
(200, (("test", "value3"),)),
(200, (("test", "value"),)),
],
)
