Remove metrics windowing, clean up step names for metrics writing (feast-dev#334)

* Remove metrics windowing, clean up step names

* Remove unused variables
Chen Zhiling committed Nov 28, 2019
1 parent 2347a76 commit 9c4b36a
Showing 4 changed files with 88 additions and 151 deletions.
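
The heart of the change is visible in the DoFn signatures: the metrics writers no longer receive windowed, keyed batches, but individual elements. A minimal before/after sketch, with both signatures taken directly from the diffs below:

// Before: rows arrived windowed and grouped, so the DoFn looped over a batch.
public abstract class WriteRowMetricsDoFn
    extends DoFn<KV<Integer, Iterable<FeatureRow>>, Void> { /* ... */ }

// After: the DoFn receives one FeatureRow at a time and writes its metrics directly.
public abstract class WriteRowMetricsDoFn extends DoFn<FeatureRow, Void> { /* ... */ }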
WriteToStore.java
@@ -80,6 +80,7 @@ public PDone expand(PCollection<FeatureRow> input) {
 
     switch (storeType) {
       case REDIS:
+
         RedisConfig redisConfig = getStore().getRedisConfig();
         input
             .apply(
@@ -91,10 +92,6 @@ public PDone expand(PCollection<FeatureRow> input) {
         break;
       case BIGQUERY:
         BigQueryConfig bigqueryConfig = getStore().getBigqueryConfig();
-        TimePartitioning timePartitioning =
-            new TimePartitioning()
-                .setType("DAY")
-                .setField(FeatureRowToTableRow.getEventTimestampColumn());
 
         WriteResult bigqueryWriteResult =
             input.apply(
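
The BIGQUERY branch also loses its day-partitioning setup. For context, here is a hedged sketch of how such a TimePartitioning value is typically attached to a Beam BigQuery write; withTimePartitioning is standard BigQueryIO API, but the surrounding write configuration is an assumption, since the actual write sits below this hunk:

import com.google.api.services.bigquery.model.TimePartitioning;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;

// Day-partition the destination table on the event timestamp column, as the
// removed lines did. The literal column name stands in for
// FeatureRowToTableRow.getEventTimestampColumn(); the table spec is made up.
TimePartitioning timePartitioning =
    new TimePartitioning().setType("DAY").setField("event_timestamp");

BigQueryIO.writeTableRows()
    .to("my-project:feast.feature_rows")
    .withTimePartitioning(timePartitioning);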
WriteDeadletterRowMetricsDoFn.java
@@ -22,15 +22,14 @@
 import com.timgroup.statsd.StatsDClientException;
 import feast.ingestion.values.FailedElement;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.values.KV;
 import org.slf4j.Logger;
 
 @AutoValue
-public abstract class WriteDeadletterRowMetricsDoFn
-    extends DoFn<KV<Integer, Iterable<FailedElement>>, Void> {
+public abstract class WriteDeadletterRowMetricsDoFn extends
+    DoFn<FailedElement, Void> {
 
-  private static final Logger log =
-      org.slf4j.LoggerFactory.getLogger(WriteDeadletterRowMetricsDoFn.class);
+  private static final Logger log = org.slf4j.LoggerFactory
+      .getLogger(WriteDeadletterRowMetricsDoFn.class);
 
   private final String INGESTION_JOB_NAME_KEY = "ingestion_job_name";
   private final String METRIC_PREFIX = "feast_ingestion";
@@ -69,19 +68,15 @@ public void setup() {
 
   @ProcessElement
   public void processElement(ProcessContext c) {
-
-    for (FailedElement ignored : c.element().getValue()) {
-      try {
-        statsd.count(
-            "deadletter_row_count",
-            1,
-            STORE_TAG_KEY + ":" + getStoreName(),
-            FEATURE_SET_NAME_TAG_KEY + ":" + ignored.getFeatureSetName(),
-            FEATURE_SET_VERSION_TAG_KEY + ":" + ignored.getFeatureSetVersion(),
-            INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());
-      } catch (StatsDClientException e) {
-        log.warn("Unable to push metrics to server", e);
-      }
-    }
+    FailedElement ignored = c.element();
+    try {
+      statsd.count("deadletter_row_count", 1,
+          STORE_TAG_KEY + ":" + getStoreName(),
+          FEATURE_SET_NAME_TAG_KEY + ":" + ignored.getFeatureSetName(),
+          FEATURE_SET_VERSION_TAG_KEY + ":" + ignored.getFeatureSetVersion(),
+          INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());
+    } catch (StatsDClientException e) {
+      log.warn("Unable to push metrics to server", e);
+    }
   }
 }
WriteMetricsTransform.java
@@ -1,19 +1,3 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- * Copyright 2018-2019 The Feast Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package feast.ingestion.transform.metrics;
 
 import com.google.auto.value.AutoValue;
@@ -28,12 +12,10 @@
 import org.apache.beam.sdk.values.TupleTag;
 import org.slf4j.Logger;
 
-
 @AutoValue
 public abstract class WriteMetricsTransform extends PTransform<PCollectionTuple, PDone> {
 
-  private static final long WINDOW_SIZE_SECONDS = 15;
   private static final Logger log = org.slf4j.LoggerFactory.getLogger(WriteMetricsTransform.class);
 
   public abstract String getStoreName();
 
   public abstract TupleTag<FeatureRow> getSuccessTag();
@@ -48,7 +30,6 @@ public static Builder newBuilder() {
   public abstract static class Builder {
 
     public abstract Builder setStoreName(String storeName);
-
     public abstract Builder setSuccessTag(TupleTag<FeatureRow> successTag);
 
     public abstract Builder setFailureTag(TupleTag<FailedElement> failureTag);
@@ -58,45 +39,36 @@ public abstract static class Builder {
 
   @Override
   public PDone expand(PCollectionTuple input) {
-    ImportOptions options = input.getPipeline().getOptions().as(ImportOptions.class);
+    ImportOptions options = input.getPipeline().getOptions()
+        .as(ImportOptions.class);
     switch (options.getMetricsExporterType()) {
       case "statsd":
-        input
-            .get(getFailureTag())
-            .apply("Window records", new WindowRecords<>(WINDOW_SIZE_SECONDS))
-            .apply(
-                "Write deadletter metrics",
-                ParDo.of(
-                    WriteDeadletterRowMetricsDoFn.newBuilder()
-                        .setStatsdHost(options.getStatsdHost())
-                        .setStatsdPort(options.getStatsdPort())
-                        .setStoreName(getStoreName())
-                        .build()));
-
-        input
-            .get(getSuccessTag())
-            .apply("Window records", new WindowRecords<>(WINDOW_SIZE_SECONDS))
-            .apply(
-                "Write row metrics",
-                ParDo.of(
-                    WriteRowMetricsDoFn.newBuilder()
-                        .setStatsdHost(options.getStatsdHost())
-                        .setStatsdPort(options.getStatsdPort())
-                        .setStoreName(getStoreName())
-                        .build()));
+        input.get(getFailureTag())
+            .apply("WriteDeadletterMetrics", ParDo.of(
+                WriteDeadletterRowMetricsDoFn.newBuilder()
+                    .setStatsdHost(options.getStatsdHost())
+                    .setStatsdPort(options.getStatsdPort())
+                    .setStoreName(getStoreName())
+                    .build()));
+
+        input.get(getSuccessTag())
+            .apply("WriteRowMetrics", ParDo
+                .of(WriteRowMetricsDoFn.newBuilder()
+                    .setStatsdHost(options.getStatsdHost())
+                    .setStatsdPort(options.getStatsdPort())
+                    .setStoreName(getStoreName())
+                    .build()));
 
         return PDone.in(input.getPipeline());
       case "none":
       default:
-        input
-            .get(getSuccessTag())
-            .apply(
-                "Noop",
-                ParDo.of(
-                    new DoFn<FeatureRow, Void>() {
-                      @ProcessElement
-                      public void processElement(ProcessContext c) {}
-                    }));
+        input.get(getSuccessTag()).apply("Noop",
+            ParDo.of(new DoFn<FeatureRow, Void>() {
+              @ProcessElement
+              public void processElement(ProcessContext c) {
+              }
+            }));
         return PDone.in(input.getPipeline());
     }
   }
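
The removed WINDOW_SIZE_SECONDS constant and the two "Window records" steps pointed at a WindowRecords transform whose implementation is not part of this commit. Presumably it assigned elements to fixed 15-second windows ahead of a grouping step; a hedged Beam sketch of that idea, not the actual Feast code:

import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Illustrative stand-in for the removed WindowRecords<T>: fixed windows so a
// downstream grouping step could batch metrics per interval.
public class WindowRecords<T> extends PTransform<PCollection<T>, PCollection<T>> {

  private final long windowSizeSeconds;

  public WindowRecords(long windowSizeSeconds) {
    this.windowSizeSeconds = windowSizeSeconds;
  }

  @Override
  public PCollection<T> expand(PCollection<T> input) {
    return input.apply(
        Window.<T>into(FixedWindows.of(Duration.standardSeconds(windowSizeSeconds))));
  }
}

With the windowing and the KV<Integer, Iterable<...>> grouping gone, metrics are pushed as each element flows through rather than once per 15-second batch. The renamed steps ("WriteDeadletterMetrics", "WriteRowMetrics") also remove the duplicate "Window records" step name, which Beam would otherwise have had to uniquify.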
WriteRowMetricsDoFn.java
@@ -1,19 +1,3 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- * Copyright 2018-2019 The Feast Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package feast.ingestion.transform.metrics;
 
 import com.google.auto.value.AutoValue;
@@ -25,11 +9,10 @@
 import feast.types.FieldProto.Field;
 import feast.types.ValueProto.Value.ValCase;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.values.KV;
 import org.slf4j.Logger;
 
 @AutoValue
-public abstract class WriteRowMetricsDoFn extends DoFn<KV<Integer, Iterable<FeatureRow>>, Void> {
+public abstract class WriteRowMetricsDoFn extends DoFn<FeatureRow, Void> {
 
   private static final Logger log = org.slf4j.LoggerFactory.getLogger(WriteRowMetricsDoFn.class);
 
@@ -46,11 +29,8 @@ public abstract class WriteRowMetricsDoFn extends DoFn<KV<Integer, Iterable<FeatureRow>>, Void> {
 
   public abstract int getStatsdPort();
 
-  public static WriteRowMetricsDoFn create(
-      String newStoreName,
-      FeatureSetSpec newFeatureSetSpec,
-      String newStatsdHost,
-      int newStatsdPort) {
+  public static WriteRowMetricsDoFn create(String newStoreName, FeatureSetSpec newFeatureSetSpec,
+      String newStatsdHost, int newStatsdPort) {
     return newBuilder()
         .setStoreName(newStoreName)
         .setStatsdHost(newStatsdHost)
@@ -78,69 +58,62 @@ public abstract static class Builder {
 
   @Setup
   public void setup() {
-    statsd = new NonBlockingStatsDClient(METRIC_PREFIX, getStatsdHost(), getStatsdPort());
+    statsd = new NonBlockingStatsDClient(
+        METRIC_PREFIX,
+        getStatsdHost(),
+        getStatsdPort()
+    );
   }
 
   @ProcessElement
   public void processElement(ProcessContext c) {
-
-    long missingValueCount = 0;
-
-    try {
-      for (FeatureRow row : c.element().getValue()) {
-        long eventTimestamp = com.google.protobuf.util.Timestamps.toMillis(row.getEventTimestamp());
-
-        String[] split = row.getFeatureSet().split(":");
-        String featureSetName = split[0];
-        String featureSetVersion = split[1];
-        statsd.histogram(
-            "feature_row_lag_ms",
-            System.currentTimeMillis() - eventTimestamp,
-            STORE_TAG_KEY + ":" + getStoreName(),
-            FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
-            FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
-            INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());
-
-        statsd.histogram(
-            "feature_row_event_time_epoch_ms",
-            eventTimestamp,
-            STORE_TAG_KEY + ":" + getStoreName(),
-            FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
-            FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
-            INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());
-
-        for (Field field : row.getFieldsList()) {
-          if (!field.getValue().getValCase().equals(ValCase.VAL_NOT_SET)) {
-            statsd.histogram(
-                "feature_value_lag_ms",
-                System.currentTimeMillis() - eventTimestamp,
-                STORE_TAG_KEY + ":" + getStoreName(),
-                FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
-                FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
-                FEATURE_TAG_KEY + ":" + field.getName(),
-                INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());
-          } else {
-            missingValueCount++;
-          }
-        }
-
-        statsd.count(
-            "feature_row_ingested_count",
-            1,
-            STORE_TAG_KEY + ":" + getStoreName(),
-            FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
-            FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
-            INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());
-
-        statsd.count(
-            "feature_row_missing_value_count",
-            missingValueCount,
-            STORE_TAG_KEY + ":" + getStoreName(),
-            FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
-            FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
-            INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());
-      }
-
-    } catch (StatsDClientException e) {
+    FeatureRow row = c.element();
+    long eventTimestamp = com.google.protobuf.util.Timestamps.toMillis(row.getEventTimestamp());
+
+    String[] split = row.getFeatureSet().split(":");
+    String featureSetName = split[0];
+    String featureSetVersion = split[1];
+
+    try {
+      statsd.histogram("feature_row_lag_ms", System.currentTimeMillis() - eventTimestamp,
+          STORE_TAG_KEY + ":" + getStoreName(),
+          FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
+          FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
+          INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());
+
+      statsd.histogram("feature_row_event_time_epoch_ms", eventTimestamp,
+          STORE_TAG_KEY + ":" + getStoreName(),
+          FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
+          FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
+          INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());
+
+      for (Field field : row.getFieldsList()) {
+        if (!field.getValue().getValCase().equals(ValCase.VAL_NOT_SET)) {
+          statsd.histogram("feature_value_lag_ms", System.currentTimeMillis() - eventTimestamp,
+              STORE_TAG_KEY + ":" + getStoreName(),
+              FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
+              FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
+              FEATURE_TAG_KEY + ":" + field.getName(),
+              INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());
+        } else {
+          statsd.count("feature_value_missing_count", 1,
+              STORE_TAG_KEY + ":" + getStoreName(),
+              FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
+              FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
+              FEATURE_TAG_KEY + ":" + field.getName(),
+              INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());
+        }
+      }
+
+      statsd.count("feature_row_ingested_count", 1,
+          STORE_TAG_KEY + ":" + getStoreName(),
+          FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
+          FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
+          INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());
+
+    } catch (
+        StatsDClientException e) {
       log.warn("Unable to push metrics to server", e);
     }
   }
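
Two behavioural points fall out of this last file. First, the aggregated per-batch feature_row_missing_value_count is gone; missing values are now counted per field via feature_value_missing_count, which carries a feature tag. Second, every element now triggers its own client calls, which the non-blocking client absorbs without stalling the bundle. Below is a hedged standalone sketch of the com.timgroup.statsd client the DoFn configures; the host, port, and tag values are placeholders, with only the "feast_ingestion" prefix and the metric names coming from the diff:

import com.timgroup.statsd.NonBlockingStatsDClient;
import com.timgroup.statsd.StatsDClient;

public class StatsdSketch {
  public static void main(String[] args) {
    // Same constructor shape as the DoFn's @Setup: prefix, host, port.
    StatsDClient statsd = new NonBlockingStatsDClient("feast_ingestion", "localhost", 8125);
    // Tags ride along as trailing "key:value" varargs, as in the DoFn.
    statsd.count("feature_row_ingested_count", 1, "ingestion_job_name:example-job");
    statsd.histogram("feature_row_lag_ms", 42, "ingestion_job_name:example-job");
    statsd.stop();
  }
}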
