Skip to content

Commit

Permalink
Remove metrics windowing, clean up step names for metrics writing (#334)
Browse files Browse the repository at this point in the history
* Remove metrics windowing, clean up step names

* Remove unused variables
  • Loading branch information
Chen Zhiling authored and feast-ci-bot committed Nov 28, 2019
1 parent 81fb1ba commit 747bc3a
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public PDone expand(PCollection<FeatureRow> input) {

switch (storeType) {
case REDIS:

RedisConfig redisConfig = getStore().getRedisConfig();
input
.apply(
Expand All @@ -75,10 +76,6 @@ public PDone expand(PCollection<FeatureRow> input) {
case BIGQUERY:

BigQueryConfig bigqueryConfig = getStore().getBigqueryConfig();
TimePartitioning timePartitioning =
new TimePartitioning()
.setType("DAY")
.setField(FeatureRowToTableRow.getEventTimestampColumn());

WriteResult bigqueryWriteResult =
input
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@
import com.timgroup.statsd.NonBlockingStatsDClient;
import com.timgroup.statsd.StatsDClient;
import com.timgroup.statsd.StatsDClientException;
import feast.core.FeatureSetProto.FeatureSetSpec;
import feast.ingestion.values.FailedElement;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;

@AutoValue
public abstract class WriteDeadletterRowMetricsDoFn extends
DoFn<KV<Integer, Iterable<FailedElement>>, Void> {
DoFn<FailedElement, Void> {

private static final Logger log = org.slf4j.LoggerFactory
.getLogger(WriteDeadletterRowMetricsDoFn.class);
Expand Down Expand Up @@ -59,17 +57,15 @@ public void setup() {

@ProcessElement
public void processElement(ProcessContext c) {

for (FailedElement ignored : c.element().getValue()) {
try {
statsd.count("deadletter_row_count", 1,
STORE_TAG_KEY + ":" + getStoreName(),
FEATURE_SET_NAME_TAG_KEY + ":" + ignored.getFeatureSetName(),
FEATURE_SET_VERSION_TAG_KEY + ":" + ignored.getFeatureSetVersion(),
INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());
} catch (StatsDClientException e) {
log.warn("Unable to push metrics to server", e);
}
FailedElement ignored = c.element();
try {
statsd.count("deadletter_row_count", 1,
STORE_TAG_KEY + ":" + getStoreName(),
FEATURE_SET_NAME_TAG_KEY + ":" + ignored.getFeatureSetName(),
FEATURE_SET_VERSION_TAG_KEY + ":" + ignored.getFeatureSetVersion(),
INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());
} catch (StatsDClientException e) {
log.warn("Unable to push metrics to server", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package feast.ingestion.transform.metrics;

import com.google.auto.value.AutoValue;
import feast.core.FeatureSetProto.FeatureSetSpec;
import feast.ingestion.options.ImportOptions;
import feast.ingestion.values.FailedElement;
import feast.types.FeatureRowProto.FeatureRow;
Expand All @@ -17,9 +16,6 @@
@AutoValue
public abstract class WriteMetricsTransform extends PTransform<PCollectionTuple, PDone> {

private static final long WINDOW_SIZE_SECONDS = 15;
private static final Logger log = org.slf4j.LoggerFactory.getLogger(WriteMetricsTransform.class);

public abstract String getStoreName();

public abstract TupleTag<FeatureRow> getSuccessTag();
Expand Down Expand Up @@ -49,19 +45,15 @@ public PDone expand(PCollectionTuple input) {
case "statsd":

input.get(getFailureTag())
.apply("Window records",
new WindowRecords<>(WINDOW_SIZE_SECONDS))
.apply("Write deadletter metrics", ParDo.of(
.apply("WriteDeadletterMetrics", ParDo.of(
WriteDeadletterRowMetricsDoFn.newBuilder()
.setStatsdHost(options.getStatsdHost())
.setStatsdPort(options.getStatsdPort())
.setStoreName(getStoreName())
.build()));

input.get(getSuccessTag())
.apply("Window records",
new WindowRecords<>(WINDOW_SIZE_SECONDS))
.apply("Write row metrics", ParDo
.apply("WriteRowMetrics", ParDo
.of(WriteRowMetricsDoFn.newBuilder()
.setStatsdHost(options.getStatsdHost())
.setStatsdPort(options.getStatsdPort())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@
import feast.types.FieldProto.Field;
import feast.types.ValueProto.Value.ValCase;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;

@AutoValue
public abstract class WriteRowMetricsDoFn extends DoFn<KV<Integer, Iterable<FeatureRow>>, Void> {
public abstract class WriteRowMetricsDoFn extends DoFn<FeatureRow, Void> {

private static final Logger log = org.slf4j.LoggerFactory.getLogger(WriteRowMetricsDoFn.class);

Expand Down Expand Up @@ -69,53 +68,52 @@ public void setup() {
@ProcessElement
public void processElement(ProcessContext c) {

long missingValueCount = 0;

try {
for (FeatureRow row : c.element().getValue()) {
long eventTimestamp = com.google.protobuf.util.Timestamps.toMillis(row.getEventTimestamp());

String[] split = row.getFeatureSet().split(":");
String featureSetName = split[0];
String featureSetVersion = split[1];
statsd.histogram("feature_row_lag_ms", System.currentTimeMillis() - eventTimestamp,
STORE_TAG_KEY + ":" + getStoreName(),
FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());

statsd.histogram("feature_row_event_time_epoch_ms", eventTimestamp,
STORE_TAG_KEY + ":" + getStoreName(),
FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());

for (Field field : row.getFieldsList()) {
if (!field.getValue().getValCase().equals(ValCase.VAL_NOT_SET)) {
statsd.histogram("feature_value_lag_ms", System.currentTimeMillis() - eventTimestamp,
STORE_TAG_KEY + ":" + getStoreName(),
FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
FEATURE_TAG_KEY + ":" + field.getName(),
INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());
} else {
missingValueCount++;
}
FeatureRow row = c.element();
long eventTimestamp = com.google.protobuf.util.Timestamps.toMillis(row.getEventTimestamp());

String[] split = row.getFeatureSet().split(":");
String featureSetName = split[0];
String featureSetVersion = split[1];

statsd.histogram("feature_row_lag_ms", System.currentTimeMillis() - eventTimestamp,
STORE_TAG_KEY + ":" + getStoreName(),
FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());

statsd.histogram("feature_row_event_time_epoch_ms", eventTimestamp,
STORE_TAG_KEY + ":" + getStoreName(),
FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());

for (Field field : row.getFieldsList()) {
if (!field.getValue().getValCase().equals(ValCase.VAL_NOT_SET)) {
statsd.histogram("feature_value_lag_ms", System.currentTimeMillis() - eventTimestamp,
STORE_TAG_KEY + ":" + getStoreName(),
FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
FEATURE_TAG_KEY + ":" + field.getName(),
INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());
} else {
statsd.count("feature_value_missing_count", 1,
STORE_TAG_KEY + ":" + getStoreName(),
FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
FEATURE_TAG_KEY + ":" + field.getName(),
INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());
}
statsd.count("feature_row_ingested_count", 1,
STORE_TAG_KEY + ":" + getStoreName(),
FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());

statsd.count("feature_row_missing_value_count", missingValueCount,
STORE_TAG_KEY + ":" + getStoreName(),
FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());
}

} catch (StatsDClientException e) {
statsd.count("feature_row_ingested_count", 1,
STORE_TAG_KEY + ":" + getStoreName(),
FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());

} catch (
StatsDClientException e) {
log.warn("Unable to push metrics to server", e);
}
}
Expand Down

0 comments on commit 747bc3a

Please sign in to comment.