Skip to content

Commit

Permalink
Add support for StringSet in JetRunner
Browse files Browse the repository at this point in the history
  • Loading branch information
rohitsinha54 committed Jul 6, 2024
1 parent e16da38 commit 2dd784b
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.GaugeData;
import org.apache.beam.runners.core.metrics.MetricUpdates;
import org.apache.beam.runners.core.metrics.StringSetData;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.StringSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;

/** Jet specific implementation of {@link MetricsContainer}. */
Expand All @@ -47,6 +49,7 @@ public static String getMetricsMapName(long jobId) {
private final Map<MetricName, CounterImpl> counters = new HashMap<>();
private final Map<MetricName, DistributionImpl> distributions = new HashMap<>();
private final Map<MetricName, GaugeImpl> gauges = new HashMap<>();
private final Map<MetricName, StringSetImpl> stringSets = new HashMap<>();

private final IMap<String, MetricUpdates> accumulator;

Expand All @@ -71,17 +74,24 @@ public Gauge getGauge(MetricName metricName) {
return gauges.computeIfAbsent(metricName, GaugeImpl::new);
}

@Override
public StringSet getStringSet(MetricName metricName) {
return stringSets.computeIfAbsent(metricName, StringSetImpl::new);
}

@SuppressWarnings("FutureReturnValueIgnored")
public void flush(boolean async) {
if (counters.isEmpty() && distributions.isEmpty() && gauges.isEmpty()) {
if (counters.isEmpty() && distributions.isEmpty() && gauges.isEmpty() && stringSets.isEmpty()) {
return;
}

ImmutableList<MetricUpdates.MetricUpdate<Long>> counters = extractUpdates(this.counters);
ImmutableList<MetricUpdates.MetricUpdate<DistributionData>> distributions =
extractUpdates(this.distributions);
ImmutableList<MetricUpdates.MetricUpdate<GaugeData>> gauges = extractUpdates(this.gauges);
MetricUpdates updates = new MetricUpdatesImpl(counters, distributions, gauges);
ImmutableList<MetricUpdates.MetricUpdate<StringSetData>> stringSets =
extractUpdates(this.stringSets);
MetricUpdates updates = new MetricUpdatesImpl(counters, distributions, gauges, stringSets);

if (async) {
accumulator.setAsync(metricsKey, updates);
Expand Down Expand Up @@ -110,14 +120,17 @@ private static class MetricUpdatesImpl extends MetricUpdates implements Serializ
private final Iterable<MetricUpdate<Long>> counters;
private final Iterable<MetricUpdate<DistributionData>> distributions;
private final Iterable<MetricUpdate<GaugeData>> gauges;
private final Iterable<MetricUpdate<StringSetData>> stringSets;

MetricUpdatesImpl(
Iterable<MetricUpdate<Long>> counters,
Iterable<MetricUpdate<DistributionData>> distributions,
Iterable<MetricUpdate<GaugeData>> gauges) {
Iterable<MetricUpdate<GaugeData>> gauges,
Iterable<MetricUpdate<StringSetData>> stringSets) {
this.counters = counters;
this.distributions = distributions;
this.gauges = gauges;
this.stringSets = stringSets;
}

@Override
Expand All @@ -134,5 +147,10 @@ public Iterable<MetricUpdate<DistributionData>> distributionUpdates() {
public Iterable<MetricUpdate<GaugeData>> gaugeUpdates() {
return gauges;
}

@Override
public Iterable<MetricUpdate<StringSetData>> stringSetUpdates() {
return stringSets;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.apache.beam.runners.jet.metrics;

import java.util.Arrays;
import java.util.HashSet;
import org.apache.beam.runners.core.metrics.StringSetData;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.StringSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;

/** Implementation of {@link StringSet}. */
public class StringSetImpl extends AbstractMetric<StringSetData> implements StringSet {

private final StringSetData stringSetData = StringSetData.empty();

public StringSetImpl(MetricName name) {
super(name);
}

@Override
StringSetData getValue() {
return stringSetData;
}

@Override
public void add(String value) {
stringSetData.combine(StringSetData.create(ImmutableSet.of("ab")));
}

@Override
public void add(String... values) {
stringSetData.combine(StringSetData.create(new HashSet<>(Arrays.asList(values))));
}
}

0 comments on commit 2dd784b

Please sign in to comment.