Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add information on python metrics to the programming guide #32464

Merged
merged 2 commits into from
Sep 16, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions website/www/site/content/en/documentation/programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -6096,6 +6096,18 @@ func (fn *MyDoFn) ProcessElement(ctx context.Context, ...) {
}
{{< /highlight >}}

{{< highlight py>}}
from apache_beam import metrics

class MyDoFn(beam.DoFn):
def __init__(self):
self.counter = metrics.Metrics.counter("namespace", "counter1")

def process(self, element):
self.counter.inc()
yield element
{{< /highlight >}}

**Distribution**: A metric that reports information about the distribution of reported values.

{{< highlight java >}}
Expand All @@ -6120,6 +6132,16 @@ func (fn *MyDoFn) ProcessElement(ctx context.Context, v int64, ...) {
}
{{< /highlight >}}

{{< highlight py >}}
class MyDoFn(beam.DoFn):
def __init__(self):
self.distribution = metrics.Metrics.distribution("namespace", "distribution1")

def process(self, element):
self.distribution.update(element)
yield element
{{< /highlight >}}

**Gauge**: A metric that reports the latest value out of reported values. Since metrics are
collected from many workers the value may not be the absolute last, but one of the latest values.

Expand All @@ -6145,6 +6167,16 @@ func (fn *MyDoFn) ProcessElement(ctx context.Context, v int64, ...) {
}
{{< /highlight >}}

{{< highlight py >}}
class MyDoFn(beam.DoFn):
def __init__(self):
self.gauge = metrics.Metrics.gauge("namespace", "gauge1")

def process(self, element):
self.gaguge.set(element)
yield element
{{< /highlight >}}

### 10.3. Querying metrics {#querying-metrics}
{{< paragraph class="language-java language-python">}}
`PipelineResult` has a method `metrics()` which returns a `MetricResults` object that allows
Expand All @@ -6159,6 +6191,17 @@ matching a given filter. It takes in a predicate with a `SingleResult` paramete
be used for custom filters.
{{< /paragraph >}}

{{< paragraph class="language-py">}}
`PipelineResult` has a `metrics` method that returns a `MetricResults` object. The `MetricResults` object lets you
access metrics. The main method available in the `MetricResults` object, `query`, lets you
query all metrics that match a given filter. The `query` method takes in a `MetricsFilter` object that you can
use to filter by several different criteria. Querying a `MetricResults` object returns
a dictionary of lists of `MetricResult` objects, with the dictionary organizing them by type,
for example, `Counter`, `Distribution`, and `Gauge`. The `MetricResult` object contains a `result` function
that gets the value of the metric and contains a `key` property. The `key` property contains information about
the namespace and the name of the metric.
{{< /paragraph >}}

{{< highlight java >}}
public interface PipelineResult {
MetricResults metrics();
Expand Down Expand Up @@ -6186,6 +6229,20 @@ public interface MetricResult<T> {
{{< code_sample "sdks/go/examples/snippets/10metrics.go" metrics_query >}}
{{< /highlight >}}

{{< highlight py >}}
class PipelineResult:
def metrics(self) -> MetricResults:
"""Returns a the metric results from the pipeline."""

class MetricResults:
def query(self, filter: MetricsFilter) -> Dict[str, List[MetricResult]]:
"""Filters the results against the specified filter."""

class MetricResult:
def result(self):
"""Returns the value of the metric."""
{{< /highlight >}}

### 10.4. Using metrics in pipeline {#using-metrics}
Below, there is a simple example of how to use a `Counter` metric in a user pipeline.

Expand Down Expand Up @@ -6228,6 +6285,28 @@ public class MyMetricsDoFn extends DoFn<Integer, Integer> {
{{< code_sample "sdks/go/examples/snippets/10metrics.go" metrics_pipeline >}}
{{< /highlight >}}

{{< highlight py >}}
class MyMetricsDoFn(beam.DoFn):
def __init__(self):
self.counter = metrics.Metrics.counter("namespace", "counter1")

def process(self, element):
counter.inc()
yield element

pipeline = beam.Pipeline()

pipeline | beam.ParDo(MyMetricsDoFn())

result = pipeline.run().wait_until_finish()

metrics = result.metrics().query(
metrics.MetricsFilter.with_namespace("namespace").with_name("counter1"))

for metric in metrics["counters"]:
print(metric)
{{< /highlight >}}

### 10.5. Export metrics {#export-metrics}

Beam metrics can be exported to external sinks. If a metrics sink is set up in the configuration, the runner will push metrics to it at a default 5s period.
Expand Down
Loading