diff --git a/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java b/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java
index bbd3e480639..74186e2e4c4 100644
--- a/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java
+++ b/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java
@@ -16,6 +16,7 @@
 
 import google.registry.beam.common.RegistryJpaIO.Write;
 import google.registry.config.RegistryEnvironment;
+import google.registry.model.annotations.DeleteAfterMigration;
 import google.registry.persistence.PersistenceModule.JpaTransactionManagerType;
 import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
 import java.util.Objects;
@@ -65,6 +66,17 @@ public interface RegistryPipelineOptions extends GcpOptions {
 
   void setSqlWriteShards(int maxConcurrentSqlWriters);
 
+  @DeleteAfterMigration
+  @Description(
+      "Whether to use self-allocated primary IDs when building entities. This should only be used"
+          + " when the IDs are not significant and the resulting entities are not persisted back to"
+          + " the database. Use with caution as self-allocated IDs are not unique across workers,"
+          + " and persisting entities with these IDs can be dangerous.")
+  @Default.Boolean(false)
+  boolean getUseSelfAllocatedId();
+
+  void setUseSelfAllocatedId(boolean useSelfAllocatedId);
+
   static RegistryPipelineComponent toRegistryPipelineComponent(RegistryPipelineOptions options) {
     return DaggerRegistryPipelineComponent.builder()
         .isolationOverride(options.getIsolationOverride())
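A quick illustration of how the new option surfaces to launchers (this sketch is not part of the change, and the class name is made up): since it is an ordinary Beam PipelineOptions property, Beam derives a --useSelfAllocatedId flag from the getter name, and @Default.Boolean(false) keeps it off unless explicitly enabled.

import google.registry.beam.common.RegistryPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class UseSelfAllocatedIdFlagSketch {
  public static void main(String[] args) {
    // Make the interface's properties known to the parser, as the registry pipelines do.
    PipelineOptionsFactory.register(RegistryPipelineOptions.class);
    // Beam maps getUseSelfAllocatedId()/setUseSelfAllocatedId() to --useSelfAllocatedId.
    RegistryPipelineOptions options =
        PipelineOptionsFactory.fromArgs("--useSelfAllocatedId=true")
            .as(RegistryPipelineOptions.class);
    System.out.println(options.getUseSelfAllocatedId()); // true; false when the flag is omitted
  }
}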
diff --git a/core/src/main/java/google/registry/beam/common/RegistryPipelineWorkerInitializer.java b/core/src/main/java/google/registry/beam/common/RegistryPipelineWorkerInitializer.java
index f4d13e9039d..ea6899b68c8 100644
--- a/core/src/main/java/google/registry/beam/common/RegistryPipelineWorkerInitializer.java
+++ b/core/src/main/java/google/registry/beam/common/RegistryPipelineWorkerInitializer.java
@@ -22,6 +22,8 @@
 import google.registry.config.RegistryEnvironment;
 import google.registry.config.SystemPropertySetter;
 import google.registry.model.AppEngineEnvironment;
+import google.registry.model.IdService;
+import google.registry.model.IdService.SelfAllocatedIdSupplier;
 import google.registry.persistence.transaction.JpaTransactionManager;
 import google.registry.persistence.transaction.TransactionManagerFactory;
 import org.apache.beam.sdk.harness.JvmInitializer;
@@ -65,12 +67,20 @@ public void beforeProcessing(PipelineOptions options) {
       transactionManagerLazy = registryPipelineComponent.getJpaTransactionManager();
     }
     TransactionManagerFactory.setJpaTmOnBeamWorker(transactionManagerLazy::get);
-    // Masquerade all threads as App Engine threads so we can create Ofy keys in the pipeline. Also
+    // Masquerade all threads as App Engine threads, so we can create Ofy keys in the pipeline. Also
     // loads all ofy entities.
     new AppEngineEnvironment("s~" + registryPipelineComponent.getProjectId())
         .setEnvironmentForAllThreads();
-    // Set the system property so that we can call IdService.allocateId() without access to
-    // datastore.
     SystemPropertySetter.PRODUCTION_IMPL.setProperty(PROPERTY, "true");
+    // Use self-allocated IDs if requested. Note that this inevitably results in duplicate IDs
+    // from multiple workers, which can also collide with existing IDs in the database, so they
+    // cannot be depended upon for comparison or anything significant. The resulting entities can
+    // never be persisted back into the database. This is a stop-gap measure that should only be
+    // used when you need to create Buildables in Beam but do not have control over how the IDs
+    // are allocated, and you don't care about the generated IDs as long as you can build the
+    // entities.
+    if (registryOptions.getUseSelfAllocatedId()) {
+      IdService.setIdSupplier(SelfAllocatedIdSupplier.getInstance());
+    }
   }
 }
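To make the warning above concrete, here is a hypothetical standalone sketch (SelfAllocatedIdCollisionSketch is not a real class in this change) of what beforeProcessing() effectively does once the option is set, and why such IDs must stay transient: every worker JVM runs the initializer independently, so each one starts its own counter and hands out the same sequence.

import google.registry.beam.common.RegistryPipelineWorkerInitializer;
import google.registry.model.IdService;
import google.registry.model.IdService.SelfAllocatedIdSupplier;

public class SelfAllocatedIdCollisionSketch {
  public static void main(String[] args) {
    // setIdSupplier() refuses to run outside a Beam worker, so mimic the marker property that the
    // initializer sets via SystemPropertySetter before swapping in the supplier.
    System.setProperty(RegistryPipelineWorkerInitializer.PROPERTY, "true");

    // This is the call beforeProcessing() makes when getUseSelfAllocatedId() is true.
    IdService.setIdSupplier(SelfAllocatedIdSupplier.getInstance());

    // Worker A and worker B would both observe 1, 2, 3, ... from their own JVM-local counters:
    // fine for transient Buildables, dangerous for anything persisted back to SQL.
    System.out.println(IdService.allocateId()); // 1
    System.out.println(IdService.allocateId()); // 2
  }
}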

diff --git a/core/src/main/java/google/registry/beam/rde/RdePipeline.java b/core/src/main/java/google/registry/beam/rde/RdePipeline.java
index 0bd4088c4a7..07a4b06a69a 100644
--- a/core/src/main/java/google/registry/beam/rde/RdePipeline.java
+++ b/core/src/main/java/google/registry/beam/rde/RdePipeline.java
@@ -128,7 +128,7 @@
  *
  * <h3>{@link EppResource}</h3>
  *
  * All EPP resources are loaded from the corresponding {@link HistoryEntry}, which has the resource
- * embedded. In general we find most recent history entry before watermark and filter out the ones
+ * embedded. In general, we find most recent history entry before watermark and filter out the ones
  * that are soft-deleted by watermark. The history is emitted as pairs of (resource repo ID: history
  * revision ID) from the SQL query.
  *
@@ -164,7 +164,7 @@
  *
  * The (pending deposit: deposit fragment) pairs from different resources are combined and grouped
  * by pending deposit. For each pending deposit, all the relevant deposit fragments are written into
- * a encrypted file stored on GCS. The filename is uniquely determined by the Beam job ID so there
+ * an encrypted file stored on GCS. The filename is uniquely determined by the Beam job ID so there
  * is no need to lock the GCS write operation to prevent stomping. The cursor for staging the
  * pending deposit is then rolled forward, and the next action is enqueued. The latter two
  * operations are performed in a transaction so the cursor is rolled back if enqueueing failed.
@@ -698,8 +698,8 @@ static ImmutableSet<PendingDeposit> decodePendingDeposits(String encodedPendingD
   }
 
   /**
-   * Encodes the pending deposit set in an URL safe string that is sent to the pipeline worker by
-   * the pipeline launcher as a pipeline option.
+   * Encodes the pending deposit set in a URL safe string that is sent to the pipeline worker by the
+   * pipeline launcher as a pipeline option.
    */
   public static String encodePendingDeposits(ImmutableSet<PendingDeposit> pendingDeposits)
       throws IOException {
@@ -715,6 +715,12 @@ public static void main(String[] args) throws IOException, ClassNotFoundExceptio
     PipelineOptionsFactory.register(RdePipelineOptions.class);
     RdePipelineOptions options =
         PipelineOptionsFactory.fromArgs(args).withValidation().as(RdePipelineOptions.class);
+    // We need to self-allocate the IDs because the pipeline creates EPP resources from history
+    // entries and projects them to the watermark. These buildable entities would otherwise request
+    // an ID from Datastore, which Beam does not have access to. The IDs are not included in the
+    // deposits, nor are these entities persisted back to the database, so it is OK to use a
+    // self-allocated ID to get around the limitations of Beam.
+    options.setUseSelfAllocatedId(true);
     RegistryPipelineOptions.validateRegistryPipelineOptions(options);
     options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_READ_COMMITTED);
     DaggerRdePipeline_RdePipelineComponent.builder().options(options).build().rdePipeline().run();
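For contrast with the command-line form shown earlier, a hypothetical launcher for some other pipeline could opt in programmatically the same way RdePipeline.main() now does; the flag has to be flipped before run() so the serialized options reach RegistryPipelineWorkerInitializer on every worker. The pipeline and class names here are assumptions, not part of the change.

import google.registry.beam.common.RegistryPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class SelfAllocatedIdLauncherSketch {
  public static void main(String[] args) {
    PipelineOptionsFactory.register(RegistryPipelineOptions.class);
    RegistryPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(RegistryPipelineOptions.class);
    // Set before creating/running the pipeline; the options object is what gets serialized to the
    // workers, where the initializer reads the flag. Only safe because nothing built with these
    // IDs is ever written back to the database.
    options.setUseSelfAllocatedId(true);
    Pipeline pipeline = Pipeline.create(options);
    // ... attach transforms that only build transient entities ...
    pipeline.run();
  }
}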
diff --git a/core/src/main/java/google/registry/model/IdService.java b/core/src/main/java/google/registry/model/IdService.java
index 8074c0583e6..0a6040c0b80 100644
--- a/core/src/main/java/google/registry/model/IdService.java
+++ b/core/src/main/java/google/registry/model/IdService.java
@@ -17,58 +17,114 @@
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.appengine.api.datastore.DatastoreServiceFactory;
-import com.google.common.annotations.VisibleForTesting;
+import com.google.common.flogger.FluentLogger;
 import google.registry.beam.common.RegistryPipelineWorkerInitializer;
 import google.registry.config.RegistryEnvironment;
 import google.registry.model.annotations.DeleteAfterMigration;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 
 /**
- * Allocates a globally unique {@link Long} number to use as an Ofy {@code @Id}.
+ * Allocates a {@code long} to use as an {@code @Id}, (part of) the primary SQL key for an entity.
  *
- * <p>In non-test, non-beam environments the Id is generated by Datastore, otherwise it's from an
- * atomic long number that's incremented every time this method is called.
+ * <p>Normally, the ID is globally unique and allocated by Datastore. It is possible to override
+ * this behavior by providing an ID supplier, such as in unit tests, where a self-allocated ID
+ * based on a monotonically increasing atomic {@code long} is used. Such an ID supplier can also
+ * be used in other scenarios, such as in a Beam pipeline, to work around Beam's inability to use
+ * the GAE SDK to access Datastore. The override should be used with great care lest it result in
+ * irreversible data corruption.
+ *
+ * @see #setIdSupplier(Supplier)
 */
 @DeleteAfterMigration
 public final class IdService {
 
-  /**
-   * A placeholder String passed into DatastoreService.allocateIds that ensures that all ids are
-   * initialized from the same id pool.
-   */
-  private static final String APP_WIDE_ALLOCATION_KIND = "common";
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private IdService() {}
+
+  private static Supplier<Long> idSupplier =
+      RegistryEnvironment.UNITTEST.equals(RegistryEnvironment.get())
+          ? SelfAllocatedIdSupplier.getInstance()
+          : DatastoreIdSupplier.getInstance();
 
   /**
-   * Counts of used ids for use in unit tests or Beam.
+   * Provides a {@link Supplier} of IDs that overrides the default.
+   *
+   * <p>Currently, the only use case for an override is in the Beam pipeline, where access to
+   * Datastore is not possible through the App Engine API. As such, the setter explicitly checks if
+   * the runtime is Beam.
    *
-   * <p>Note that one should only use self-allocate Ids in Beam for entities whose Ids are not
-   * important and are not persisted back to the database, i. e. nowhere the uniqueness of the ID is
-   * required.
+   * <p>Because the provided supplier is not guaranteed to be globally unique and compatible with
+   * existing IDs in the database, one should proceed with great care. It is safe to use an
+   * arbitrary supplier when the resulting IDs are not significant and not persisted back to the
+   * database, i.e. the IDs are only required by the {@link Buildable} contract but are not used in
+   * any meaningful way. One example is the RDE pipeline, where we project EPP resource entities
+   * from history entries to watermark time and marshal them into XML elements in the RDE
+   * deposits, which omit the IDs.
    */
-  private static final AtomicLong nextSelfAllocatedId = new AtomicLong(1); // ids cannot be zero
-
-  private static final boolean isSelfAllocated() {
-    return RegistryEnvironment.UNITTEST.equals(RegistryEnvironment.get())
-        || "true".equals(System.getProperty(RegistryPipelineWorkerInitializer.PROPERTY, "false"));
+  public static void setIdSupplier(Supplier<Long> idSupplier) {
+    checkState(
+        "true".equals(System.getProperty(RegistryPipelineWorkerInitializer.PROPERTY, "false")),
+        "Can only set ID supplier in a Beam pipeline");
+    logger.atWarning().log("Using ID supplier override!");
+    IdService.idSupplier = idSupplier;
   }
 
   /** Allocates an id. */
-  // TODO(b/201547855): Find a way to allocate a unique ID without datastore.
   public static long allocateId() {
-    return isSelfAllocated()
-        ? nextSelfAllocatedId.getAndIncrement()
-        : DatastoreServiceFactory.getDatastoreService()
-            .allocateIds(APP_WIDE_ALLOCATION_KIND, 1)
-            .iterator()
-            .next()
-            .getId();
+    return idSupplier.get();
   }
 
-  /** Resets the global self-allocated id counter (i.e. sets the next id to 1). */
-  @VisibleForTesting
-  public static void resetSelfAllocatedId() {
-    checkState(
-        isSelfAllocated(), "Can only call resetSelfAllocatedId() in unit tests or Beam pipelines");
-    nextSelfAllocatedId.set(1); // ids cannot be zero
+  // TODO(b/201547855): Find a way to allocate a unique ID without datastore.
+  private static class DatastoreIdSupplier implements Supplier<Long> {
+
+    private static final DatastoreIdSupplier INSTANCE = new DatastoreIdSupplier();
+
+    /**
+     * A placeholder String passed into {@code DatastoreService.allocateIds} that ensures that all
+     * IDs are initialized from the same ID pool.
+     */
+    private static final String APP_WIDE_ALLOCATION_KIND = "common";
+
+    public static DatastoreIdSupplier getInstance() {
+      return INSTANCE;
+    }
+
+    @Override
+    public Long get() {
+      return DatastoreServiceFactory.getDatastoreService()
+          .allocateIds(APP_WIDE_ALLOCATION_KIND, 1)
+          .iterator()
+          .next()
+          .getId();
+    }
+  }
+
+  /**
+   * An ID supplier that allocates an ID from a monotonically increasing atomic {@code long}.
+   *
+   * <p>The generated IDs are only unique within the same JVM. It is not suitable for production
+   * use except in cases where the IDs are not significant.
+   */
+  public static class SelfAllocatedIdSupplier implements Supplier<Long> {
+
+    private static final SelfAllocatedIdSupplier INSTANCE = new SelfAllocatedIdSupplier();
+
+    /** Counter for self-allocated IDs. */
+    private static final AtomicLong nextSelfAllocatedId = new AtomicLong(1); // ids cannot be zero
+
+    public static SelfAllocatedIdSupplier getInstance() {
+      return INSTANCE;
+    }
+
+    @Override
+    public Long get() {
+      return nextSelfAllocatedId.getAndIncrement();
+    }
+
+    public void reset() {
+      nextSelfAllocatedId.set(1);
+    }
   }
 }
diff --git a/core/src/test/java/google/registry/testing/AppEngineExtension.java b/core/src/test/java/google/registry/testing/AppEngineExtension.java
index 0cf37bef697..090d98eb33d 100644
--- a/core/src/test/java/google/registry/testing/AppEngineExtension.java
+++ b/core/src/test/java/google/registry/testing/AppEngineExtension.java
@@ -40,7 +40,7 @@
 import com.google.common.io.Files;
 import com.googlecode.objectify.Key;
 import com.googlecode.objectify.ObjectifyFilter;
-import google.registry.model.IdService;
+import google.registry.model.IdService.SelfAllocatedIdSupplier;
 import google.registry.model.ofy.ObjectifyService;
 import google.registry.model.registrar.Registrar;
 import google.registry.model.registrar.Registrar.State;
@@ -441,7 +441,7 @@ public void setUp() throws Exception {
 
     ObjectifyService.initOfy();
     // Reset id allocation in ObjectifyService so that ids are deterministic in tests.
-    IdService.resetSelfAllocatedId();
+    SelfAllocatedIdSupplier.getInstance().reset();
     this.ofyTestEntities.forEach(AppEngineExtension::register);
   }
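Finally, a hypothetical JUnit 5 sketch (not part of the change) of the determinism this reset buys: assuming the test JVM runs in the UNITTEST registry environment, as registry tests do, IdService already defaults to SelfAllocatedIdSupplier, and resetting it before each test makes the allocated IDs predictable.

import static com.google.common.truth.Truth.assertThat;

import google.registry.model.IdService;
import google.registry.model.IdService.SelfAllocatedIdSupplier;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class SelfAllocatedIdDeterminismTest {

  @BeforeEach
  void resetIdCounter() {
    // The same call AppEngineExtension.setUp() now makes.
    SelfAllocatedIdSupplier.getInstance().reset();
  }

  @Test
  void allocateId_isDeterministicAfterReset() {
    // With the counter reset to 1, the first two allocations are always 1 and 2.
    assertThat(IdService.allocateId()).isEqualTo(1L);
    assertThat(IdService.allocateId()).isEqualTo(2L);
  }
}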