Skip to content

Commit

Permalink
Compute and write metrics for rows prior to store writes (#763)
Browse files Browse the repository at this point in the history
* Compute and write metrics for rows prior to store writes

* Fix tests, update docs, rebase on master
  • Loading branch information
zhilingc authored Jun 19, 2020
1 parent 64e5bc4 commit a00bef9
Show file tree
Hide file tree
Showing 10 changed files with 226 additions and 90 deletions.
1 change: 1 addition & 0 deletions docs/user-guide/statistics.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,5 @@ The metrics are tagged with and can be aggregated by the following keys:
| feast\_featureSet\_name | feature set name |
| feast\_feature\_name | feature name |
| ingestion\_job\_name | id of the population job writing the feature values. |
| metrics\_namespace | either `Inflight` or `WriteToStoreSuccess` |

14 changes: 10 additions & 4 deletions ingestion/src/main/java/feast/ingestion/ImportJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import feast.ingestion.transform.ProcessAndValidateFeatureRows;
import feast.ingestion.transform.ReadFromSource;
import feast.ingestion.transform.metrics.WriteFailureMetricsTransform;
import feast.ingestion.transform.metrics.WriteInflightMetricsTransform;
import feast.ingestion.transform.metrics.WriteSuccessMetricsTransform;
import feast.ingestion.transform.specs.ReadFeatureSetSpecs;
import feast.ingestion.transform.specs.WriteFeatureSetSpecAck;
Expand Down Expand Up @@ -147,13 +148,18 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOExcepti
for (Store store : stores) {
FeatureSink featureSink = getFeatureSink(store, featureSetSpecs);

// Step 5. Write FeatureRow to the corresponding Store.
// Step 5. Write metrics of successfully validated rows
validatedRows
.get(FEATURE_ROW_OUT)
.apply("WriteInflightMetrics", WriteInflightMetricsTransform.create(store.getName()));

// Step 6. Write FeatureRow to the corresponding Store.
WriteResult writeFeatureRows =
storeAllocatedRows
.get(storeTags.get(store))
.apply("WriteFeatureRowToStore", featureSink.writer());

// Step 6. Write FailedElements to a dead letter table in BigQuery.
// Step 7. Write FailedElements to a dead letter table in BigQuery.
if (options.getDeadLetterTableSpec() != null) {
// TODO: make deadletter destination type configurable
DeadletterSink deadletterSink =
Expand All @@ -172,7 +178,7 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOExcepti
.apply("WriteFailedElements_WriteFeatureRowToStore", deadletterSink.write());
}

// Step 7. Write metrics to a metrics sink.
// Step 8. Write metrics to a metrics sink.
writeFeatureRows
.getSuccessfulInserts()
.apply("WriteSuccessMetrics", WriteSuccessMetricsTransform.create(store.getName()));
Expand All @@ -182,7 +188,7 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOExcepti
.apply("WriteFailureMetrics", WriteFailureMetricsTransform.create(store.getName()));
}

// Step 8. Send ack that FeatureSetSpec state is updated
// Step 9. Send ack that FeatureSetSpec state is updated
featureSetSpecs.apply(
"WriteAck",
WriteFeatureSetSpecAck.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,7 @@
*/
package feast.ingestion.transform.metrics;

import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.FEATURE_SET_NAME_TAG_KEY;
import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.FEATURE_SET_PROJECT_TAG_KEY;
import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.FEATURE_TAG_KEY;
import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.INGESTION_JOB_NAME_KEY;
import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.METRIC_PREFIX;
import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.STORE_TAG_KEY;
import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.*;

import com.google.auto.value.AutoValue;
import com.timgroup.statsd.NonBlockingStatsDClient;
Expand Down Expand Up @@ -67,6 +62,8 @@ public abstract class WriteFeatureValueMetricsDoFn

abstract int getStatsdPort();

abstract String getMetricsNamespace();

static Builder newBuilder() {
return new AutoValue_WriteFeatureValueMetricsDoFn.Builder();
}
Expand All @@ -80,6 +77,8 @@ abstract static class Builder {

abstract Builder setStatsdPort(int statsdPort);

abstract Builder setMetricsNamespace(String metricsNamespace);

abstract WriteFeatureValueMetricsDoFn build();
}

Expand Down Expand Up @@ -158,7 +157,8 @@ public void processElement(
FEATURE_SET_PROJECT_TAG_KEY + ":" + projectName,
FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
FEATURE_TAG_KEY + ":" + featureName,
INGESTION_JOB_NAME_KEY + ":" + context.getPipelineOptions().getJobName()
INGESTION_JOB_NAME_KEY + ":" + context.getPipelineOptions().getJobName(),
METRICS_NAMESPACE_KEY + ":" + getMetricsNamespace(),
};

// stats can return non finite values when there is no element
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2019 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.ingestion.transform.metrics;

import com.google.auto.value.AutoValue;
import feast.ingestion.options.ImportOptions;
import feast.proto.types.FeatureRowProto.FeatureRow;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

@AutoValue
public abstract class WriteInflightMetricsTransform
extends PTransform<PCollection<FeatureRow>, PDone> {

public static final String METRIC_NAMESPACE = "Inflight";
public static final String ELEMENTS_WRITTEN_METRIC = "elements_count";
private static final Counter elements_count =
Metrics.counter(METRIC_NAMESPACE, ELEMENTS_WRITTEN_METRIC);

public abstract String getStoreName();

public static WriteInflightMetricsTransform create(String storeName) {
return new AutoValue_WriteInflightMetricsTransform(storeName);
}

@Override
public PDone expand(PCollection<FeatureRow> input) {
ImportOptions options = input.getPipeline().getOptions().as(ImportOptions.class);

input.apply(
"IncrementInflightElementsCounter",
MapElements.into(TypeDescriptors.booleans())
.via(
(FeatureRow row) -> {
elements_count.inc();
return true;
}));

switch (options.getMetricsExporterType()) {
case "statsd":

// Fixed window is applied so the metric collector will not be overwhelmed with the metrics
// data. For validation, only summaries of the values are usually required vs the actual
// values.
PCollection<KV<String, Iterable<FeatureRow>>> rowsGroupedByRef =
input
.apply(
"FixedWindow",
Window.into(
FixedWindows.of(
Duration.standardSeconds(
options.getWindowSizeInSecForFeatureValueMetric()))))
.apply(
"ConvertToKV_FeatureSetRefToFeatureRow",
ParDo.of(
new DoFn<FeatureRow, KV<String, FeatureRow>>() {
@ProcessElement
public void processElement(
ProcessContext c, @Element FeatureRow featureRow) {
c.output(KV.of(featureRow.getFeatureSet(), featureRow));
}
}))
.apply("GroupByFeatureSetRef", GroupByKey.create());

rowsGroupedByRef.apply(
"WriteInflightRowMetrics",
ParDo.of(
WriteRowMetricsDoFn.newBuilder()
.setStatsdHost(options.getStatsdHost())
.setStatsdPort(options.getStatsdPort())
.setStoreName(getStoreName())
.setMetricsNamespace(METRIC_NAMESPACE)
.build()));

rowsGroupedByRef.apply(
"WriteInflightFeatureValueMetrics",
ParDo.of(
WriteFeatureValueMetricsDoFn.newBuilder()
.setStatsdHost(options.getStatsdHost())
.setStatsdPort(options.getStatsdPort())
.setStoreName(getStoreName())
.setMetricsNamespace(METRIC_NAMESPACE)
.build()));

return PDone.in(input.getPipeline());
case "none":
default:
input.apply(
"Noop",
ParDo.of(
new DoFn<FeatureRow, Void>() {
@ProcessElement
public void processElement(ProcessContext c) {}
}));
return PDone.in(input.getPipeline());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ public abstract class WriteRowMetricsDoFn extends DoFn<KV<String, Iterable<Featu
public static final String STORE_TAG_KEY = "feast_store";
public static final String FEATURE_SET_PROJECT_TAG_KEY = "feast_project_name";
public static final String FEATURE_SET_NAME_TAG_KEY = "feast_featureSet_name";
public static final String FEATURE_SET_VERSION_TAG_KEY = "feast_featureSet_version";
public static final String FEATURE_TAG_KEY = "feast_feature_name";
public static final String INGESTION_JOB_NAME_KEY = "ingestion_job_name";
public static final String METRICS_NAMESPACE_KEY = "metrics_namespace";

public static final String GAUGE_NAME_FEATURE_ROW_LAG_MS_MIN = "feature_row_lag_ms_min";
public static final String GAUGE_NAME_FEATURE_ROW_LAG_MS_MAX = "feature_row_lag_ms_max";
Expand Down Expand Up @@ -77,6 +77,8 @@ public abstract class WriteRowMetricsDoFn extends DoFn<KV<String, Iterable<Featu

public abstract int getStatsdPort();

public abstract String getMetricsNamespace();

@Nullable
public abstract Clock getClock();

Expand Down Expand Up @@ -104,6 +106,8 @@ public abstract static class Builder {

public abstract Builder setStatsdPort(int statsdPort);

public abstract Builder setMetricsNamespace(String metricNamespace);

/**
* setClock will override the default system clock used to calculate feature row lag.
*
Expand Down Expand Up @@ -193,6 +197,7 @@ public void processElement(
FEATURE_SET_PROJECT_TAG_KEY + ":" + featureSetProject,
FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName(),
METRICS_NAMESPACE_KEY + ":" + getMetricsNamespace(),
};

statsd.count(COUNT_NAME_FEATURE_ROW_INGESTED, featureRowLagStats.getN(), tags);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public void processElement(
.setStatsdHost(options.getStatsdHost())
.setStatsdPort(options.getStatsdPort())
.setStoreName(getStoreName())
.setMetricsNamespace(METRIC_NAMESPACE)
.build()));

validRowsGroupedByRef.apply(
Expand All @@ -101,6 +102,7 @@ public void processElement(
.setStatsdHost(options.getStatsdHost())
.setStatsdPort(options.getStatsdPort())
.setStoreName(getStoreName())
.setMetricsNamespace(METRIC_NAMESPACE)
.build()));

return PDone.in(input.getPipeline());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public void shouldSendCorrectStatsDMetrics() throws IOException, InterruptedExce
.setStatsdHost("localhost")
.setStatsdPort(STATSD_SERVER_PORT)
.setStoreName("store")
.setMetricsNamespace("test")
.build()));
pipeline.run(pipelineOptions).waitUntilFinish();
// Wait until StatsD has finished processed all messages, 3 sec is a reasonable duration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public void shouldSendCorrectStatsDMetrics() throws IOException, InterruptedExce
.setStatsdPort(STATSD_SERVER_PORT)
.setStoreName("store")
.setClock(Clock.fixed(Instant.ofEpochSecond(1585548645), ZoneId.of("UTC")))
.setMetricsNamespace("test")
.build()));
pipeline.run(pipelineOptions).waitUntilFinish();
// Wait until StatsD has finished processed all messages, 3 sec is a reasonable duration
Expand Down
Loading

0 comments on commit a00bef9

Please sign in to comment.