fix race condition
pyalex committed Jul 14, 2020
1 parent 121cf20 commit 88a2a0e
Showing 1 changed file with 54 additions and 36 deletions.
@@ -5,12 +5,10 @@

import com.google.api.services.bigquery.model.TableRow;
import com.google.auto.value.AutoValue;

import java.util.Collections;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;

import org.apache.beam.sdk.coders.*;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.state.*;
@@ -26,7 +24,7 @@
@AutoValue
public abstract class BatchLoadsWithResult<DestinationT>
extends PTransform<
-PCollection<KV<DestinationT, TableRow>>, PCollection<KV<TableDestination, String>>> {
+PCollection<KV<DestinationT, TableRow>>, PCollection<KV<TableDestination, String>>> {
static final Logger LOG = LoggerFactory.getLogger(BatchLoadsWithResult.class);

@VisibleForTesting
@@ -140,7 +138,8 @@ public PCollection<KV<TableDestination, String>> expand(

PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
input
-.apply("WindowWithTrigger",
+.apply(
+"WindowWithTrigger",
Window.<KV<DestinationT, TableRow>>configure()
.triggering(
Repeatedly.forever(
@@ -161,8 +160,8 @@ public void process(ProcessContext c) {
.apply(
"WriteGroupedRecords",
ParDo.of(
-new WriteGroupedRecordsToFiles<>(
-tempFilePrefixView, DEFAULT_MAX_FILE_SIZE, getRowWriterFactory()))
+new WriteGroupedRecordsToFiles<>(
+tempFilePrefixView, DEFAULT_MAX_FILE_SIZE, getRowWriterFactory()))
.withSideInputs(tempFilePrefixView))
.setCoder(WriteBundlesToFiles.ResultCoder.of(getDestinationCoder()));

@@ -176,8 +175,7 @@ tempFilePrefixView, DEFAULT_MAX_FILE_SIZE, getRowWriterFactory()))
results
.apply(
Window.<WriteBundlesToFiles.Result<DestinationT>>configure()
-.triggering(DefaultTrigger.of())
-)
+.triggering(DefaultTrigger.of()))
.apply("AttachSingletonKey", WithKeys.of((Void) null))
.setCoder(
KvCoder.of(
@@ -187,15 +185,15 @@ tempFilePrefixView, DEFAULT_MAX_FILE_SIZE, getRowWriterFactory()))
.apply(
"WritePartitionTriggered",
ParDo.of(
-new WritePartition<>(
-false,
-getDynamicDestinations(),
-tempFilePrefixView,
-DEFAULT_MAX_FILES_PER_PARTITION,
-DEFAULT_MAX_BYTES_PER_PARTITION,
-multiPartitionsTag,
-singlePartitionTag,
-getRowWriterFactory()))
+new WritePartition<>(
+false,
+getDynamicDestinations(),
+tempFilePrefixView,
+DEFAULT_MAX_FILES_PER_PARTITION,
+DEFAULT_MAX_BYTES_PER_PARTITION,
+multiPartitionsTag,
+singlePartitionTag,
+getRowWriterFactory()))
.withSideInputs(tempFilePrefixView)
.withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));

@@ -210,6 +208,21 @@ tempFilePrefixView, DEFAULT_MAX_FILE_SIZE, getRowWriterFactory()))
partitions.get(singlePartitionTag), loadJobIdPrefixView.apply(View.asSingleton()));
}

+/**
+ * Generates one jobId per window, but only if at least one feature row was submitted in that
+ * window. We need to generate exactly one id per window, otherwise the singleton view will fail.
+ * To achieve this we use a stateful DoFn that remembers, via {@link ValueState}, that an id has
+ * already been generated for the current window. To prevent a race condition when accessing
+ * {@link ValueState}, we group all elements into a single batch and ensure that processing
+ * happens on a single worker. A regular AfterWatermark trigger is used with early firing on the
+ * first element in the pane, since we are interested only in that first element and want to
+ * minimize other triggers. To keep the memory footprint as low as possible we map every
+ * {@link feast.proto.types.FeatureRowProto.FeatureRow} to {@link Void}, since we care only about
+ * the fact that some feature rows arrived, not about the data itself.
+ *
+ * @param input feature rows
+ * @return job id, generated once per input window
+ */
private PCollection<String> createLoadJobIdPrefixView(
PCollection<KV<DestinationT, TableRow>> input) {
// We generate new JobId per each (input) window
@@ -218,48 +231,54 @@ private PCollection<String> createLoadJobIdPrefixView(
// So generated ids can be applied as side input
return input
.apply(
-"EraseKey",
+"EraseKeyAndValue",
ParDo.of(
-new DoFn<KV<DestinationT, TableRow>, KV<Void, TableRow>>() {
+new DoFn<KV<DestinationT, TableRow>, KV<Void, Void>>() {
@ProcessElement
public void process(ProcessContext c) {
-c.output(KV.of(null, c.element().getValue()));
+// we don't need the data, only the fact that data exists
+c.output(KV.of(null, null));
}
}))
+.apply(
+"TriggerOnFirstElement",
+Window.<KV<Void, Void>>configure()
+.triggering(
+AfterWatermark.pastEndOfWindow()
+// interested only in the first element
+.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())))
+.apply("CollectToOneWorker", GroupByKey.create())
.apply(
"CreateJobId",
ParDo.of(
-new DoFn<KV<Void, TableRow>, String>() {
-@StateId("oncePerWindow")
-private final StateSpec<SetState<Boolean>> oncePerWindow = StateSpecs.set(BooleanCoder.of());
+new DoFn<KV<Void, Iterable<Void>>, String>() {
+@StateId("generatedForWindow")
+private final StateSpec<ValueState<Boolean>> generatedForWindow =
+StateSpecs.value(BooleanCoder.of());

@ProcessElement
public void process(
ProcessContext c,
BoundedWindow w,
-@StateId("oncePerWindow") SetState<Boolean> oncePerWindow) {
+@StateId("generatedForWindow") ValueState<Boolean> generatedForWindow) {

-// if set already contains something
-// it means we already generated Id for this window
-Boolean empty = oncePerWindow.isEmpty().read();
-if (empty != null && !empty) {
+if (generatedForWindow.read() != null) {
return;
}

-// trying to add to Set and check if it was added
-// if true - we won and Id will be generated in current Process
-Boolean insertResult = oncePerWindow.addIfAbsent(true).read();
-if (insertResult != null && !insertResult) {
-return;
-}
+generatedForWindow.write(true);

c.output(
String.format(
"beam_load_%s_%s",
c.getPipelineOptions().getJobName().replaceAll("-", ""),
BigQueryHelpers.randomUUIDString()));

-LOG.info("Pane {}, start: {}, last: {}", c.pane().getIndex(), c.pane().isFirst(), c.pane().isLast());
+LOG.info(
+"Pane {}, start: {}, last: {}",
+c.pane().getIndex(),
+c.pane().isFirst(),
+c.pane().isLast());
LOG.info("[BQ] New window {}, {}", c.timestamp(), w.maxTimestamp());
}
}));
@@ -321,5 +340,4 @@ PCollection<KV<TableDestination, String>> writeSinglePartitionWithResult(
true,
getSchemaUpdateOptions()));
}
-
}
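
The essence of the fix is a reusable pattern: collapse every element of the window onto a single key, group by that key so one worker handles the whole window, and guard the output with a per-window ValueState flag so repeated pane firings emit at most one value. Below is a minimal, self-contained sketch of that pattern. It is illustrative only: the class, step, and variable names (OncePerWindow, EmitOnce, events, generated) are invented here and are not part of this commit; only the Beam Java SDK APIs shown in the diff are assumed.

import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/** Hypothetical helper: emits at most one String per window, however many panes fire. */
class OncePerWindow {
  static PCollection<String> oncePerWindow(PCollection<Long> events) {
    return events
        // every element gets the same (null) key, so the whole window becomes one group
        .apply("AttachSingletonKey", WithKeys.of((Void) null))
        .setCoder(KvCoder.of(VoidCoder.of(), VarLongCoder.of()))
        // one worker receives the grouped window, removing concurrent access to the state cell
        .apply("CollectToOneWorker", GroupByKey.create())
        .apply(
            "EmitOnce",
            ParDo.of(
                new DoFn<KV<Void, Iterable<Long>>, String>() {
                  @StateId("generated")
                  private final StateSpec<ValueState<Boolean>> generated =
                      StateSpecs.value(BooleanCoder.of());

                  @ProcessElement
                  public void process(
                      ProcessContext c, @StateId("generated") ValueState<Boolean> generated) {
                    // state is scoped per key and window, so this check runs once per window
                    if (generated.read() != null) {
                      return;
                    }
                    generated.write(true);
                    c.output("once-" + c.timestamp());
                  }
                }));
  }
}

The explicit setCoder call mirrors the commit's own AttachSingletonKey step, where the coder for the Void-keyed KV is set explicitly rather than left to inference.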
