Skip to content

Commit

Permalink
Fixes for review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
rohitsinha54 committed Jul 8, 2024
1 parent 67ef5bd commit ad17a69
Show file tree
Hide file tree
Showing 11 changed files with 147 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public StringSetCell getStringSet(MetricName metricName) {
}

/**
* Return a {@code StringSetCell} named {@code metricName}.If it doesn't exist, return {@code
* Return a {@code StringSetCell} named {@code metricName}. If it doesn't exist, return {@code
* null}.
*/
public @Nullable StringSetCell tryGetStringSet(MetricName metricName) {
Expand Down Expand Up @@ -418,7 +418,7 @@ private String getShortId(
}

/**
* Mark all of the updates that were retrieved with the latest call to {@link #getUpdates()} as
* Mark all the updates that were retrieved with the latest call to {@link #getUpdates()} as
* committed.
*/
public void commitUpdates() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,23 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.joda.time.Instant;

/** A set of functions used to encode and decode common monitoring info types. */
public class MonitoringInfoEncodings {
private static final Coder<Long> VARINT_CODER = VarLongCoder.of();
private static final Coder<Double> DOUBLE_CODER = DoubleCoder.of();
private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
private static final IterableCoder<String> STRING_SET_CODER =
IterableCoder.of(StringUtf8Coder.of());

/** Encodes to {@link MonitoringInfoConstants.TypeUrns#DISTRIBUTION_INT64_TYPE}. */
public static ByteString encodeInt64Distribution(DistributionData data) {
Expand Down Expand Up @@ -106,12 +107,7 @@ public static GaugeData decodeInt64Gauge(ByteString payload) {
/** Encodes to {@link MonitoringInfoConstants.TypeUrns#SET_STRING_TYPE}. */
public static ByteString encodeStringSet(StringSetData data) {
try (ByteStringOutputStream output = new ByteStringOutputStream()) {
// encode the length of set
STRING_CODER.encode(String.valueOf(data.stringSet().size()), output);
// encode all elements
for (String s : data.stringSet()) {
STRING_CODER.encode(s, output);
}
STRING_SET_CODER.encode(data.stringSet(), output);
return output.toByteString();
} catch (IOException e) {
throw new RuntimeException(e);
Expand All @@ -120,13 +116,8 @@ public static ByteString encodeStringSet(StringSetData data) {

/** Decodes from {@link MonitoringInfoConstants.TypeUrns#SET_STRING_TYPE}. */
public static StringSetData decodeStringSet(ByteString payload) {
Set<String> elements = new HashSet<>();
try (InputStream input = payload.newInput()) {
int size = Integer.parseInt(Objects.requireNonNull(STRING_CODER.decode(input)));
while (size > 0) {
elements.add(Objects.requireNonNull(STRING_CODER.decode(input)));
size--;
}
Set<String> elements = Sets.newHashSet(STRING_SET_CODER.decode(input));
return StringSetData.create(elements);
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
package org.apache.beam.runners.core.metrics;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.StringSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
Expand Down Expand Up @@ -99,11 +98,15 @@ public int hashCode() {

@Override
public void add(String value) {
update(StringSetData.create(new HashSet<>(Collections.singletonList(value))));
// if the given value is already present in the StringSet then skip this add for efficiency
if (this.setValue.get().stringSet().contains(value)) {
return;
}
update(StringSetData.create(ImmutableSet.of(value)));
}

@Override
public void add(String... values) {
update(StringSetData.create(new HashSet<>(Arrays.asList(values))));
update(StringSetData.create(ImmutableSet.copyOf(Arrays.asList(values))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,27 @@

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.metrics.StringSetResult;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;

/**
* Data describing the StringSet. This should retain enough detail that it can be combined with
* Data describing the StringSet. A {@link StringSetData} holds an immutable copy
* of the set from which it was created, so its contents cannot be modified
* once created. This should retain enough detail that it can be combined with
* other {@link StringSetData}.
*/
@AutoValue
public abstract class StringSetData implements Serializable {

public abstract Set<String> stringSet();

/** Returns a {@link StringSetData} which consists of the given set. */
public static StringSetData create(Set<String> stringSet) {
return new AutoValue_StringSetData(stringSet);
/** Returns a {@link StringSetData} which is made from an immutable copy of the given set. */
public static StringSetData create(Set<String> set) {
return new AutoValue_StringSetData(ImmutableSet.copyOf(set));
}

/** Return a {@link EmptyStringSetData#INSTANCE} representing an empty {@link StringSetData}. */
Expand All @@ -49,10 +53,21 @@ public static StringSetData empty() {
public StringSetData combine(StringSetData other) {
// do not merge other on this as this StringSetData might hold an immutable set like in case
// of EmptyStringSetData
Set<String> merged = new HashSet<>();
merged.addAll(this.stringSet());
merged.addAll(other.stringSet());
return StringSetData.create(merged);
Set<String> combined = new HashSet<>();
combined.addAll(this.stringSet());
combined.addAll(other.stringSet());
return StringSetData.create(combined);
}

/**
 * Combines this {@link StringSetData} with every element of {@code others} into a new {@link
 * StringSetData}. Neither this instance nor any of the given instances is modified.
 *
 * @param others the {@link StringSetData} instances whose string sets are merged with this one
 * @return a new {@link StringSetData} containing the union of all the string sets
 */
public StringSetData combine(Iterable<StringSetData> others) {
  // Collect into an explicit HashSet: Collectors.toSet() gives no guarantee about the
  // mutability of the set it returns, and the result is mutated by addAll() below.
  // The stream is sequential; the union is cheap and gains nothing from the shared
  // fork-join pool.
  Set<String> combined =
      StreamSupport.stream(others.spliterator(), false)
          .flatMap(other -> other.stringSet().stream())
          .collect(Collectors.toCollection(HashSet::new));
  combined.addAll(this.stringSet());
  return StringSetData.create(combined);
}

/** Returns a {@link StringSetResult} representing this {@link StringSetData}. */
Expand All @@ -67,10 +82,10 @@ public static class EmptyStringSetData extends StringSetData {

private EmptyStringSetData() {}

/** Return an immutable empty set. */
/** Returns an immutable empty set. */
@Override
public Set<String> stringSet() {
return Collections.emptySet();
return ImmutableSet.of();
}

/** Return a {@link StringSetResult#empty()} which is immutable empty set. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.runners.core.metrics;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import java.util.Collections;
Expand Down Expand Up @@ -75,10 +76,9 @@ public void testEmpty() {
}

@Test
public void testNoAddToStringSetDataEmpty() {
exception.expect(UnsupportedOperationException.class);
public void testStringSetDataEmptyIsImmutable() {
StringSetData empty = StringSetData.empty();
empty.stringSet().add("aa");
assertThrows(UnsupportedOperationException.class, () -> empty.stringSet().add("aa"));
}

@Test
Expand All @@ -92,4 +92,11 @@ public void testExtract() {
StringSetData stringSetData = StringSetData.create(contents);
assertEquals(stringSetData.stringSet(), contents);
}

@Test
public void testExtractReturnsImmutable() {
  StringSetData data = StringSetData.create(ImmutableSet.of("ab", "cd"));
  // the set exposed by stringSet() must reject mutation
  assertThrows(
      UnsupportedOperationException.class,
      () -> {
        data.stringSet().add("aa");
      });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,7 @@ public StringSetData zero() {
@Override
public StringSetData combine(Iterable<StringSetData> updates) {
StringSetData result = StringSetData.empty();
for (StringSetData update : updates) {
result = result.combine(update);
}
result = result.combine(updates);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import com.google.api.services.dataflow.model.JobMetrics;
import com.google.api.services.dataflow.model.MetricUpdate;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.GaugeResult;
Expand All @@ -44,6 +44,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -190,7 +191,7 @@ private StringSetResult getStringSetValue(MetricUpdate metricUpdate) {
if (metricUpdate.getSet() == null) {
return StringSetResult.empty();
}
return StringSetResult.create(new HashSet<>(((ArrayList) metricUpdate.getSet())));
return StringSetResult.create(ImmutableSet.copyOf(((Set) metricUpdate.getSet())));
}

private DistributionResult getDistributionValue(MetricUpdate metricUpdate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@
import com.google.api.services.dataflow.model.MetricUpdate;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.DataflowTemplateJob;
import org.apache.beam.sdk.PipelineResult.State;
Expand Down Expand Up @@ -190,7 +189,7 @@ private MetricUpdate makeCounterMetricUpdate(
}

private MetricUpdate makeStringSetMetricUpdate(
String name, String namespace, String step, List<String> setValues, boolean tentative) {
String name, String namespace, String step, Set<String> setValues, boolean tentative) {
MetricUpdate update = new MetricUpdate();
update.setSet(setValues);
return setStructuredName(update, name, namespace, step, tentative);
Expand Down Expand Up @@ -258,10 +257,10 @@ public void testSingleStringSetUpdates() throws IOException {
// the job metrics results.
MetricUpdate mu1 =
makeStringSetMetricUpdate(
"counterName", "counterNamespace", "s2", Arrays.asList("ab", "cd"), false);
"counterName", "counterNamespace", "s2", ImmutableSet.of("ab", "cd"), false);
MetricUpdate mu1Tentative =
makeStringSetMetricUpdate(
"counterName", "counterNamespace", "s2", Arrays.asList("ef", "gh"), true);
"counterName", "counterNamespace", "s2", ImmutableSet.of("ab", "cd"), true);
jobMetrics.setMetrics(ImmutableList.of(mu1, mu1Tentative));
DataflowClient dataflowClient = mock(DataflowClient.class);
when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics);
Expand All @@ -283,7 +282,7 @@ public void testSingleStringSetUpdates() throws IOException {
"counterNamespace",
"counterName",
"myStepName",
StringSetResult.create(ImmutableSet.of("ef", "gh")))));
StringSetResult.create(ImmutableSet.of("ab", "cd")))));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ StringSetData getValue() {

@Override
public void add(String value) {
stringSetData.combine(StringSetData.create(ImmutableSet.of("ab")));
if (stringSetData.stringSet().contains(value)) {
return;
}
stringSetData.combine(StringSetData.create(ImmutableSet.of(value)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,37 @@
package org.apache.beam.sdk.metrics;

import com.google.auto.value.AutoValue;
import java.util.Collections;
import java.util.Set;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;

/** The result of a {@link StringSet} metric. */
/**
* The result of a {@link StringSet} metric. A {@link StringSetResult} holds an immutable copy
* of the set from which it was created, so a result cannot be modified once created.
*/
@AutoValue
public abstract class StringSetResult {
public abstract Set<String> getStringSet();

/**
* Creates a {@link StringSetResult} from the given {@link Set} by first making an immutable
* copy of it with {@link ImmutableSet#copyOf(java.util.Collection)}.
* @param s the set from which the {@link StringSetResult} should be created.
* @return {@link StringSetResult} containing an immutable copy of the given set.
*/
public static StringSetResult create(Set<String> s) {
return new AutoValue_StringSetResult(s);
return new AutoValue_StringSetResult(ImmutableSet.copyOf(s));
}

/**
* Returns the shared {@link EmptyStringSetResult} instance, which represents no values reported.
*
* @return the {@link EmptyStringSetResult} singleton held in {@code EmptyStringSetResult.INSTANCE}
*/
public static StringSetResult empty() {
return EmptyStringSetResult.INSTANCE;
}

/** Empty {@link StringSetResult}, representing no values reported. */
/**
* Empty {@link StringSetResult}, representing no values reported. Holds an empty
* {@link ImmutableSet}.
*/
public static class EmptyStringSetResult extends StringSetResult {

private static final EmptyStringSetResult INSTANCE = new EmptyStringSetResult();
Expand All @@ -44,7 +58,7 @@ private EmptyStringSetResult() {}
/** Returns an empty immutable set. */
@Override
public Set<String> getStringSet() {
return Collections.emptySet();
return ImmutableSet.of();
}
}
}
Loading

0 comments on commit ad17a69

Please sign in to comment.