diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index 2bf73b34247b3..3d8da7eac7690 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -78,6 +78,7 @@ import org.opensearch.index.mapper.SourceToParse; import org.opensearch.index.seqno.RetentionLeaseSyncer; import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.translog.InternalTranslogFactory; import org.opensearch.index.translog.TestTranslog; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogStats; @@ -675,6 +676,7 @@ public static final IndexShard newIndexShard( () -> {}, RetentionLeaseSyncer.EMPTY, cbs, + new InternalTranslogFactory(), SegmentReplicationCheckpointPublisher.EMPTY, null ); diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 210df9d342cb7..38065bf5aed20 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -88,6 +88,7 @@ import org.opensearch.index.shard.ShardPath; import org.opensearch.index.similarity.SimilarityService; import org.opensearch.index.store.Store; +import org.opensearch.index.translog.InternalTranslogFactory; import org.opensearch.index.translog.Translog; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.cluster.IndicesClusterStateService; @@ -547,6 +548,8 @@ public synchronized IndexShard createShard( () -> globalCheckpointSyncer.accept(shardId), retentionLeaseSyncer, circuitBreakerService, + // TODO Replace with remote translog factory in the follow up PR + this.indexSettings.isRemoteTranslogStoreEnabled() ? null : new InternalTranslogFactory(), this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null, remoteStore ); diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java index 4ae6646ed14f0..ba30103f70269 100644 --- a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java @@ -51,8 +51,10 @@ import org.opensearch.index.seqno.RetentionLeases; import org.opensearch.index.shard.ShardId; import org.opensearch.index.store.Store; +import org.opensearch.index.translog.InternalTranslogFactory; import org.opensearch.index.translog.TranslogConfig; import org.opensearch.index.translog.TranslogDeletionPolicyFactory; +import org.opensearch.index.translog.TranslogFactory; import org.opensearch.indices.IndexingMemoryController; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.threadpool.ThreadPool; @@ -150,6 +152,8 @@ public Supplier retentionLeasesSupplier() { private final TranslogConfig translogConfig; + private final TranslogFactory translogFactory; + public EngineConfig( ShardId shardId, ThreadPool threadPool, @@ -253,7 +257,8 @@ public EngineConfig( retentionLeasesSupplier, primaryTermSupplier, tombstoneDocSupplier, - false + false, + new InternalTranslogFactory() ); } @@ -284,7 +289,8 @@ public EngineConfig( Supplier retentionLeasesSupplier, LongSupplier primaryTermSupplier, TombstoneDocSupplier tombstoneDocSupplier, - boolean isReadOnlyReplica + boolean isReadOnlyReplica, + TranslogFactory translogFactory ) { if (isReadOnlyReplica && indexSettings.isSegRepEnabled() == false) { throw new IllegalArgumentException("Shard can only be wired as a read only replica with Segment Replication enabled"); @@ -328,6 +334,7 @@ public EngineConfig( this.primaryTermSupplier = primaryTermSupplier; this.tombstoneDocSupplier = tombstoneDocSupplier; this.isReadOnlyReplica = isReadOnlyReplica; + this.translogFactory = translogFactory; } /** @@ -532,6 +539,14 @@ public boolean isReadOnlyReplica() { return indexSettings.isSegRepEnabled() && isReadOnlyReplica; } + /** + * Returns the underlying translog factory + * @return the translog factory + */ + public TranslogFactory getTranslogFactory() { + return translogFactory; + } + /** * A supplier supplies tombstone documents which will be used in soft-update methods. * The returned document consists only _uid, _seqno, _term and _version fields; other metadata fields are excluded. diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java b/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java index c8aec3570f8b5..f0db086e47816 100644 --- a/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java +++ b/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java @@ -28,6 +28,7 @@ import org.opensearch.index.store.Store; import org.opensearch.index.translog.TranslogConfig; import org.opensearch.index.translog.TranslogDeletionPolicyFactory; +import org.opensearch.index.translog.TranslogFactory; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.plugins.EnginePlugin; import org.opensearch.plugins.PluginsService; @@ -147,7 +148,8 @@ public EngineConfig newEngineConfig( Supplier retentionLeasesSupplier, LongSupplier primaryTermSupplier, EngineConfig.TombstoneDocSupplier tombstoneDocSupplier, - boolean isReadOnlyReplica + boolean isReadOnlyReplica, + TranslogFactory translogFactory ) { CodecService codecServiceToUse = codecService; if (codecService == null && this.codecServiceFactory != null) { @@ -178,7 +180,8 @@ public EngineConfig newEngineConfig( retentionLeasesSupplier, primaryTermSupplier, tombstoneDocSupplier, - isReadOnlyReplica + isReadOnlyReplica, + translogFactory ); } diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 7d90c2ad653be..16599141b1345 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -289,7 +289,8 @@ public void onFailure(String reason, Exception ex) { () -> getLocalCheckpointTracker(), translogUUID, new CompositeTranslogEventListener(Arrays.asList(internalTranslogEventListener, translogEventListener), shardId), - this::ensureOpen + this::ensureOpen, + engineConfig.getTranslogFactory() ); this.translogManager = translogManagerRef; this.softDeletesPolicy = newSoftDeletesPolicy(); diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 1f9306cf51e1e..a481203ba3dea 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -101,7 +101,8 @@ public void onAfterTranslogSync() { } } }, - this + this, + engineConfig.getTranslogFactory() ); this.translogManager = translogManagerRef; } catch (IOException e) { diff --git a/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java b/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java index 4bc37084675ea..f6c5bf7640a73 100644 --- a/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java @@ -195,14 +195,15 @@ public void trimUnreferencedTranslogFiles() throws TranslogException { final TranslogDeletionPolicy translogDeletionPolicy = new DefaultTranslogDeletionPolicy(-1, -1, 0); translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); try ( - Translog translog = new Translog( - translogConfig, - translogUuid, - translogDeletionPolicy, - engineConfig.getGlobalCheckpointSupplier(), - engineConfig.getPrimaryTermSupplier(), - seqNo -> {} - ) + Translog translog = engineConfig.getTranslogFactory() + .newTranslog( + translogConfig, + translogUuid, + translogDeletionPolicy, + engineConfig.getGlobalCheckpointSupplier(), + engineConfig.getPrimaryTermSupplier(), + seqNo -> {} + ) ) { translog.trimUnreferencedReaders(); // refresh the translog stats diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index cebe262fee5d1..f426768119c1d 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -258,14 +258,15 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm final long localCheckpoint = Long.parseLong(infos.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); try ( - Translog translog = new Translog( - translogConfig, - translogUuid, - translogDeletionPolicy, - config.getGlobalCheckpointSupplier(), - config.getPrimaryTermSupplier(), - seqNo -> {} - ) + Translog translog = config.getTranslogFactory() + .newTranslog( + translogConfig, + translogUuid, + translogDeletionPolicy, + config.getGlobalCheckpointSupplier(), + config.getPrimaryTermSupplier(), + seqNo -> {} + ) ) { return translog.stats(); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index ac259f9430a3d..8970e0734f184 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -151,6 +151,7 @@ import org.opensearch.index.store.StoreStats; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogConfig; +import org.opensearch.index.translog.TranslogFactory; import org.opensearch.index.translog.TranslogRecoveryRunner; import org.opensearch.index.translog.TranslogStats; import org.opensearch.index.warmer.ShardIndexWarmerService; @@ -308,6 +309,7 @@ Runnable getGlobalCheckpointSyncer() { private final ReferenceManager.RefreshListener checkpointRefreshListener; private final Store remoteStore; + private final TranslogFactory translogFactory; public IndexShard( final ShardRouting shardRouting, @@ -330,6 +332,7 @@ public IndexShard( final Runnable globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final CircuitBreakerService circuitBreakerService, + final TranslogFactory translogFactory, @Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher, @Nullable final Store remoteStore ) throws IOException { @@ -420,6 +423,7 @@ public boolean shouldCache(Query query) { this.checkpointRefreshListener = null; } this.remoteStore = remoteStore; + this.translogFactory = translogFactory; } public ThreadPool getThreadPool() { @@ -3254,7 +3258,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro replicationTracker::getRetentionLeases, () -> getOperationPrimaryTerm(), tombstoneDocSupplier(), - indexSettings.isSegRepEnabled() && shardRouting.primary() == false + indexSettings.isSegRepEnabled() && shardRouting.primary() == false, + translogFactory ); } diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogFactory.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogFactory.java new file mode 100644 index 0000000000000..566eda4fe4a6e --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogFactory.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog; + +import java.io.IOException; +import java.util.function.LongConsumer; +import java.util.function.LongSupplier; + +/** + * Translog Factory for the local on-disk {@link Translog} + * + * @opensearch.internal + */ +public class InternalTranslogFactory implements TranslogFactory { + + @Override + public Translog newTranslog( + TranslogConfig translogConfig, + String translogUUID, + TranslogDeletionPolicy translogDeletionPolicy, + LongSupplier globalCheckpointSupplier, + LongSupplier primaryTermSupplier, + LongConsumer persistedSequenceNumberConsumer + ) throws IOException { + + return new Translog( + translogConfig, + translogUUID, + translogDeletionPolicy, + globalCheckpointSupplier, + primaryTermSupplier, + persistedSequenceNumberConsumer + ); + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index ac82cf246cc55..fd52e02132006 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -54,7 +54,8 @@ public InternalTranslogManager( Supplier localCheckpointTrackerSupplier, String translogUUID, TranslogEventListener translogEventListener, - LifecycleAware engineLifeCycleAware + LifecycleAware engineLifeCycleAware, + TranslogFactory translogFactory ) throws IOException { this.shardId = shardId; this.readLock = readLock; @@ -67,7 +68,7 @@ public InternalTranslogManager( if (tracker != null) { tracker.markSeqNoAsPersisted(seqNo); } - }, translogUUID); + }, translogUUID, translogFactory); assert translog.getGeneration() != null; this.translog = translog; assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it"; @@ -333,10 +334,11 @@ protected Translog openTranslog( TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier, LongConsumer persistedSequenceNumberConsumer, - String translogUUID + String translogUUID, + TranslogFactory translogFactory ) throws IOException { - return new Translog( + return translogFactory.newTranslog( translogConfig, translogUUID, translogDeletionPolicy, diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogFactory.java b/server/src/main/java/org/opensearch/index/translog/TranslogFactory.java new file mode 100644 index 0000000000000..5500bda99808d --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/TranslogFactory.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog; + +import java.io.IOException; +import java.util.function.LongConsumer; +import java.util.function.LongSupplier; + +/** + * Translog Factory to enable creation of various local on-disk + * and remote store flavors of {@link Translog} + * + * @opensearch.internal + */ +@FunctionalInterface +public interface TranslogFactory { + + Translog newTranslog( + final TranslogConfig config, + final String translogUUID, + final TranslogDeletionPolicy deletionPolicy, + final LongSupplier globalCheckpointSupplier, + final LongSupplier primaryTermSupplier, + final LongConsumer persistedSequenceNumberConsumer + ) throws IOException; +} diff --git a/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java index 09f5f38a9f6a9..96a2dd05851c0 100644 --- a/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java @@ -35,7 +35,8 @@ public WriteOnlyTranslogManager( Supplier localCheckpointTrackerSupplier, String translogUUID, TranslogEventListener translogEventListener, - LifecycleAware engineLifecycleAware + LifecycleAware engineLifecycleAware, + TranslogFactory translogFactory ) throws IOException { super( translogConfig, @@ -47,7 +48,8 @@ public WriteOnlyTranslogManager( localCheckpointTrackerSupplier, translogUUID, translogEventListener, - engineLifecycleAware + engineLifecycleAware, + translogFactory ); } diff --git a/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java b/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java index 7ddd92ea7b36e..269d89352fb18 100644 --- a/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java +++ b/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java @@ -16,6 +16,7 @@ import org.opensearch.index.codec.CodecService; import org.opensearch.index.codec.CodecServiceFactory; import org.opensearch.index.seqno.RetentionLeases; +import org.opensearch.index.translog.InternalTranslogFactory; import org.opensearch.index.translog.TranslogDeletionPolicy; import org.opensearch.index.translog.TranslogDeletionPolicyFactory; import org.opensearch.index.translog.TranslogReader; @@ -66,7 +67,8 @@ public void testCreateEngineConfigFromFactory() { () -> new RetentionLeases(0, 0, Collections.emptyList()), null, null, - false + false, + new InternalTranslogFactory() ); assertNotNull(config.getCodec()); @@ -143,7 +145,8 @@ public void testCreateCodecServiceFromFactory() { () -> new RetentionLeases(0, 0, Collections.emptyList()), null, null, - false + false, + new InternalTranslogFactory() ); assertNotNull(config.getCodec()); } diff --git a/server/src/test/java/org/opensearch/index/engine/EngineConfigTests.java b/server/src/test/java/org/opensearch/index/engine/EngineConfigTests.java index 1c6d06e9bcc08..1754d6082b86d 100644 --- a/server/src/test/java/org/opensearch/index/engine/EngineConfigTests.java +++ b/server/src/test/java/org/opensearch/index/engine/EngineConfigTests.java @@ -13,6 +13,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.index.IndexSettings; import org.opensearch.index.seqno.RetentionLeases; +import org.opensearch.index.translog.InternalTranslogFactory; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.IndexSettingsModule; import org.opensearch.test.OpenSearchTestCase; @@ -102,7 +103,8 @@ private EngineConfig createReadOnlyEngine(IndexSettings indexSettings) { () -> RetentionLeases.EMPTY, null, null, - true + true, + new InternalTranslogFactory() ); } } diff --git a/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java b/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java index 5171f0dfa1d18..234abfba66622 100644 --- a/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java @@ -47,7 +47,8 @@ public void testRecoveryFromTranslog() throws IOException { () -> tracker, translogUUID, TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, - () -> {} + () -> {}, + new InternalTranslogFactory() ); final int docs = randomIntBetween(1, 100); for (int i = 0; i < docs; i++) { @@ -85,7 +86,8 @@ public void onBeginTranslogRecovery() { beginTranslogRecoveryInvoked.set(true); } }, - () -> {} + () -> {}, + new InternalTranslogFactory() ); AtomicInteger opsRecovered = new AtomicInteger(); int opsRecoveredFromTranslog = translogManager.recoverFromTranslog((snapshot) -> { @@ -122,7 +124,8 @@ public void testTranslogRollsGeneration() throws IOException { () -> tracker, translogUUID, TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, - () -> {} + () -> {}, + new InternalTranslogFactory() ); final int docs = randomIntBetween(1, 100); for (int i = 0; i < docs; i++) { @@ -150,7 +153,8 @@ public void testTranslogRollsGeneration() throws IOException { () -> new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED), translogUUID, TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, - () -> {} + () -> {}, + new InternalTranslogFactory() ); AtomicInteger opsRecovered = new AtomicInteger(); int opsRecoveredFromTranslog = translogManager.recoverFromTranslog((snapshot) -> { @@ -183,7 +187,8 @@ public void testTrimOperationsFromTranslog() throws IOException { () -> tracker, translogUUID, TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, - () -> {} + () -> {}, + new InternalTranslogFactory() ); final int docs = randomIntBetween(1, 100); for (int i = 0; i < docs; i++) { @@ -213,7 +218,8 @@ public void testTrimOperationsFromTranslog() throws IOException { () -> new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED), translogUUID, TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, - () -> {} + () -> {}, + new InternalTranslogFactory() ); AtomicInteger opsRecovered = new AtomicInteger(); int opsRecoveredFromTranslog = translogManager.recoverFromTranslog((snapshot) -> { @@ -260,7 +266,8 @@ public void onAfterTranslogSync() { } } }, - () -> {} + () -> {}, + new InternalTranslogFactory() ); translogManagerAtomicReference.set(translogManager); Engine.Index index = indexForDoc(doc); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 7dedc572ff19b..f446538acccbb 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -90,6 +90,7 @@ import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.index.translog.InternalTranslogFactory; import org.opensearch.index.translog.Translog; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.breaker.HierarchyCircuitBreakerService; @@ -555,6 +556,7 @@ protected IndexShard newShard( globalCheckpointSyncer, retentionLeaseSyncer, breakerService, + new InternalTranslogFactory(), checkpointPublisher, remoteStore );