Skip to content

Commit

Permalink
Implement ordered list state for FnApi. (#30317)
Browse files Browse the repository at this point in the history
* Add request and response proto messages for ordered list state.

* Initial implementation of OrderedListState for fnApi.

* Discard the use of the value coder in FakeBeamFnStateClient.

* Fix the behavior of pre-existing iterators on local change of state

If there are changes on a state after we obtain iterators from
calling read() and readRange(), the behavior of these pre-existing
iterators were incorrect in the previous implementation.

The change introduced here will make sure that these iterators will
still work as if no local change is made.

* Support continuation token for ordered list get request in fake client

* Add copyright notices to the new files

* Add binding for ordered list state in fnapi state accessor

* Clean up comments

* Apply spotless and checkStyle to reformat

* Add an encode-only coder for the use in the fake client.

* Remove request and response messages for ordered list state get.

* The range information is placed in the state key of ordered list
* For consistency, we reuse the existing get request and response
  mesasages of other states like Bag, MultiMap, etc.

* Remove request and response messages for ordered list state update

* Reuse existing messages of clear and append.

* Minor fixes based on feedbacks from reviewers

* Replace String::size() > 0 with String::isEmpty()
* Return this in readLater and readRangeLater instead of throwing
  an exception
* Remove the added SupressWarnings("unchecked")

* Apply spotless

* Use data field in AppendRequest for ordered list state

Previously, we used a repeated OrderedListEntry field in the
AppendRequest particularly for ordered list state. For consistency,
we now get rid of that and use the same data field as other states.

* Apply spotless

* Minor renaming of a variable

* Create a new coder for TimestampedValue according to the notes in proto.

* Address feedback from the reviewer

- Add a test to cover the case when an add/clear operation happens
  while we are partway through an existing iterable.
- Use clear() instead of clearRange(min, max) when we can.
- Fix a typo.

* Apply spotless

* Add urn for ordered list state.

* Add ordered list spec to ParDoTranslation.

* Fix an edge case when async called after clear. Minor fix based on reviwer comments.

* Refactor some variable names. Add a notes on the order of pendingAdds and pendingRemoves during async_close()
  • Loading branch information
shunping committed Jun 13, 2024
1 parent fd5b1de commit af31d35
Show file tree
Hide file tree
Showing 8 changed files with 1,314 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,11 @@ message StandardUserStateTypes {
// StateKey.MultimapKeysUserState or StateKey.MultimapUserState.
MULTIMAP = 1 [(beam_urn) = "beam:user_state:multimap:v1"];

// TODO(https://github.com/apache/beam/issues/20486): Add protocol to support OrderedListState
// Represents a user state specification that supports an ordered list.
//
// StateRequests performed on this user state must use
// StateKey.OrderedListUserState.
ORDERED_LIST = 2 [(beam_urn) = "beam:user_state:ordered_list:v1"];
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ public class ParDoTranslation {
public static final String BAG_USER_STATE = "beam:user_state:bag:v1";
/** Represents a user state specification that supports a multimap. */
public static final String MULTIMAP_USER_STATE = "beam:user_state:multimap:v1";
/** Represents a user state specification that supports an ordered list. */
public static final String ORDERED_LIST_USER_STATE = "beam:user_state:ordered_list:v1";

static {
checkState(
Expand All @@ -141,6 +143,8 @@ public class ParDoTranslation {
BeamUrns.getUrn(StandardRequirements.Enum.REQUIRES_ON_WINDOW_EXPIRATION)));
checkState(BAG_USER_STATE.equals(BeamUrns.getUrn(StandardUserStateTypes.Enum.BAG)));
checkState(MULTIMAP_USER_STATE.equals(BeamUrns.getUrn(StandardUserStateTypes.Enum.MULTIMAP)));
checkState(
ORDERED_LIST_USER_STATE.equals(BeamUrns.getUrn(StandardUserStateTypes.Enum.ORDERED_LIST)));
}

/** The URN for an unknown Java {@link DoFn}. */
Expand Down Expand Up @@ -601,9 +605,7 @@ public RunnerApi.StateSpec dispatchOrderedList(Coder<?> elementCoder) {
.setOrderedListSpec(
RunnerApi.OrderedListStateSpec.newBuilder()
.setElementCoderId(registerCoderOrThrow(components, elementCoder)))
// TODO(https://github.com/apache/beam/issues/20486): Update with correct protocol
// once the protocol is defined and
// the SDK harness uses it.
.setProtocol(FunctionSpec.newBuilder().setUrn(ORDERED_LIST_USER_STATE))
.build();
}

Expand Down Expand Up @@ -694,6 +696,10 @@ static StateSpec<?> fromProto(RunnerApi.StateSpec stateSpec, RehydratedComponent
case SET_SPEC:
return StateSpecs.set(components.getCoder(stateSpec.getSetSpec().getElementCoderId()));

case ORDERED_LIST_SPEC:
return StateSpecs.orderedList(
components.getCoder(stateSpec.getOrderedListSpec().getElementCoderId()));

case SPEC_NOT_SET:
default:
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,10 @@ public static Iterable<Object[]> stateSpecs() {
{
StateSpecs.map(StringUtf8Coder.of(), VarIntCoder.of()),
FunctionSpec.newBuilder().setUrn(ParDoTranslation.MULTIMAP_USER_STATE).build()
},
{
StateSpecs.orderedList(VarIntCoder.of()),
FunctionSpec.newBuilder().setUrn(ParDoTranslation.ORDERED_LIST_USER_STATE).build()
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.GroupingState;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.MultimapState;
import org.apache.beam.sdk.state.OrderedListState;
Expand All @@ -63,12 +64,14 @@
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.construction.BeamUrns;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;

/** Provides access to side inputs and state via a {@link BeamFnStateClient}. */
@SuppressWarnings({
Expand Down Expand Up @@ -600,8 +603,73 @@ public <KeyT, ValueT> MultimapState<KeyT, ValueT> bindMultimap(
@Override
public <T> OrderedListState<T> bindOrderedList(
String id, StateSpec<OrderedListState<T>> spec, Coder<T> elemCoder) {
throw new UnsupportedOperationException(
"TODO: Add support for a sorted-list state to the Fn API.");
return (OrderedListState<T>)
stateKeyObjectCache.computeIfAbsent(
createOrderedListUserStateKey(id),
new Function<StateKey, Object>() {
@Override
public Object apply(StateKey key) {
return new OrderedListState<T>() {
private final OrderedListUserState<T> impl =
createOrderedListUserState(key, elemCoder);

@Override
public void clear() {
impl.clear();
}

@Override
public void add(TimestampedValue<T> value) {
impl.add(value);
}

@Override
public ReadableState<Boolean> isEmpty() {
return new ReadableState<Boolean>() {
@Override
public @Nullable Boolean read() {
return !impl.read().iterator().hasNext();
}

@Override
public ReadableState<Boolean> readLater() {
return this;
}
};
}

@Nullable
@Override
public Iterable<TimestampedValue<T>> read() {
return readRange(
Instant.ofEpochMilli(Long.MIN_VALUE), Instant.ofEpochMilli(Long.MAX_VALUE));
}

@Override
public GroupingState<TimestampedValue<T>, Iterable<TimestampedValue<T>>>
readLater() {
return this;
}

@Override
public Iterable<TimestampedValue<T>> readRange(
Instant minTimestamp, Instant limitTimestamp) {
return impl.readRange(minTimestamp, limitTimestamp);
}

@Override
public void clearRange(Instant minTimestamp, Instant limitTimestamp) {
impl.clearRange(minTimestamp, limitTimestamp);
}

@Override
public OrderedListState<T> readRangeLater(
Instant minTimestamp, Instant limitTimestamp) {
return this;
}
};
}
});
}

@Override
Expand Down Expand Up @@ -849,6 +917,30 @@ private StateKey createMultimapKeysUserStateKey(String stateId) {
return builder.build();
}

private <T> OrderedListUserState<T> createOrderedListUserState(
StateKey stateKey, Coder<T> valueCoder) {
OrderedListUserState<T> rval =
new OrderedListUserState<>(
getCacheFor(stateKey),
beamFnStateClient,
processBundleInstructionId.get(),
stateKey,
valueCoder);
stateFinalizers.add(rval::asyncClose);
return rval;
}

private StateKey createOrderedListUserStateKey(String stateId) {
StateKey.Builder builder = StateKey.newBuilder();
builder
.getOrderedListUserStateBuilder()
.setWindow(encodedCurrentWindowSupplier.get())
.setKey(encodedCurrentKeySupplier.get())
.setTransformId(ptransformId)
.setUserStateId(stateId);
return builder.build();
}

public void finalizeState() {
// Persist all dirty state cells
try {
Expand Down
Loading

0 comments on commit af31d35

Please sign in to comment.