Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Donot self-allocate IDs in Beam by default. #1809

Merged
merged 4 commits into from
Oct 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import google.registry.beam.common.RegistryJpaIO.Write;
import google.registry.config.RegistryEnvironment;
import google.registry.model.annotations.DeleteAfterMigration;
import google.registry.persistence.PersistenceModule.JpaTransactionManagerType;
import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
import java.util.Objects;
Expand Down Expand Up @@ -65,6 +66,17 @@ public interface RegistryPipelineOptions extends GcpOptions {

void setSqlWriteShards(int maxConcurrentSqlWriters);

@DeleteAfterMigration
@Description(
"Whether to use self allocated primary IDs when building entities. This should only be used"
+ " when the IDs are not significant and the resulting entities are not persisted back to"
+ " the database. Use with caution as self allocated IDs are not unique across workers,"
+ " and persisting entities with these IDs can be dangerous.")
@Default.Boolean(false)
boolean getUseSelfAllocatedId();

void setUseSelfAllocatedId(boolean useSelfAllocatedId);

static RegistryPipelineComponent toRegistryPipelineComponent(RegistryPipelineOptions options) {
return DaggerRegistryPipelineComponent.builder()
.isolationOverride(options.getIsolationOverride())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import google.registry.config.RegistryEnvironment;
import google.registry.config.SystemPropertySetter;
import google.registry.model.AppEngineEnvironment;
import google.registry.model.IdService;
import google.registry.model.IdService.SelfAllocatedIdSupplier;
import google.registry.persistence.transaction.JpaTransactionManager;
import google.registry.persistence.transaction.TransactionManagerFactory;
import org.apache.beam.sdk.harness.JvmInitializer;
Expand Down Expand Up @@ -65,12 +67,20 @@ public void beforeProcessing(PipelineOptions options) {
transactionManagerLazy = registryPipelineComponent.getJpaTransactionManager();
}
TransactionManagerFactory.setJpaTmOnBeamWorker(transactionManagerLazy::get);
// Masquerade all threads as App Engine threads so we can create Ofy keys in the pipeline. Also
// Masquerade all threads as App Engine threads, so we can create Ofy keys in the pipeline. Also
// loads all ofy entities.
new AppEngineEnvironment("s~" + registryPipelineComponent.getProjectId())
.setEnvironmentForAllThreads();
// Set the system property so that we can call IdService.allocateId() without access to
// datastore.
SystemPropertySetter.PRODUCTION_IMPL.setProperty(PROPERTY, "true");
// Use self-allocated IDs if requested. Note that this inevitably results in duplicate IDs from
// multiple workers, which can also collide with existing IDs in the database. So they cannot be
// dependent upon for comparison or anything significant. The resulting entities can never be
// persisted back into the database. This is a stop-gap measure that should only be used when
// you need to create Buildables in Beam, but do not have control over how the IDs are
// allocated, and you don't care about the generated IDs as long
// as you can build the entities.
if (registryOptions.getUseSelfAllocatedId()) {
IdService.setIdSupplier(SelfAllocatedIdSupplier.getInstance());
}
}
}
14 changes: 10 additions & 4 deletions core/src/main/java/google/registry/beam/rde/RdePipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@
* <h2>{@link EppResource}</h2>
*
* All EPP resources are loaded from the corresponding {@link HistoryEntry}, which has the resource
* embedded. In general we find most recent history entry before watermark and filter out the ones
* embedded. In general, we find most recent history entry before watermark and filter out the ones
* that are soft-deleted by watermark. The history is emitted as pairs of (resource repo ID: history
* revision ID) from the SQL query.
*
Expand Down Expand Up @@ -164,7 +164,7 @@
*
* The (pending deposit: deposit fragment) pairs from different resources are combined and grouped
* by pending deposit. For each pending deposit, all the relevant deposit fragments are written into
* a encrypted file stored on GCS. The filename is uniquely determined by the Beam job ID so there
* an encrypted file stored on GCS. The filename is uniquely determined by the Beam job ID so there
* is no need to lock the GCS write operation to prevent stomping. The cursor for staging the
* pending deposit is then rolled forward, and the next action is enqueued. The latter two
* operations are performed in a transaction so the cursor is rolled back if enqueueing failed.
Expand Down Expand Up @@ -698,8 +698,8 @@ static ImmutableSet<PendingDeposit> decodePendingDeposits(String encodedPendingD
}

/**
* Encodes the pending deposit set in an URL safe string that is sent to the pipeline worker by
* the pipeline launcher as a pipeline option.
* Encodes the pending deposit set in a URL safe string that is sent to the pipeline worker by the
* pipeline launcher as a pipeline option.
*/
public static String encodePendingDeposits(ImmutableSet<PendingDeposit> pendingDeposits)
throws IOException {
Expand All @@ -715,6 +715,12 @@ public static void main(String[] args) throws IOException, ClassNotFoundExceptio
PipelineOptionsFactory.register(RdePipelineOptions.class);
RdePipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(RdePipelineOptions.class);
// We need to self allocate the IDs because the pipeline creates EPP resources from history
// entries and projects them to watermark. These buildable entities would otherwise request an
// ID from datastore, which Beam does not have access to. The IDs are not included in the
// deposits or are these entities persisted back to the database, so it is OK to use a self
// allocated ID to get around the limitations of beam.
options.setUseSelfAllocatedId(true);
RegistryPipelineOptions.validateRegistryPipelineOptions(options);
options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_READ_COMMITTED);
DaggerRdePipeline_RdePipelineComponent.builder().options(options).build().rdePipeline().run();
Expand Down
120 changes: 88 additions & 32 deletions core/src/main/java/google/registry/model/IdService.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,58 +17,114 @@
import static com.google.common.base.Preconditions.checkState;

import com.google.appengine.api.datastore.DatastoreServiceFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.flogger.FluentLogger;
import google.registry.beam.common.RegistryPipelineWorkerInitializer;
import google.registry.config.RegistryEnvironment;
import google.registry.model.annotations.DeleteAfterMigration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/**
* Allocates a globally unique {@link Long} number to use as an Ofy {@code @Id}.
* Allocates a {@link long} to use as a {@code @Id}, (part) of the primary SQL key for an entity.
*
* <p>In non-test, non-beam environments the Id is generated by Datastore, otherwise it's from an
* atomic long number that's incremented every time this method is called.
* <p>Normally, the ID is globally unique and allocated by Datastore. It is possible to override
* this behavior by providing an ID supplier, such as in unit tests, where a self-allocated ID based
* on a monotonically increasing atomic {@link long} is used. Such an ID supplier can also be used
* in other scenarios, such as in a Beam pipeline to get around the limitation of Beam's inability
* to use GAE SDK to access Datastore. The override should be used with great care lest it results
* in irreversible data corruption.
*
* @see #setIdSupplier(Supplier)
*/
@DeleteAfterMigration
public final class IdService {

/**
* A placeholder String passed into DatastoreService.allocateIds that ensures that all ids are
* initialized from the same id pool.
*/
private static final String APP_WIDE_ALLOCATION_KIND = "common";
private static final FluentLogger logger = FluentLogger.forEnclosingClass();

private IdService() {}

private static Supplier<Long> idSupplier =
RegistryEnvironment.UNITTEST.equals(RegistryEnvironment.get())
? SelfAllocatedIdSupplier.getInstance()
: DatastoreIdSupplier.getInstance();

/**
* Counts of used ids for use in unit tests or Beam.
* Provides a {@link Supplier} of ID that overrides the default.
*
* <p>Currently, the only use case for an override is in the Beam pipeline, where access to
* Datastore is not possible through the App Engine API. As such, the setter explicitly checks if
* the runtime is Beam.
*
* <p>Note that one should only use self-allocate Ids in Beam for entities whose Ids are not
* important and are not persisted back to the database, i. e. nowhere the uniqueness of the ID is
* required.
* <p>Because the provided supplier is not guaranteed to be globally unique and compatible with
* existing IDs in the database, one should proceed with great care. It is safe to use an
* arbitrary supplier when the resulting IDs are not significant and not persisted back to the
* database, i.e. the IDs are only required by the {@link Buildable} contract but are not used in
* any meaningful way. One example is the RDE pipeline where we project EPP resource entities from
* history entries to watermark time, which are then marshalled into XML elements in the RDE
* deposits, where the IDs are omitted.
*/
private static final AtomicLong nextSelfAllocatedId = new AtomicLong(1); // ids cannot be zero

private static final boolean isSelfAllocated() {
return RegistryEnvironment.UNITTEST.equals(RegistryEnvironment.get())
|| "true".equals(System.getProperty(RegistryPipelineWorkerInitializer.PROPERTY, "false"));
public static void setIdSupplier(Supplier<Long> idSupplier) {
checkState(
"true".equals(System.getProperty(RegistryPipelineWorkerInitializer.PROPERTY, "false")),
"Can only set ID supplier in a Beam pipeline");
logger.atWarning().log("Using ID supplier override!");
IdService.idSupplier = idSupplier;
}

/** Allocates an id. */
// TODO(b/201547855): Find a way to allocate a unique ID without datastore.
public static long allocateId() {
return isSelfAllocated()
? nextSelfAllocatedId.getAndIncrement()
: DatastoreServiceFactory.getDatastoreService()
.allocateIds(APP_WIDE_ALLOCATION_KIND, 1)
.iterator()
.next()
.getId();
return idSupplier.get();
}

/** Resets the global self-allocated id counter (i.e. sets the next id to 1). */
@VisibleForTesting
public static void resetSelfAllocatedId() {
checkState(
isSelfAllocated(), "Can only call resetSelfAllocatedId() in unit tests or Beam pipelines");
nextSelfAllocatedId.set(1); // ids cannot be zero
// TODO(b/201547855): Find a way to allocate a unique ID without datastore.
private static class DatastoreIdSupplier implements Supplier<Long> {

private static final DatastoreIdSupplier INSTANCE = new DatastoreIdSupplier();

/**
* A placeholder String passed into {@code DatastoreService.allocateIds} that ensures that all
* IDs are initialized from the same ID pool.
*/
private static final String APP_WIDE_ALLOCATION_KIND = "common";

public static DatastoreIdSupplier getInstance() {
return INSTANCE;
}

@Override
public Long get() {
return DatastoreServiceFactory.getDatastoreService()
.allocateIds(APP_WIDE_ALLOCATION_KIND, 1)
.iterator()
.next()
.getId();
}
}

/**
* An ID supplier that allocates an ID from a monotonically increasing atomic {@link long}.
*
* <p>The generated IDs are only unique within the same JVM. It is not suitable for production use
* unless in cases the IDs are not significant.
*/
public static class SelfAllocatedIdSupplier implements Supplier<Long> {

private static final SelfAllocatedIdSupplier INSTANCE = new SelfAllocatedIdSupplier();

/** Counts of used ids for self allocating IDs. */
private static final AtomicLong nextSelfAllocatedId = new AtomicLong(1); // ids cannot be zero

public static SelfAllocatedIdSupplier getInstance() {
return INSTANCE;
}

@Override
public Long get() {
return nextSelfAllocatedId.getAndIncrement();
}

public void reset() {
nextSelfAllocatedId.set(1);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import com.google.common.io.Files;
import com.googlecode.objectify.Key;
import com.googlecode.objectify.ObjectifyFilter;
import google.registry.model.IdService;
import google.registry.model.IdService.SelfAllocatedIdSupplier;
import google.registry.model.ofy.ObjectifyService;
import google.registry.model.registrar.Registrar;
import google.registry.model.registrar.Registrar.State;
Expand Down Expand Up @@ -441,7 +441,7 @@ public void setUp() throws Exception {

ObjectifyService.initOfy();
// Reset id allocation in ObjectifyService so that ids are deterministic in tests.
IdService.resetSelfAllocatedId();
SelfAllocatedIdSupplier.getInstance().reset();
this.ofyTestEntities.forEach(AppEngineExtension::register);
}

Expand Down