From addef179046148048b2ef8308371cab2f1924895 Mon Sep 17 00:00:00 2001 From: Lai Jiang Date: Wed, 19 Oct 2022 20:44:06 -0400 Subject: [PATCH] Does not self allocate IDs in Beam by default. (#1809) * Does not self allocate IDs in Beam by default. Per b/250948425, it is dangerous to implicitly allow all Beam pipelines to create buildables by self allocating the IDs. This change makes it so that one has to explicitly request self allocation in Beam. A boolean is added to the pipeline option so that it can be passed to the beam worker initializer that controls the behavior of the JVM on each worker. Note that we did not add the option in the metadata.json file because we did not want people to use the override at run time when launching a pipeline, due to the risk. As shown in RdePipeline.java, we instead explicitly hard-code the option in the pipeline. There is nothing that stops one to supply that option when launching the pipeline, but it's not advised. Tested=deployed the pipeline alpha and ran it. --- .../beam/common/RegistryPipelineOptions.java | 12 ++ .../RegistryPipelineWorkerInitializer.java | 16 ++- .../google/registry/beam/rde/RdePipeline.java | 14 +- .../java/google/registry/model/IdService.java | 120 +++++++++++++----- .../registry/testing/AppEngineExtension.java | 4 +- 5 files changed, 125 insertions(+), 41 deletions(-) diff --git a/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java b/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java index bbd3e480639..74186e2e4c4 100644 --- a/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java +++ b/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java @@ -16,6 +16,7 @@ import google.registry.beam.common.RegistryJpaIO.Write; import google.registry.config.RegistryEnvironment; +import google.registry.model.annotations.DeleteAfterMigration; import google.registry.persistence.PersistenceModule.JpaTransactionManagerType; import google.registry.persistence.PersistenceModule.TransactionIsolationLevel; import java.util.Objects; @@ -65,6 +66,17 @@ public interface RegistryPipelineOptions extends GcpOptions { void setSqlWriteShards(int maxConcurrentSqlWriters); + @DeleteAfterMigration + @Description( + "Whether to use self allocated primary IDs when building entities. This should only be used" + + " when the IDs are not significant and the resulting entities are not persisted back to" + + " the database. Use with caution as self allocated IDs are not unique across workers," + + " and persisting entities with these IDs can be dangerous.") + @Default.Boolean(false) + boolean getUseSelfAllocatedId(); + + void setUseSelfAllocatedId(boolean useSelfAllocatedId); + static RegistryPipelineComponent toRegistryPipelineComponent(RegistryPipelineOptions options) { return DaggerRegistryPipelineComponent.builder() .isolationOverride(options.getIsolationOverride()) diff --git a/core/src/main/java/google/registry/beam/common/RegistryPipelineWorkerInitializer.java b/core/src/main/java/google/registry/beam/common/RegistryPipelineWorkerInitializer.java index f4d13e9039d..ea6899b68c8 100644 --- a/core/src/main/java/google/registry/beam/common/RegistryPipelineWorkerInitializer.java +++ b/core/src/main/java/google/registry/beam/common/RegistryPipelineWorkerInitializer.java @@ -22,6 +22,8 @@ import google.registry.config.RegistryEnvironment; import google.registry.config.SystemPropertySetter; import google.registry.model.AppEngineEnvironment; +import google.registry.model.IdService; +import google.registry.model.IdService.SelfAllocatedIdSupplier; import google.registry.persistence.transaction.JpaTransactionManager; import google.registry.persistence.transaction.TransactionManagerFactory; import org.apache.beam.sdk.harness.JvmInitializer; @@ -65,12 +67,20 @@ public void beforeProcessing(PipelineOptions options) { transactionManagerLazy = registryPipelineComponent.getJpaTransactionManager(); } TransactionManagerFactory.setJpaTmOnBeamWorker(transactionManagerLazy::get); - // Masquerade all threads as App Engine threads so we can create Ofy keys in the pipeline. Also + // Masquerade all threads as App Engine threads, so we can create Ofy keys in the pipeline. Also // loads all ofy entities. new AppEngineEnvironment("s~" + registryPipelineComponent.getProjectId()) .setEnvironmentForAllThreads(); - // Set the system property so that we can call IdService.allocateId() without access to - // datastore. SystemPropertySetter.PRODUCTION_IMPL.setProperty(PROPERTY, "true"); + // Use self-allocated IDs if requested. Note that this inevitably results in duplicate IDs from + // multiple workers, which can also collide with existing IDs in the database. So they cannot be + // dependent upon for comparison or anything significant. The resulting entities can never be + // persisted back into the database. This is a stop-gap measure that should only be used when + // you need to create Buildables in Beam, but do not have control over how the IDs are + // allocated, and you don't care about the generated IDs as long + // as you can build the entities. + if (registryOptions.getUseSelfAllocatedId()) { + IdService.setIdSupplier(SelfAllocatedIdSupplier.getInstance()); + } } } diff --git a/core/src/main/java/google/registry/beam/rde/RdePipeline.java b/core/src/main/java/google/registry/beam/rde/RdePipeline.java index 0bd4088c4a7..07a4b06a69a 100644 --- a/core/src/main/java/google/registry/beam/rde/RdePipeline.java +++ b/core/src/main/java/google/registry/beam/rde/RdePipeline.java @@ -128,7 +128,7 @@ *

{@link EppResource}

* * All EPP resources are loaded from the corresponding {@link HistoryEntry}, which has the resource - * embedded. In general we find most recent history entry before watermark and filter out the ones + * embedded. In general, we find most recent history entry before watermark and filter out the ones * that are soft-deleted by watermark. The history is emitted as pairs of (resource repo ID: history * revision ID) from the SQL query. * @@ -164,7 +164,7 @@ * * The (pending deposit: deposit fragment) pairs from different resources are combined and grouped * by pending deposit. For each pending deposit, all the relevant deposit fragments are written into - * a encrypted file stored on GCS. The filename is uniquely determined by the Beam job ID so there + * an encrypted file stored on GCS. The filename is uniquely determined by the Beam job ID so there * is no need to lock the GCS write operation to prevent stomping. The cursor for staging the * pending deposit is then rolled forward, and the next action is enqueued. The latter two * operations are performed in a transaction so the cursor is rolled back if enqueueing failed. @@ -698,8 +698,8 @@ static ImmutableSet decodePendingDeposits(String encodedPendingD } /** - * Encodes the pending deposit set in an URL safe string that is sent to the pipeline worker by - * the pipeline launcher as a pipeline option. + * Encodes the pending deposit set in a URL safe string that is sent to the pipeline worker by the + * pipeline launcher as a pipeline option. */ public static String encodePendingDeposits(ImmutableSet pendingDeposits) throws IOException { @@ -715,6 +715,12 @@ public static void main(String[] args) throws IOException, ClassNotFoundExceptio PipelineOptionsFactory.register(RdePipelineOptions.class); RdePipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(RdePipelineOptions.class); + // We need to self allocate the IDs because the pipeline creates EPP resources from history + // entries and projects them to watermark. These buildable entities would otherwise request an + // ID from datastore, which Beam does not have access to. The IDs are not included in the + // deposits or are these entities persisted back to the database, so it is OK to use a self + // allocated ID to get around the limitations of beam. + options.setUseSelfAllocatedId(true); RegistryPipelineOptions.validateRegistryPipelineOptions(options); options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_READ_COMMITTED); DaggerRdePipeline_RdePipelineComponent.builder().options(options).build().rdePipeline().run(); diff --git a/core/src/main/java/google/registry/model/IdService.java b/core/src/main/java/google/registry/model/IdService.java index 8074c0583e6..0a6040c0b80 100644 --- a/core/src/main/java/google/registry/model/IdService.java +++ b/core/src/main/java/google/registry/model/IdService.java @@ -17,58 +17,114 @@ import static com.google.common.base.Preconditions.checkState; import com.google.appengine.api.datastore.DatastoreServiceFactory; -import com.google.common.annotations.VisibleForTesting; +import com.google.common.flogger.FluentLogger; import google.registry.beam.common.RegistryPipelineWorkerInitializer; import google.registry.config.RegistryEnvironment; import google.registry.model.annotations.DeleteAfterMigration; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; /** - * Allocates a globally unique {@link Long} number to use as an Ofy {@code @Id}. + * Allocates a {@link long} to use as a {@code @Id}, (part) of the primary SQL key for an entity. * - *

In non-test, non-beam environments the Id is generated by Datastore, otherwise it's from an - * atomic long number that's incremented every time this method is called. + *

Normally, the ID is globally unique and allocated by Datastore. It is possible to override + * this behavior by providing an ID supplier, such as in unit tests, where a self-allocated ID based + * on a monotonically increasing atomic {@link long} is used. Such an ID supplier can also be used + * in other scenarios, such as in a Beam pipeline to get around the limitation of Beam's inability + * to use GAE SDK to access Datastore. The override should be used with great care lest it results + * in irreversible data corruption. + * + * @see #setIdSupplier(Supplier) */ @DeleteAfterMigration public final class IdService { - /** - * A placeholder String passed into DatastoreService.allocateIds that ensures that all ids are - * initialized from the same id pool. - */ - private static final String APP_WIDE_ALLOCATION_KIND = "common"; + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + private IdService() {} + + private static Supplier idSupplier = + RegistryEnvironment.UNITTEST.equals(RegistryEnvironment.get()) + ? SelfAllocatedIdSupplier.getInstance() + : DatastoreIdSupplier.getInstance(); /** - * Counts of used ids for use in unit tests or Beam. + * Provides a {@link Supplier} of ID that overrides the default. + * + *

Currently, the only use case for an override is in the Beam pipeline, where access to + * Datastore is not possible through the App Engine API. As such, the setter explicitly checks if + * the runtime is Beam. * - *

Note that one should only use self-allocate Ids in Beam for entities whose Ids are not - * important and are not persisted back to the database, i. e. nowhere the uniqueness of the ID is - * required. + *

Because the provided supplier is not guaranteed to be globally unique and compatible with + * existing IDs in the database, one should proceed with great care. It is safe to use an + * arbitrary supplier when the resulting IDs are not significant and not persisted back to the + * database, i.e. the IDs are only required by the {@link Buildable} contract but are not used in + * any meaningful way. One example is the RDE pipeline where we project EPP resource entities from + * history entries to watermark time, which are then marshalled into XML elements in the RDE + * deposits, where the IDs are omitted. */ - private static final AtomicLong nextSelfAllocatedId = new AtomicLong(1); // ids cannot be zero - - private static final boolean isSelfAllocated() { - return RegistryEnvironment.UNITTEST.equals(RegistryEnvironment.get()) - || "true".equals(System.getProperty(RegistryPipelineWorkerInitializer.PROPERTY, "false")); + public static void setIdSupplier(Supplier idSupplier) { + checkState( + "true".equals(System.getProperty(RegistryPipelineWorkerInitializer.PROPERTY, "false")), + "Can only set ID supplier in a Beam pipeline"); + logger.atWarning().log("Using ID supplier override!"); + IdService.idSupplier = idSupplier; } /** Allocates an id. */ - // TODO(b/201547855): Find a way to allocate a unique ID without datastore. public static long allocateId() { - return isSelfAllocated() - ? nextSelfAllocatedId.getAndIncrement() - : DatastoreServiceFactory.getDatastoreService() - .allocateIds(APP_WIDE_ALLOCATION_KIND, 1) - .iterator() - .next() - .getId(); + return idSupplier.get(); } - /** Resets the global self-allocated id counter (i.e. sets the next id to 1). */ - @VisibleForTesting - public static void resetSelfAllocatedId() { - checkState( - isSelfAllocated(), "Can only call resetSelfAllocatedId() in unit tests or Beam pipelines"); - nextSelfAllocatedId.set(1); // ids cannot be zero + // TODO(b/201547855): Find a way to allocate a unique ID without datastore. + private static class DatastoreIdSupplier implements Supplier { + + private static final DatastoreIdSupplier INSTANCE = new DatastoreIdSupplier(); + + /** + * A placeholder String passed into {@code DatastoreService.allocateIds} that ensures that all + * IDs are initialized from the same ID pool. + */ + private static final String APP_WIDE_ALLOCATION_KIND = "common"; + + public static DatastoreIdSupplier getInstance() { + return INSTANCE; + } + + @Override + public Long get() { + return DatastoreServiceFactory.getDatastoreService() + .allocateIds(APP_WIDE_ALLOCATION_KIND, 1) + .iterator() + .next() + .getId(); + } + } + + /** + * An ID supplier that allocates an ID from a monotonically increasing atomic {@link long}. + * + *

The generated IDs are only unique within the same JVM. It is not suitable for production use + * unless in cases the IDs are not significant. + */ + public static class SelfAllocatedIdSupplier implements Supplier { + + private static final SelfAllocatedIdSupplier INSTANCE = new SelfAllocatedIdSupplier(); + + /** Counts of used ids for self allocating IDs. */ + private static final AtomicLong nextSelfAllocatedId = new AtomicLong(1); // ids cannot be zero + + public static SelfAllocatedIdSupplier getInstance() { + return INSTANCE; + } + + @Override + public Long get() { + return nextSelfAllocatedId.getAndIncrement(); + } + + public void reset() { + nextSelfAllocatedId.set(1); + } } } diff --git a/core/src/test/java/google/registry/testing/AppEngineExtension.java b/core/src/test/java/google/registry/testing/AppEngineExtension.java index 0cf37bef697..090d98eb33d 100644 --- a/core/src/test/java/google/registry/testing/AppEngineExtension.java +++ b/core/src/test/java/google/registry/testing/AppEngineExtension.java @@ -40,7 +40,7 @@ import com.google.common.io.Files; import com.googlecode.objectify.Key; import com.googlecode.objectify.ObjectifyFilter; -import google.registry.model.IdService; +import google.registry.model.IdService.SelfAllocatedIdSupplier; import google.registry.model.ofy.ObjectifyService; import google.registry.model.registrar.Registrar; import google.registry.model.registrar.Registrar.State; @@ -441,7 +441,7 @@ public void setUp() throws Exception { ObjectifyService.initOfy(); // Reset id allocation in ObjectifyService so that ids are deterministic in tests. - IdService.resetSelfAllocatedId(); + SelfAllocatedIdSupplier.getInstance().reset(); this.ofyTestEntities.forEach(AppEngineExtension::register); }