diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java index 7c172e14fda8b..129bc67c8c9e3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java @@ -194,10 +194,12 @@ public OperatorSnapshotFutures snapshotState( boolean isUsingCustomRawKeyedState, boolean useAsyncState) throws CheckpointException { - KeyGroupRange keyGroupRange = - null != keyedStateBackend - ? keyedStateBackend.getKeyGroupRange() - : KeyGroupRange.EMPTY_KEY_GROUP_RANGE; + KeyGroupRange keyGroupRange = KeyGroupRange.EMPTY_KEY_GROUP_RANGE; + if (keyedStateBackend != null) { + keyGroupRange = keyedStateBackend.getKeyGroupRange(); + } else if (asyncKeyedStateBackend != null) { + keyGroupRange = asyncKeyedStateBackend.getKeyGroupRange(); + } OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java index 3e4a26fe83fab..8166bff4c0a14 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; import org.apache.flink.runtime.asyncprocessing.RecordContext; import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.StateAssignmentOperation; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.mailbox.SyncMailboxExecutor; import org.apache.flink.runtime.operators.testutils.MockEnvironment; @@ -44,9 +45,12 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.ConfigurableStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl; import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.SharedStateRegistryImpl; import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.TestTaskStateManager; @@ -66,6 +70,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Random; import java.util.concurrent.RunnableFuture; import static java.util.Arrays.asList; @@ -136,7 +141,7 @@ protected CheckpointStreamFactory createStreamFactory() throws Exception { } protected AsyncKeyedStateBackend createAsyncKeyedBackend( - TypeSerializer keySerializer, int numberOfKeyGroups, Environment env) + TypeSerializer keySerializer, KeyGroupRange keyGroupRange, Environment env) throws Exception { env.setCheckpointStorageAccess(getCheckpointStorageAccess()); @@ -148,8 +153,8 @@ protected AsyncKeyedStateBackend createAsyncKeyedBackend( new JobID(), "test_op", keySerializer, - numberOfKeyGroups, - new KeyGroupRange(0, numberOfKeyGroups - 1), + 
keyGroupRange.getNumberOfKeyGroups(), + keyGroupRange, env.getTaskKvStateRegistry(), TtlTimeProvider.DEFAULT, getMetricGroup(), @@ -165,7 +170,7 @@ protected StateBackend.CustomInitializationMetrics getCustomInitializationMetric protected AsyncKeyedStateBackend restoreAsyncKeyedBackend( TypeSerializer keySerializer, - int numberOfKeyGroups, + KeyGroupRange keyGroupRange, List state, Environment env) throws Exception { @@ -177,8 +182,8 @@ protected AsyncKeyedStateBackend restoreAsyncKeyedBackend( new JobID(), "test_op", keySerializer, - numberOfKeyGroups, - new KeyGroupRange(0, numberOfKeyGroups - 1), + keyGroupRange.getNumberOfKeyGroups(), + keyGroupRange, env.getTaskKvStateRegistry(), TtlTimeProvider.DEFAULT, getMetricGroup(), @@ -212,7 +217,11 @@ void testAsyncKeyedStateBackendSnapshot() throws Exception { KeyedStateHandle stateHandle; try { - backend = createAsyncKeyedBackend(IntSerializer.INSTANCE, jobMaxParallelism, env); + backend = + createAsyncKeyedBackend( + IntSerializer.INSTANCE, + new KeyGroupRange(0, jobMaxParallelism - 1), + env); aec = new AsyncExecutionController<>( new SyncMailboxExecutor(), @@ -299,7 +308,7 @@ void testAsyncKeyedStateBackendSnapshot() throws Exception { backend = restoreAsyncKeyedBackend( IntSerializer.INSTANCE, - jobMaxParallelism, + new KeyGroupRange(0, jobMaxParallelism - 1), Collections.singletonList(stateHandle), env); aec = @@ -345,6 +354,151 @@ void testAsyncKeyedStateBackendSnapshot() throws Exception { assertThat(testExceptionHandler.exception).isNull(); } + @TestTemplate + void testAsyncStateBackendScaleUp() throws Exception { + testKeyGroupSnapshotRestore(2, 5, 10); + } + + @TestTemplate + void testAsyncStateBackendScaleDown() throws Exception { + testKeyGroupSnapshotRestore(4, 3, 10); + } + + private void testKeyGroupSnapshotRestore( + int sourceParallelism, int targetParallelism, int maxParallelism) throws Exception { + + int aecBatchSize = 1; + long aecBufferTimeout = 1; + int aecMaxInFlightRecords = 1000; + Random random = new Random(); + List<ValueStateDescriptor<Integer>> stateDescriptors = new ArrayList<>(maxParallelism); + List keyInKeyGroups = new ArrayList<>(maxParallelism); + List expectedValue = new ArrayList<>(maxParallelism); + for (int i = 0; i < maxParallelism; ++i) { + // All states have different names, to mimic that each parallelism of one operator + // has its own states.
+ stateDescriptors.add( + new ValueStateDescriptor<>("state" + i, BasicTypeInfo.INT_TYPE_INFO)); + } + + CheckpointStreamFactory streamFactory = createStreamFactory(); + SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl(); + AsyncExecutionController aec; + + TestAsyncFrameworkExceptionHandler testExceptionHandler = + new TestAsyncFrameworkExceptionHandler(); + + List snapshots = new ArrayList<>(sourceParallelism); + for (int i = 0; i < sourceParallelism; ++i) { + KeyGroupRange range = + KeyGroupRange.of( + maxParallelism * i / sourceParallelism, + maxParallelism * (i + 1) / sourceParallelism - 1); + AsyncKeyedStateBackend backend = + createAsyncKeyedBackend(IntSerializer.INSTANCE, range, env); + aec = + new AsyncExecutionController<>( + new SyncMailboxExecutor(), + testExceptionHandler, + backend.createStateExecutor(), + maxParallelism, + aecBatchSize, + aecBufferTimeout, + aecMaxInFlightRecords, + null); + backend.setup(aec); + + try { + for (int j = range.getStartKeyGroup(); j <= range.getEndKeyGroup(); ++j) { + ValueState state = + backend.createState( + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, + stateDescriptors.get(j)); + int keyInKeyGroup = + getKeyInKeyGroup(random, maxParallelism, KeyGroupRange.of(j, j)); + RecordContext recordContext = aec.buildContext(keyInKeyGroup, keyInKeyGroup); + recordContext.retain(); + aec.setCurrentContext(recordContext); + keyInKeyGroups.add(keyInKeyGroup); + state.update(keyInKeyGroup); + expectedValue.add(keyInKeyGroup); + recordContext.release(); + } + + // snapshot + snapshots.add( + runSnapshot( + backend.snapshot( + 0, + 0, + streamFactory, + CheckpointOptions.forCheckpointWithDefaultLocation()), + sharedStateRegistry)); + } finally { + IOUtils.closeQuietly(backend); + backend.dispose(); + } + } + + // redistribute the state handles + List keyGroupRangesRestore = new ArrayList<>(); + for (int i = 0; i < targetParallelism; ++i) { + keyGroupRangesRestore.add( + KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex( + maxParallelism, targetParallelism, i)); + } + List<List<KeyedStateHandle>> keyGroupStatesAfterDistribute = + new ArrayList<>(targetParallelism); + for (int i = 0; i < targetParallelism; ++i) { + List keyedStateHandles = new ArrayList<>(); + StateAssignmentOperation.extractIntersectingState( + snapshots, keyGroupRangesRestore.get(i), keyedStateHandles); + keyGroupStatesAfterDistribute.add(keyedStateHandles); + } + // restore and verify + for (int i = 0; i < targetParallelism; ++i) { + AsyncKeyedStateBackend backend = + restoreAsyncKeyedBackend( + IntSerializer.INSTANCE, + keyGroupRangesRestore.get(i), + keyGroupStatesAfterDistribute.get(i), + env); + aec = + new AsyncExecutionController<>( + new SyncMailboxExecutor(), + testExceptionHandler, + backend.createStateExecutor(), + maxParallelism, + aecBatchSize, + aecBufferTimeout, + aecMaxInFlightRecords, + null); + backend.setup(aec); + + try { + KeyGroupRange range = keyGroupRangesRestore.get(i); + for (int j = range.getStartKeyGroup(); j <= range.getEndKeyGroup(); ++j) { + ValueState state = + backend.createState( + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, + stateDescriptors.get(j)); + RecordContext recordContext = + aec.buildContext(keyInKeyGroups.get(j), keyInKeyGroups.get(j)); + recordContext.retain(); + aec.setCurrentContext(recordContext); + assertThat(state.value()).isEqualTo(expectedValue.get(j)); + recordContext.release(); + } + } finally { + IOUtils.closeQuietly(backend); + backend.dispose(); + } + } + } + @TestTemplate + void
testKeyGroupedInternalPriorityQueue() throws Exception { testKeyGroupedInternalPriorityQueue(false); @@ -358,7 +512,7 @@ void testKeyGroupedInternalPriorityQueueAddAll() throws Exception { void testKeyGroupedInternalPriorityQueue(boolean addAll) throws Exception { String fieldName = "key-grouped-priority-queue"; AsyncKeyedStateBackend backend = - createAsyncKeyedBackend(IntSerializer.INSTANCE, 128, env); + createAsyncKeyedBackend(IntSerializer.INSTANCE, new KeyGroupRange(0, 127), env); try { KeyGroupedInternalPriorityQueue priorityQueue = backend.create(fieldName, new TestType.V1TestTypeSerializer()); @@ -414,7 +568,7 @@ void testValueStateWorkWithTtl() throws Exception { TestAsyncFrameworkExceptionHandler testExceptionHandler = new TestAsyncFrameworkExceptionHandler(); AsyncKeyedStateBackend backend = - createAsyncKeyedBackend(LongSerializer.INSTANCE, 128, env); + createAsyncKeyedBackend(LongSerializer.INSTANCE, new KeyGroupRange(0, 127), env); AsyncExecutionController aec = new AsyncExecutionController<>( new SyncMailboxExecutor(), @@ -468,6 +622,34 @@ void testValueStateWorkWithTtl() throws Exception { } } + /** Returns an Integer key in the specified keyGroupRange. */ + private int getKeyInKeyGroup(Random random, int maxParallelism, KeyGroupRange keyGroupRange) { + int keyInKG = random.nextInt(); + int kg = KeyGroupRangeAssignment.assignToKeyGroup(keyInKG, maxParallelism); + while (!keyGroupRange.contains(kg)) { + keyInKG = random.nextInt(); + kg = KeyGroupRangeAssignment.assignToKeyGroup(keyInKG, maxParallelism); + } + return keyInKG; + } + + private static KeyedStateHandle runSnapshot( + RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture, + SharedStateRegistry sharedStateRegistry) + throws Exception { + + if (!snapshotRunnableFuture.isDone()) { + snapshotRunnableFuture.run(); + } + + SnapshotResult<KeyedStateHandle> snapshotResult = snapshotRunnableFuture.get(); + KeyedStateHandle jobManagerOwnedSnapshot = snapshotResult.getJobManagerOwnedSnapshot(); + if (jobManagerOwnedSnapshot != null) { + jobManagerOwnedSnapshot.registerSharedStates(sharedStateRegistry, 0L); + } + return jobManagerOwnedSnapshot; + } + static class TestAsyncFrameworkExceptionHandler implements StateFutureImpl.AsyncFrameworkExceptionHandler { String message = null; diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStConfigurableOptions.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStConfigurableOptions.java index 5b095ec03d76c..3b96bf89164cb 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStConfigurableOptions.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStConfigurableOptions.java @@ -327,6 +327,32 @@ public class ForStConfigurableOptions implements Serializable { + " and re-written to the same level as they were before. It makes sure a file goes through" + " compaction filters periodically. 0 means turning off periodic compaction.The default value is '30days'."); + public static final ConfigOption RESTORE_OVERLAP_FRACTION_THRESHOLD = + key("state.backend.forst.restore-overlap-fraction-threshold") + .doubleType() + .defaultValue(0.0) + .withDescription( + "The threshold of overlap fraction between the handle's key-group range and target key-group range. " + + "When restoring the base DB, only a handle whose overlap fraction is greater than or equal to the threshold " + + "has a chance to be chosen as the initial handle. " + "The default value is 0.0, which means that some handle is always selected for initialization. "); + + public static final ConfigOption USE_INGEST_DB_RESTORE_MODE = + key("state.backend.forst.use-ingest-db-restore-mode") + .booleanType() + .defaultValue(Boolean.FALSE) + .withDescription( + "A recovery mode that directly clips and ingests multiple DBs during state recovery if the keys" + " in the SST files do not exceed the declared key-group range."); + + public static final ConfigOption USE_DELETE_FILES_IN_RANGE_DURING_RESCALING = + key("state.backend.forst.rescaling.use-delete-files-in-range") + .booleanType() + .defaultValue(Boolean.FALSE) + .withDescription( + "If true, during rescaling, the deleteFilesInRange API will be invoked " + + "to clean up the useless files so that local disk space can be reclaimed more promptly."); + static final ConfigOption[] CANDIDATE_CONFIGS = new ConfigOption[] { // cache @@ -361,6 +387,9 @@ public class ForStConfigurableOptions implements Serializable { USE_BLOOM_FILTER, BLOOM_FILTER_BITS_PER_KEY, BLOOM_FILTER_BLOCK_BASED_MODE, + RESTORE_OVERLAP_FRACTION_THRESHOLD, + USE_INGEST_DB_RESTORE_MODE, + USE_DELETE_FILES_IN_RANGE_DURING_RESCALING }; private static final Set<ConfigOption<?>> POSITIVE_INT_CONFIG_SET = diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStIncrementalCheckpointUtils.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStIncrementalCheckpointUtils.java new file mode 100644 index 0000000000000..03e693efd28fb --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStIncrementalCheckpointUtils.java @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.state.forst; + +import org.apache.flink.runtime.state.CompositeKeySerializationUtils; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava32.com.google.common.primitives.UnsignedBytes; + +import org.forstdb.Checkpoint; +import org.forstdb.ColumnFamilyHandle; +import org.forstdb.ExportImportFilesMetaData; +import org.forstdb.LiveFileMetaData; +import org.forstdb.RocksDB; +import org.forstdb.RocksDBException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.File; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; + +/** Utils for ForSt incremental checkpoints. */ +public class ForStIncrementalCheckpointUtils { + + private static final Logger logger = + LoggerFactory.getLogger(ForStIncrementalCheckpointUtils.class); + + /** + * Evaluates a state handle's "score" regarding the target range when choosing the best state + * handle to initialize the base DB for recovery. If the overlap fraction is less than + * overlapFractionThreshold, {@code Score.MIN} is returned, meaning the handle has no chance + * to be the initial handle. + */ + private static Score stateHandleEvaluator( + KeyedStateHandle stateHandle, + KeyGroupRange targetKeyGroupRange, + double overlapFractionThreshold) { + final KeyGroupRange handleKeyGroupRange = stateHandle.getKeyGroupRange(); + final KeyGroupRange intersectGroup = + handleKeyGroupRange.getIntersection(targetKeyGroupRange); + + final double overlapFraction = + (double) intersectGroup.getNumberOfKeyGroups() + / handleKeyGroupRange.getNumberOfKeyGroups(); + + if (overlapFraction < overlapFractionThreshold) { + return Score.MIN; + } + return new Score(intersectGroup.getNumberOfKeyGroups(), overlapFraction); + } + + /** + * Score of a state handle: the intersecting group range is compared first, and then the + * overlap fraction. + */ + private static class Score implements Comparable<Score> { + + public static final Score MIN = new Score(Integer.MIN_VALUE, -1.0); + + private final int intersectGroupRange; + + private final double overlapFraction; + + public Score(int intersectGroupRange, double overlapFraction) { + this.intersectGroupRange = intersectGroupRange; + this.overlapFraction = overlapFraction; + } + + public int getIntersectGroupRange() { + return intersectGroupRange; + } + + public double getOverlapFraction() { + return overlapFraction; + } + + @Override + public int compareTo(@Nullable Score other) { + return Comparator.nullsFirst( + Comparator.comparing(Score::getIntersectGroupRange) + .thenComparing(Score::getOverlapFraction)) + .compare(this, other); + } + } + + /** + * The method to clip the db instance according to the target key group range using {@link + * RocksDB#deleteRange(ColumnFamilyHandle, byte[], byte[])}. + * + * @param db the RocksDB instance to be clipped. + * @param columnFamilyHandles the column families in the db instance. + * @param targetKeyGroupRange the target key group range.
+ * @param currentKeyGroupRange the key group range of the db instance. + * @param keyGroupPrefixBytes number of bytes required to prefix the key groups. + * @param useDeleteFilesInRange whether to call db.deleteFilesInRanges for the deleted ranges. + */ + public static void clipDBWithKeyGroupRange( + @Nonnull RocksDB db, + @Nonnull List columnFamilyHandles, + @Nonnull KeyGroupRange targetKeyGroupRange, + @Nonnull KeyGroupRange currentKeyGroupRange, + @Nonnegative int keyGroupPrefixBytes, + boolean useDeleteFilesInRange) + throws RocksDBException { + + List deleteFilesRanges = new ArrayList<>(4); + + if (currentKeyGroupRange.getStartKeyGroup() < targetKeyGroupRange.getStartKeyGroup()) { + prepareRangeDeletes( + keyGroupPrefixBytes, + currentKeyGroupRange.getStartKeyGroup(), + targetKeyGroupRange.getStartKeyGroup(), + deleteFilesRanges); + } + + if (currentKeyGroupRange.getEndKeyGroup() > targetKeyGroupRange.getEndKeyGroup()) { + prepareRangeDeletes( + keyGroupPrefixBytes, + targetKeyGroupRange.getEndKeyGroup() + 1, + currentKeyGroupRange.getEndKeyGroup() + 1, + deleteFilesRanges); + } + + logger.info( + "Performing range delete for backend with target key-groups range {} with boundaries set {} - deleteFilesInRanges = {}.", + targetKeyGroupRange.prettyPrintInterval(), + deleteFilesRanges.stream().map(Arrays::toString).collect(Collectors.toList()), + useDeleteFilesInRange); + + deleteRangeData(db, columnFamilyHandles, deleteFilesRanges, useDeleteFilesInRange); + } + + private static void prepareRangeDeletes( + int keyGroupPrefixBytes, + int beginKeyGroup, + int endKeyGroup, + List deleteFilesRangesOut) { + byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes]; + byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes]; + CompositeKeySerializationUtils.serializeKeyGroup(beginKeyGroup, beginKeyGroupBytes); + CompositeKeySerializationUtils.serializeKeyGroup(endKeyGroup, endKeyGroupBytes); + deleteFilesRangesOut.add(beginKeyGroupBytes); + deleteFilesRangesOut.add(endKeyGroupBytes); + } + + /** + * Deletes the records that fall into the given deleteRanges of the db. + * + * @param db the target DB to be clipped. + * @param columnFamilyHandles the column families to be clipped. + * @param deleteRanges - pairs of deleted ranges (from1, to1, from2, to2, ...). For each pair + * [from, to), the startKey ('from') is inclusive, the endKey ('to') is exclusive. + * @param useDeleteFilesInRange whether to use deleteFilesInRange to clean up redundant files. + */ + private static void deleteRangeData( + RocksDB db, + List columnFamilyHandles, + List deleteRanges, + boolean useDeleteFilesInRange) + throws RocksDBException { + + if (deleteRanges.isEmpty()) { + // nothing to do. + return; + } + + Preconditions.checkArgument(deleteRanges.size() % 2 == 0); + for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { + // First delete the files in ranges + if (useDeleteFilesInRange) { + db.deleteFilesInRanges(columnFamilyHandle, deleteRanges, false); + } + + // Then put range limiting tombstones in place. + for (int i = 0; i < deleteRanges.size() / 2; i++) { + // Using RocksDB's deleteRange will take advantage of delete + // tombstones, which mark the range as deleted.
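+ // A range tombstone is a single marker covering the whole [from, to) interval, which is + // considerably cheaper than issuing a point delete per key; the data covered by the + // tombstone is physically reclaimed later, during compaction.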
+ // + // https://github.com/ververica/frocksdb/blob/FRocksDB-6.20.3/include/rocksdb/db.h#L363-L377 + db.deleteRange( + columnFamilyHandle, deleteRanges.get(i * 2), deleteRanges.get(i * 2 + 1)); + } + } + } + + /** + * Checks data in the SST files of the given DB for keys that exceed either the lower or the + * upper bound of the proclaimed key-groups range of the DB. + * + * @param db the DB to check. + * @param keyGroupPrefixBytes the number of bytes used to serialize the key-group prefix of keys + * in the DB. + * @param dbExpectedKeyGroupRange the expected key-groups range of the DB. + * @return the check result with detailed info about lower and upper bound violations. + */ + public static RangeCheckResult checkSstDataAgainstKeyGroupRange( + RocksDB db, int keyGroupPrefixBytes, KeyGroupRange dbExpectedKeyGroupRange) { + final byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes]; + final byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes]; + + CompositeKeySerializationUtils.serializeKeyGroup( + dbExpectedKeyGroupRange.getStartKeyGroup(), beginKeyGroupBytes); + + CompositeKeySerializationUtils.serializeKeyGroup( + dbExpectedKeyGroupRange.getEndKeyGroup() + 1, endKeyGroupBytes); + + KeyRange dbKeyRange = getDBKeyRange(db); + return RangeCheckResult.of( + beginKeyGroupBytes, + endKeyGroupBytes, + dbKeyRange.minKey, + dbKeyRange.maxKey, + keyGroupPrefixBytes); + } + + /** Returns a pair of the minimum and maximum key in the SST files of the given database. */ + private static KeyRange getDBKeyRange(RocksDB db) { + final Comparator comparator = UnsignedBytes.lexicographicalComparator(); + final List<LiveFileMetaData> liveFilesMetaData = db.getLiveFilesMetaData(); + + if (liveFilesMetaData.isEmpty()) { + return KeyRange.EMPTY; + } + + Iterator<LiveFileMetaData> liveFileMetaDataIterator = liveFilesMetaData.iterator(); + LiveFileMetaData fileMetaData = liveFileMetaDataIterator.next(); + byte[] smallestKey = fileMetaData.smallestKey(); + byte[] largestKey = fileMetaData.largestKey(); + while (liveFileMetaDataIterator.hasNext()) { + fileMetaData = liveFileMetaDataIterator.next(); + byte[] sstSmallestKey = fileMetaData.smallestKey(); + byte[] sstLargestKey = fileMetaData.largestKey(); + if (comparator.compare(sstSmallestKey, smallestKey) < 0) { + smallestKey = sstSmallestKey; + } + if (comparator.compare(sstLargestKey, largestKey) > 0) { + largestKey = sstLargestKey; + } + } + return KeyRange.of(smallestKey, largestKey); + } + + /** + * Exports the data of the given column families in the given DB. + * + * @param db the DB to export from. + * @param columnFamilyHandles the column families to export. + * @param registeredStateMetaInfoBases meta information about the registered states in the DB. + * @param exportBasePath the path to which the export files go. + * @param resultOutput output parameter for the metadata of the export. + * @throws RocksDBException on problems inside RocksDB.
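+ * <p>Note: column families whose export directory contains no SST files are skipped; their + * empty export metadata is closed and not added to {@code resultOutput}.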
+ */ + public static void exportColumnFamilies( + RocksDB db, + List<ColumnFamilyHandle> columnFamilyHandles, + List<RegisteredStateMetaInfoBase> registeredStateMetaInfoBases, + Path exportBasePath, + Map<RegisteredStateMetaInfoBase.Key, List<ExportImportFilesMetaData>> resultOutput) + throws RocksDBException { + + Preconditions.checkArgument( + columnFamilyHandles.size() == registeredStateMetaInfoBases.size(), + "Lists are aligned by index and must be of the same size!"); + + try (final Checkpoint checkpoint = Checkpoint.create(db)) { + for (int i = 0; i < columnFamilyHandles.size(); i++) { + RegisteredStateMetaInfoBase.Key stateMetaInfoAsKey = + registeredStateMetaInfoBases.get(i).asMapKey(); + + Path subPath = exportBasePath.resolve(UUID.randomUUID().toString()); + ExportImportFilesMetaData exportedColumnFamilyMetaData = + checkpoint.exportColumnFamily( + columnFamilyHandles.get(i), subPath.toString()); + + File[] exportedSstFiles = + subPath.toFile() + .listFiles((file, name) -> name.toLowerCase().endsWith(".sst")); + + if (exportedSstFiles != null && exportedSstFiles.length > 0) { + resultOutput + .computeIfAbsent(stateMetaInfoAsKey, (key) -> new ArrayList<>()) + .add(exportedColumnFamilyMetaData); + } else { + // Close unused empty export result + IOUtils.closeQuietly(exportedColumnFamilyMetaData); + } + } + } + } + + /** Checks whether {@code bytes} is before {@code prefixBytes} in the character order. */ + public static boolean beforeThePrefixBytes(@Nonnull byte[] bytes, @Nonnull byte[] prefixBytes) { + final int prefixLength = prefixBytes.length; + for (int i = 0; i < prefixLength; ++i) { + int r = (char) prefixBytes[i] - (char) bytes[i]; + if (r != 0) { + return r > 0; + } + } + return false; + } + + /** + * Chooses the best state handle, according to {@link #stateHandleEvaluator(KeyedStateHandle, + * KeyGroupRange, double)}, to initialize the base db from the given list, and returns its index. + * + * @param restoreStateHandles the candidate state handles. + * @param targetKeyGroupRange the target key group range. + * @param overlapFractionThreshold configured threshold for overlap. + * @return the index of the best candidate handle in the list or -1 if the list was empty. + * @param <T> the generic parameter type of the state handles. + */ + public static <T extends KeyedStateHandle> int findTheBestStateHandleForInitial( + @Nonnull List<T> restoreStateHandles, + @Nonnull KeyGroupRange targetKeyGroupRange, + double overlapFractionThreshold) { + + if (restoreStateHandles.isEmpty()) { + return -1; + } + + // Shortcut for a common case (scale out) + if (restoreStateHandles.size() == 1) { + return 0; + } + + int currentPos = 0; + int bestHandlePos = 0; + Score bestScore = Score.MIN; + for (T rawStateHandle : restoreStateHandles) { + Score handleScore = + stateHandleEvaluator( + rawStateHandle, targetKeyGroupRange, overlapFractionThreshold); + if (handleScore.compareTo(bestScore) > 0) { + bestHandlePos = currentPos; + bestScore = handleScore; + } + ++currentPos; + } + return bestHandlePos; + } + + /** Helper class that defines a key-range in RocksDB as byte arrays for min and max key. */ + private static final class KeyRange { + static final KeyRange EMPTY = KeyRange.of(new byte[0], new byte[0]); + + final byte[] minKey; + final byte[] maxKey; + + private KeyRange(byte[] minKey, byte[] maxKey) { + this.minKey = minKey; + this.maxKey = maxKey; + } + + static KeyRange of(byte[] minKey, byte[] maxKey) { + return new KeyRange(minKey, maxKey); + } + } + + /** + * Helper class that represents the result of a range check of the actual keys in a RocksDB + * instance against the proclaimed key-group range of the instance.
In short, this checks if the + * instance contains any keys (or tombstones for keys) that don't belong in the instance's + * key-groups range. + */ + public static final class RangeCheckResult { + private final byte[] proclaimedMinKey; + private final byte[] proclaimedMaxKey; + private final byte[] actualMinKey; + private final byte[] actualMaxKey; + final boolean leftInRange; + final boolean rightInRange; + + final int keyGroupPrefixBytes; + + private RangeCheckResult( + byte[] proclaimedMinKey, + byte[] proclaimedMaxKey, + byte[] actualMinKey, + byte[] actualMaxKey, + int keyGroupPrefixBytes) { + Comparator comparator = UnsignedBytes.lexicographicalComparator(); + this.proclaimedMinKey = proclaimedMinKey; + this.proclaimedMaxKey = proclaimedMaxKey; + this.actualMinKey = actualMinKey; + this.actualMaxKey = actualMaxKey; + this.leftInRange = comparator.compare(actualMinKey, proclaimedMinKey) >= 0; + // TODO: consider using <= here to avoid that range delete tombstones of + // (targetMaxKeyGroup + 1) prevent using ingest for no good reason. + this.rightInRange = comparator.compare(actualMaxKey, proclaimedMaxKey) < 0; + this.keyGroupPrefixBytes = keyGroupPrefixBytes; + } + + public boolean allInRange() { + return leftInRange && rightInRange; + } + + public byte[] getProclaimedMinKey() { + return proclaimedMinKey; + } + + public byte[] getProclaimedMaxKey() { + return proclaimedMaxKey; + } + + public byte[] getActualMinKey() { + return actualMinKey; + } + + public byte[] getActualMaxKey() { + return actualMaxKey; + } + + public int getKeyGroupPrefixBytes() { + return keyGroupPrefixBytes; + } + + public boolean isLeftInRange() { + return leftInRange; + } + + public boolean isRightInRange() { + return rightInRange; + } + + static RangeCheckResult of( + byte[] proclaimedMinKey, + byte[] proclaimedMaxKey, + byte[] actualMinKey, + byte[] actualMaxKey, + int keyGroupPrefixBytes) { + return new RangeCheckResult( + proclaimedMinKey, + proclaimedMaxKey, + actualMinKey, + actualMaxKey, + keyGroupPrefixBytes); + } + + @Override + public String toString() { + return "RangeCheckResult{" + + "leftInRange=" + + leftInRange + + ", rightInRange=" + + rightInRange + + ", actualMinKeyGroup=" + + CompositeKeySerializationUtils.extractKeyGroup( + keyGroupPrefixBytes, getActualMinKey()) + + ", actualMaxKeyGroup=" + + CompositeKeySerializationUtils.extractKeyGroup( + keyGroupPrefixBytes, getActualMaxKey()) + + '}'; + } + } +} diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java index 6e0c7613ab18b..bca44dc5a2bb7 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java @@ -366,7 +366,7 @@ public S createStateInternal( columnFamilyOptionsFactory, ttlCompactFiltersManager, optionsContainer.getWriteBufferManagerCapacity(), - null); + cancelStreamRegistry); ForStOperationUtils.registerKvStateInformation( this.kvStateInformation, this.nativeMetricMonitor, diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java index 
6abc97dcc9064..54e92ab6984a1 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java @@ -74,6 +74,8 @@ import java.util.function.Function; import java.util.function.Supplier; +import static org.apache.flink.state.forst.ForStConfigurableOptions.WRITE_BATCH_SIZE; + /** * Builder class for {@link ForStKeyedStateBackend} which handles all necessary initializations and * cleanups. @@ -89,6 +91,8 @@ public class ForStKeyedStateBackendBuilder private static final int VALUE_SERIALIZER_BUFFER_START_SIZE = 128; + private long writeBatchSize = WRITE_BATCH_SIZE.defaultValue().getBytes(); + /** String that identifies the operator that owns this backend. */ private final String operatorIdentifier; @@ -122,6 +126,11 @@ public class ForStKeyedStateBackendBuilder /** ForSt property-based and statistics-based native metrics options. */ private ForStNativeMetricOptions nativeMetricOptions; + private boolean rescalingUseDeleteFilesInRange = false; + + private double overlapFractionThreshold = 0.5; + private boolean useIngestDbRestoreMode = false; + public ForStKeyedStateBackendBuilder( String operatorIdentifier, ClassLoader userCodeClassLoader, @@ -165,6 +174,22 @@ ForStKeyedStateBackendBuilder setNativeMetricOptions( return this; } + ForStKeyedStateBackendBuilder setOverlapFractionThreshold(double overlapFractionThreshold) { + this.overlapFractionThreshold = overlapFractionThreshold; + return this; + } + + ForStKeyedStateBackendBuilder setUseIngestDbRestoreMode(boolean useIngestDbRestoreMode) { + this.useIngestDbRestoreMode = useIngestDbRestoreMode; + return this; + } + + ForStKeyedStateBackendBuilder setRescalingUseDeleteFilesInRange( + boolean rescalingUseDeleteFilesInRange) { + this.rescalingUseDeleteFilesInRange = rescalingUseDeleteFilesInRange; + return this; + } + @Override public ForStKeyedStateBackend build() throws BackendBuildingException { ColumnFamilyHandle defaultColumnFamilyHandle = null; @@ -212,7 +237,12 @@ public ForStKeyedStateBackend build() throws BackendBuildingException { try { optionsContainer.prepareDirectories(); - restoreOperation = getForStRestoreOperation(kvStateInformation, registeredPQStates); + restoreOperation = + getForStRestoreOperation( + keyGroupPrefixBytes, + kvStateInformation, + registeredPQStates, + ttlCompactFiltersManager); ForStRestoreResult restoreResult = restoreOperation.restore(); db = restoreResult.getDb(); defaultColumnFamilyHandle = restoreResult.getDefaultColumnFamilyHandle(); @@ -294,8 +324,10 @@ public ForStKeyedStateBackend build() throws BackendBuildingException { } private ForStRestoreOperation getForStRestoreOperation( + int keyGroupPrefixBytes, LinkedHashMap kvStateInformation, - LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates) { + LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates, + ForStDBTtlCompactFiltersManager ttlCompactFiltersManager) { // Currently, ForStDB does not support mixing local-dir and remote-dir, and ForStDB will // concatenates the dfs directory with the local directory as working dir when using flink // env. We expect to directly use the dfs directory in flink env or local directory as @@ -305,7 +337,7 @@ private ForStRestoreOperation getForStRestoreOperation( Path instanceForStPath = optionsContainer.getRemoteForStPath() == null ?
optionsContainer.getLocalForStPath() - : new Path("/"); + : new Path("/db"); if (CollectionUtil.isEmptyOrAllElementsNull(restoreStateHandles)) { return new ForStNoneRestoreOperation( @@ -314,13 +346,17 @@ private ForStRestoreOperation getForStRestoreOperation( optionsContainer.getDbOptions(), columnFamilyOptionsFactory, nativeMetricOptions, - metricGroup); + metricGroup, + ttlCompactFiltersManager, + writeBatchSize, + optionsContainer.getWriteBufferManagerCapacity()); } KeyedStateHandle firstStateHandle = restoreStateHandles.iterator().next(); if (firstStateHandle instanceof IncrementalRemoteKeyedStateHandle) { return new ForStIncrementalRestoreOperation<>( operatorIdentifier, keyGroupRange, + keyGroupPrefixBytes, cancelStreamRegistry, userCodeClassLoader, kvStateInformation, @@ -332,9 +368,15 @@ private ForStRestoreOperation getForStRestoreOperation( columnFamilyOptionsFactory, nativeMetricOptions, metricGroup, + ttlCompactFiltersManager, + writeBatchSize, + optionsContainer.getWriteBufferManagerCapacity(), customInitializationMetrics, CollectionUtil.checkedSubTypeCast( - restoreStateHandles, IncrementalRemoteKeyedStateHandle.class)); + restoreStateHandles, IncrementalRemoteKeyedStateHandle.class), + overlapFractionThreshold, + useIngestDbRestoreMode, + rescalingUseDeleteFilesInRange); } else if (priorityQueueConfig.getPriorityQueueStateType() == ForStStateBackend.PriorityQueueStateType.HEAP) { // Note: This branch can be touched after ForSt Support canonical savepoint, @@ -352,6 +394,9 @@ private ForStRestoreOperation getForStRestoreOperation( columnFamilyOptionsFactory, nativeMetricOptions, metricGroup, + ttlCompactFiltersManager, + writeBatchSize, + optionsContainer.getWriteBufferManagerCapacity(), restoreStateHandles, cancelStreamRegistry); } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOperationUtils.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOperationUtils.java index 890d65ce7e2b2..492aebdf2fff8 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOperationUtils.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOperationUtils.java @@ -19,6 +19,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.core.fs.ICloseableRegistry; +import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.memory.OpaqueMemoryResource; import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo; @@ -34,6 +35,7 @@ import org.forstdb.ColumnFamilyOptions; import org.forstdb.DBOptions; import org.forstdb.ExportImportFilesMetaData; +import org.forstdb.ImportColumnFamilyOptions; import org.forstdb.ReadOptions; import org.forstdb.RocksDB; import org.forstdb.RocksDBException; @@ -102,44 +104,29 @@ public static RocksDB openDB( return dbRef; } - /** Creates a column family handle from a state id. 
*/ - public static ColumnFamilyHandle createColumnFamilyHandle( - String stateId, + private static ColumnFamilyHandle createColumnFamily( + ColumnFamilyDescriptor columnDescriptor, RocksDB db, - Function columnFamilyOptionsFactory) { - - ColumnFamilyDescriptor columnFamilyDescriptor = - createColumnFamilyDescriptor(stateId, columnFamilyOptionsFactory); - - final ColumnFamilyHandle columnFamilyHandle; - try { - columnFamilyHandle = createColumnFamily(columnFamilyDescriptor, db); - } catch (Exception ex) { - IOUtils.closeQuietly(columnFamilyDescriptor.getOptions()); - throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", ex); + List importFilesMetaData, + ICloseableRegistry cancelStreamRegistryForRestore) + throws RocksDBException, InterruptedException { + if (Thread.currentThread().isInterrupted()) { + // abort recovery if the task thread was already interrupted + // e.g. because the task was cancelled + throw new InterruptedException("The thread was interrupted, aborting recovery"); + } else if (cancelStreamRegistryForRestore.isClosed()) { + throw new CancelTaskException("The stream was closed, aborting recovery"); } - return columnFamilyHandle; - } - - /** Creates a column descriptor for a state column family. */ - public static ColumnFamilyDescriptor createColumnFamilyDescriptor( - String stateId, Function columnFamilyOptionsFactory) { - - byte[] nameBytes = stateId.getBytes(ConfigConstants.DEFAULT_CHARSET); - Preconditions.checkState( - !Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes), - "The chosen state name 'default' collides with the name of the default column family!"); - - ColumnFamilyOptions options = - createColumnFamilyOptions(columnFamilyOptionsFactory, stateId); - - return new ColumnFamilyDescriptor(nameBytes, options); - } - - private static ColumnFamilyHandle createColumnFamily( - ColumnFamilyDescriptor columnDescriptor, RocksDB db) throws RocksDBException { - return db.createColumnFamily(columnDescriptor); + if (importFilesMetaData.isEmpty()) { + return db.createColumnFamily(columnDescriptor); + } else { + try (ImportColumnFamilyOptions importColumnFamilyOptions = + new ImportColumnFamilyOptions().setMoveFiles(true)) { + return db.createColumnFamilyWithImport( + columnDescriptor, importColumnFamilyOptions, importFilesMetaData); + } + } } public static ColumnFamilyOptions createColumnFamilyOptions( @@ -224,7 +211,12 @@ public static ForStKvStateInfo createStateInfo( writeBufferManagerCapacity); try { - ColumnFamilyHandle columnFamilyHandle = createColumnFamily(columnFamilyDescriptor, db); + ColumnFamilyHandle columnFamilyHandle = + createColumnFamily( + columnFamilyDescriptor, + db, + importFilesMetaData, + cancelStreamRegistryForRestore); return new ForStKvStateInfo(columnFamilyHandle, metaInfoBase); } catch (Exception ex) { IOUtils.closeQuietly(columnFamilyDescriptor.getOptions()); @@ -350,24 +342,6 @@ public static void registerKvStateInformation( } } - public static ForStKvStateInfo createStateInfo( - RegisteredStateMetaInfoBase metaInfoBase, - RocksDB db, - Function columnFamilyOptionsFactory) { - ColumnFamilyDescriptor columnFamilyDescriptor = - createColumnFamilyDescriptor(metaInfoBase.getName(), columnFamilyOptionsFactory); - - final ColumnFamilyHandle columnFamilyHandle; - try { - columnFamilyHandle = createColumnFamily(columnFamilyDescriptor, db); - } catch (Exception ex) { - IOUtils.closeQuietly(columnFamilyDescriptor.getOptions()); - throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", ex); - } - - return new 
ForStKvStateInfo(columnFamilyHandle, metaInfoBase); - } - private static void throwExceptionIfPathLengthExceededOnWindows(String path, Exception cause) throws IOException { // max directory path length on Windows is 247. diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java index b807e4d6c4b9b..89dcb93cfa489 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java @@ -74,7 +74,10 @@ public final class ForStResourceContainer implements AutoCloseable { private static final String FORST_RELOCATE_LOG_SUFFIX = "_LOG"; // the filename length limit is 255 on most operating systems - private static final int INSTANCE_PATH_LENGTH_LIMIT = 255 - FORST_RELOCATE_LOG_SUFFIX.length(); + // In RocksDB, if db_log_dir is non-empty, the log files will be placed in the specified dir, + // and the db data dir's absolute path will be used as the log file name's prefix. + private static final int INSTANCE_PATH_LENGTH_LIMIT = + 255 / 2 - FORST_RELOCATE_LOG_SUFFIX.length(); @Nullable private final Path remoteBasePath; @@ -187,7 +190,7 @@ public DBOptions getDbOptions() { // configured, // fallback to local directory currently temporarily. if (remoteForStPath != null) { - opt.setEnv(new FlinkEnv(remoteForStPath.toString(), forstFileSystem)); + opt.setEnv(new FlinkEnv(remoteBasePath.toString(), forstFileSystem)); } return opt; @@ -467,6 +470,10 @@ private DBOptions setDBOptionsFromConfigurableOptions(DBOptions currentOptions) if (localForStPath == null || localForStPath.getPath().length() <= INSTANCE_PATH_LENGTH_LIMIT) { relocateDefaultDbLogDir(currentOptions); + } else if (remoteForStPath != null) { // the log must be put on the local disk + Path relocatedPath = localForStPath.getParent().getParent(); + LOG.warn("ForSt remote path is not null, relocating the log directory to {}.", relocatedPath); + currentOptions.setDbLogDir(relocatedPath.toString()); } else { // disable log relocate when instance path length exceeds limit to prevent ForSt // log file creation failure, details in FLINK-31743 diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java index 3de5af1efd2a0..d7c5067addaef 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java @@ -46,6 +46,7 @@ import org.apache.flink.util.DynamicCodeLoadingException; import org.apache.flink.util.FileUtils; import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.TernaryBoolean; import org.apache.flink.util.concurrent.FutureUtils; import org.forstdb.NativeLibraryLoader; @@ -72,6 +73,10 @@ import java.util.function.Supplier; import static org.apache.flink.configuration.description.TextElement.text; +import static org.apache.flink.state.forst.ForStConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD; +import static org.apache.flink.state.forst.ForStConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING; +import static
org.apache.flink.state.forst.ForStConfigurableOptions.USE_INGEST_DB_RESTORE_MODE; +import static org.apache.flink.util.Preconditions.checkArgument; /** * A {@link org.apache.flink.runtime.state.StateBackend} that stores its state in a ForSt instance. @@ -89,6 +94,8 @@ public class ForStStateBackend extends AbstractManagedMemoryStateBackend private static final Logger LOG = LoggerFactory.getLogger(ForStStateBackend.class); + private static final double UNDEFINED_OVERLAP_FRACTION_THRESHOLD = -1; + /** The number of (re)tries for loading the ForSt JNI library. */ private static final int FORST_LIB_LOADING_ATTEMPTS = 3; @@ -146,6 +153,25 @@ public class ForStStateBackend extends AbstractManagedMemoryStateBackend * The configuration for rocksdb priorityQueue state settings (priorityQueue state type, etc.). */ private final ForStPriorityQueueConfig priorityQueueConfig; + + /** + * The threshold of the overlap fraction between the handle's key-group range and target + * key-group range. + */ + private final double overlapFractionThreshold; + + /** + * Whether we use the optimized Ingest/Clip DB method for rescaling RocksDB incremental + * checkpoints. + */ + private final TernaryBoolean useIngestDbRestoreMode; + + /** + * Whether to leverage deleteFilesInRange API to clean up useless rocksdb files during + * rescaling. + */ + private final TernaryBoolean rescalingUseDeleteFilesInRange; + // ------------------------------------------------------------------------ /** Creates a new {@code ForStStateBackend} for storing state. */ @@ -154,6 +180,9 @@ public ForStStateBackend() { this.memoryConfiguration = new ForStMemoryConfiguration(); this.priorityQueueConfig = new ForStPriorityQueueConfig(); this.forStMemoryFactory = ForStMemoryFactory.DEFAULT; + this.overlapFractionThreshold = UNDEFINED_OVERLAP_FRACTION_THRESHOLD; + this.useIngestDbRestoreMode = TernaryBoolean.UNDEFINED; + this.rescalingUseDeleteFilesInRange = TernaryBoolean.UNDEFINED; } /** @@ -223,6 +252,26 @@ private ForStStateBackend( latencyTrackingConfigBuilder = original.latencyTrackingConfigBuilder.configure(config); this.forStMemoryFactory = original.forStMemoryFactory; + + // configure overlap fraction threshold + overlapFractionThreshold = + original.overlapFractionThreshold == UNDEFINED_OVERLAP_FRACTION_THRESHOLD + ? config.get(RESTORE_OVERLAP_FRACTION_THRESHOLD) + : original.overlapFractionThreshold; + + checkArgument( + overlapFractionThreshold >= 0 && this.overlapFractionThreshold <= 1, + "Overlap fraction threshold of restoring should be between 0 and 1"); + + useIngestDbRestoreMode = + TernaryBoolean.mergeTernaryBooleanWithConfig( + original.useIngestDbRestoreMode, USE_INGEST_DB_RESTORE_MODE, config); + + rescalingUseDeleteFilesInRange = + TernaryBoolean.mergeTernaryBooleanWithConfig( + original.rescalingUseDeleteFilesInRange, + USE_DELETE_FILES_IN_RANGE_DURING_RESCALING, + config); } // ------------------------------------------------------------------------ @@ -373,7 +422,18 @@ public ForStKeyedStateBackend createAsyncKeyedStateBackend( // TODO: remove after support more snapshot strategy .setEnableIncrementalCheckpointing(true) .setNativeMetricOptions( - resourceContainer.getMemoryWatcherOptions(nativeMetricOptions)); + resourceContainer.getMemoryWatcherOptions(nativeMetricOptions)) + .setOverlapFractionThreshold( + overlapFractionThreshold == UNDEFINED_OVERLAP_FRACTION_THRESHOLD + ? 
RESTORE_OVERLAP_FRACTION_THRESHOLD.defaultValue() + : overlapFractionThreshold) + .setUseIngestDbRestoreMode( + useIngestDbRestoreMode.getOrDefault( + USE_INGEST_DB_RESTORE_MODE.defaultValue())) + .setRescalingUseDeleteFilesInRange( + rescalingUseDeleteFilesInRange.getOrDefault( + USE_DELETE_FILES_IN_RANGE_DURING_RESCALING.defaultValue())); + return builder.build(); } @@ -453,7 +513,17 @@ public AbstractKeyedStateBackend createKeyedStateBackend( keyGroupCompressionDecorator, parameters.getCancelStreamRegistry()) .setNativeMetricOptions( - resourceContainer.getMemoryWatcherOptions(nativeMetricOptions)); + resourceContainer.getMemoryWatcherOptions(nativeMetricOptions)) + .setOverlapFractionThreshold( + overlapFractionThreshold == UNDEFINED_OVERLAP_FRACTION_THRESHOLD + ? RESTORE_OVERLAP_FRACTION_THRESHOLD.defaultValue() + : overlapFractionThreshold) + .setUseIngestDbRestoreMode( + useIngestDbRestoreMode.getOrDefault( + USE_INGEST_DB_RESTORE_MODE.defaultValue())) + .setRescalingUseDeleteFilesInRange( + rescalingUseDeleteFilesInRange.getOrDefault( + USE_DELETE_FILES_IN_RANGE_DURING_RESCALING.defaultValue())); return builder.build(); } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateDataTransfer.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateDataTransfer.java index 36131ff4537c1..a0877f5bb80fa 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateDataTransfer.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateDataTransfer.java @@ -306,7 +306,7 @@ public void transferAllStateDataToDirectory( FileSystem fs = forStFs != null ? forStFs : dir.getFileSystem(); fs.delete(dir, true); } catch (IOException ignored) { - + LOG.warn("Failed to delete transfer destination.", ignored); } }); diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java index 1cdafdcf4b1e0..a33879a6cc388 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java @@ -203,8 +203,9 @@ public boolean rename(Path src, Path dst) throws IOException { // renaming if the target already exists. So, we delete the target before attempting the // rename. 
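+ // Use the flag (f0) returned by tryBuildLocalPath instead of re-applying localFileFilter, + // so the local-vs-delegate decision for a path is made in a single place.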
- if (localFileFilter.apply(src.getName())) { - Path localSrc = tryBuildLocalPath(src).f1; + Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(src); + if (localPathTuple.f0) { + Path localSrc = localPathTuple.f1; Path localDst = tryBuildLocalPath(dst).f1; FileStatus fileStatus = localFS.getFileStatus(localSrc); boolean success = localFS.rename(localSrc, localDst); @@ -348,10 +349,10 @@ public boolean delete(Path path, boolean recursive) throws IOException { @Override public boolean mkdirs(Path path) throws IOException { - boolean success = false; + boolean success = true; Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path); if (localPathTuple.f0) { - success = localFS.mkdirs(localPathTuple.f1); + success &= localFS.mkdirs(localPathTuple.f1); } success &= delegateFS.mkdirs(path); return success; diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStHandle.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStHandle.java index f104822ecbb5e..dc2a3cd729422 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStHandle.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStHandle.java @@ -18,19 +18,23 @@ package org.apache.flink.state.forst.restore; +import org.apache.flink.core.fs.ICloseableRegistry; import org.apache.flink.core.fs.Path; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; +import org.apache.flink.state.forst.ForStDBTtlCompactFiltersManager; import org.apache.flink.state.forst.ForStNativeMetricMonitor; import org.apache.flink.state.forst.ForStNativeMetricOptions; import org.apache.flink.state.forst.ForStOperationUtils; import org.apache.flink.util.IOUtils; +import org.apache.flink.util.Preconditions; import org.forstdb.ColumnFamilyDescriptor; import org.forstdb.ColumnFamilyHandle; import org.forstdb.ColumnFamilyOptions; import org.forstdb.DBOptions; +import org.forstdb.ExportImportFilesMetaData; import org.forstdb.RocksDB; import javax.annotation.Nonnull; @@ -43,6 +47,9 @@ import java.util.Map; import java.util.function.Function; +import static org.apache.flink.state.forst.ForStOperationUtils.createStateInfo; +import static org.apache.flink.state.forst.ForStOperationUtils.registerKvStateInformation; + /** Utility for creating a ForSt instance either from scratch or from restored local state.
*/ class ForStHandle implements AutoCloseable { @@ -54,6 +61,8 @@ class ForStHandle implements AutoCloseable { private List columnFamilyDescriptors; private final ForStNativeMetricOptions nativeMetricOptions; private final MetricGroup metricGroup; + private final ForStDBTtlCompactFiltersManager ttlCompactFiltersManager; + private final Long writeBufferManagerCapacity; private RocksDB db; private ColumnFamilyHandle defaultColumnFamilyHandle; @@ -65,7 +74,9 @@ protected ForStHandle( DBOptions dbOptions, Function columnFamilyOptionsFactory, ForStNativeMetricOptions nativeMetricOptions, - MetricGroup metricGroup) { + MetricGroup metricGroup, + ForStDBTtlCompactFiltersManager ttlCompactFiltersManager, + Long writeBufferManagerCapacity) { this.kvStateInformation = kvStateInformation; this.dbPath = instanceRocksDBPath.getPath(); this.dbOptions = dbOptions; @@ -74,6 +85,8 @@ protected ForStHandle( this.metricGroup = metricGroup; this.columnFamilyHandles = new ArrayList<>(1); this.columnFamilyDescriptors = Collections.emptyList(); + this.ttlCompactFiltersManager = ttlCompactFiltersManager; + this.writeBufferManagerCapacity = writeBufferManagerCapacity; } void openDB() throws IOException { @@ -82,7 +95,8 @@ void openDB() throws IOException { void openDB( @Nonnull List columnFamilyDescriptors, - @Nonnull List stateMetaInfoSnapshots) + @Nonnull List stateMetaInfoSnapshots, + @Nonnull ICloseableRegistry cancelStreamRegistryForRestore) throws IOException { this.columnFamilyDescriptors = columnFamilyDescriptors; this.columnFamilyHandles = new ArrayList<>(columnFamilyDescriptors.size() + 1); @@ -90,7 +104,9 @@ void openDB( // Register CF handlers for (int i = 0; i < stateMetaInfoSnapshots.size(); i++) { getOrRegisterStateColumnFamilyHandle( - columnFamilyHandles.get(i), stateMetaInfoSnapshots.get(i)); + columnFamilyHandles.get(i), + stateMetaInfoSnapshots.get(i), + cancelStreamRegistryForRestore); } } @@ -114,7 +130,9 @@ private void loadDb() throws IOException { } ForStOperationUtils.ForStKvStateInfo getOrRegisterStateColumnFamilyHandle( - ColumnFamilyHandle columnFamilyHandle, StateMetaInfoSnapshot stateMetaInfoSnapshot) { + ColumnFamilyHandle columnFamilyHandle, + StateMetaInfoSnapshot stateMetaInfoSnapshot, + ICloseableRegistry cancelStreamRegistryForRestore) { ForStOperationUtils.ForStKvStateInfo registeredStateMetaInfoEntry = kvStateInformation.get(stateMetaInfoSnapshot.getName()); @@ -126,14 +144,19 @@ ForStOperationUtils.ForStKvStateInfo getOrRegisterStateColumnFamilyHandle( RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot); if (columnFamilyHandle == null) { registeredStateMetaInfoEntry = - ForStOperationUtils.createStateInfo( - stateMetaInfo, db, columnFamilyOptionsFactory); + createStateInfo( + stateMetaInfo, + db, + columnFamilyOptionsFactory, + ttlCompactFiltersManager, + writeBufferManagerCapacity, + cancelStreamRegistryForRestore); } else { registeredStateMetaInfoEntry = new ForStOperationUtils.ForStKvStateInfo(columnFamilyHandle, stateMetaInfo); } - ForStOperationUtils.registerKvStateInformation( + registerKvStateInformation( kvStateInformation, nativeMetricMonitor, stateMetaInfoSnapshot.getName(), @@ -146,6 +169,40 @@ ForStOperationUtils.ForStKvStateInfo getOrRegisterStateColumnFamilyHandle( return registeredStateMetaInfoEntry; } + /** + * Registers a new column family and imports data from the given export. + * + * @param stateMetaInfoKey info about the state to create. + * @param cfMetaDataList the data to import. 
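+ * @param cancelStreamRegistryForRestore registry checked while the column family is created + * and imported, so that an in-progress restore can be aborted if the task is cancelled.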
+ */ + void registerStateColumnFamilyHandleWithImport( + RegisteredStateMetaInfoBase.Key stateMetaInfoKey, + List cfMetaDataList, + ICloseableRegistry cancelStreamRegistryForRestore) { + + RegisteredStateMetaInfoBase stateMetaInfo = + stateMetaInfoKey.getRegisteredStateMetaInfoBase(); + + Preconditions.checkState( + !kvStateInformation.containsKey(stateMetaInfo.getName()), + "Error: stateMetaInfo.name is not unique:" + stateMetaInfo.getName()); + + ForStOperationUtils.ForStKvStateInfo stateInfo = + createStateInfo( + stateMetaInfo, + db, + columnFamilyOptionsFactory, + ttlCompactFiltersManager, + writeBufferManagerCapacity, + cfMetaDataList, + cancelStreamRegistryForRestore); + + registerKvStateInformation( + kvStateInformation, nativeMetricMonitor, stateMetaInfo.getName(), stateInfo); + + columnFamilyHandles.add(stateInfo.columnFamilyHandle); + } + public RocksDB getDb() { return db; } @@ -155,6 +212,10 @@ public ForStNativeMetricMonitor getNativeMetricMonitor() { return nativeMetricMonitor; } + public List getColumnFamilyHandles() { + return columnFamilyHandles; + } + public ColumnFamilyHandle getDefaultColumnFamilyHandle() { return defaultColumnFamilyHandle; } @@ -163,6 +224,18 @@ public Function getColumnFamilyOptionsFactory() { return columnFamilyOptionsFactory; } + public ForStDBTtlCompactFiltersManager getTtlCompactFiltersManager() { + return ttlCompactFiltersManager; + } + + public Long getWriteBufferManagerCapacity() { + return writeBufferManagerCapacity; + } + + public DBOptions getDbOptions() { + return dbOptions; + } + @Override public void close() { IOUtils.closeQuietly(defaultColumnFamilyHandle); diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStHeapTimersFullRestoreOperation.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStHeapTimersFullRestoreOperation.java index 74d46707e0331..1b8bc4145810a 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStHeapTimersFullRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStHeapTimersFullRestoreOperation.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.state.restore.KeyGroupEntry; import org.apache.flink.runtime.state.restore.SavepointRestoreResult; import org.apache.flink.runtime.state.restore.ThrowingIterator; +import org.apache.flink.state.forst.ForStDBTtlCompactFiltersManager; import org.apache.flink.state.forst.ForStDBWriteBatchWrapper; import org.apache.flink.state.forst.ForStNativeMetricOptions; import org.apache.flink.state.forst.ForStOperationUtils; @@ -51,6 +52,7 @@ import org.forstdb.DBOptions; import org.forstdb.RocksDBException; +import javax.annotation.Nonnegative; import javax.annotation.Nonnull; import java.io.Closeable; @@ -90,6 +92,9 @@ public ForStHeapTimersFullRestoreOperation( Function columnFamilyOptionsFactory, ForStNativeMetricOptions nativeMetricOptions, MetricGroup metricGroup, + @Nonnull ForStDBTtlCompactFiltersManager ttlCompactFiltersManager, + @Nonnegative long writeBatchSize, + Long writeBufferManagerCapacity, @Nonnull Collection restoreStateHandles, ICloseableRegistry cancelStreamRegistryForRestore) { this.rocksHandle = @@ -99,7 +104,9 @@ public ForStHeapTimersFullRestoreOperation( dbOptions, columnFamilyOptionsFactory, nativeMetricOptions, - metricGroup); + metricGroup, + ttlCompactFiltersManager, + writeBufferManagerCapacity); 
this.savepointRestoreOperation = new FullSnapshotRestoreOperation<>( keyGroupRange, @@ -157,7 +164,7 @@ private void applyRestoreResult(SavepointRestoreResult savepointRestoreResult) } else { ForStOperationUtils.ForStKvStateInfo registeredStateCFHandle = this.rocksHandle.getOrRegisterStateColumnFamilyHandle( - null, restoredMetaInfo); + null, restoredMetaInfo, cancelStreamRegistryForRestore); columnFamilyHandles.put(i, registeredStateCFHandle.columnFamilyHandle); } } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java index 620385856941d..d639154f4b2fa 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java @@ -26,6 +26,8 @@ import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.state.BackendBuildingException; +import org.apache.flink.runtime.state.CompositeKeySerializationUtils; import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; @@ -36,28 +38,43 @@ import org.apache.flink.runtime.state.StateSerializerProvider; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; +import org.apache.flink.state.forst.ForStDBTtlCompactFiltersManager; +import org.apache.flink.state.forst.ForStDBWriteBatchWrapper; +import org.apache.flink.state.forst.ForStIncrementalCheckpointUtils; import org.apache.flink.state.forst.ForStNativeMetricOptions; import org.apache.flink.state.forst.ForStOperationUtils; import org.apache.flink.state.forst.ForStResourceContainer; import org.apache.flink.state.forst.ForStStateDataTransfer; import org.apache.flink.state.forst.StateHandleTransferSpec; +import org.apache.flink.state.forst.sync.ForStIteratorWrapper; +import org.apache.flink.util.IOUtils; import org.apache.flink.util.StateMigrationException; import org.apache.flink.util.clock.SystemClock; import org.apache.flink.util.function.RunnableWithException; import org.forstdb.ColumnFamilyDescriptor; +import org.forstdb.ColumnFamilyHandle; import org.forstdb.ColumnFamilyOptions; import org.forstdb.DBOptions; +import org.forstdb.ExportImportFilesMetaData; +import org.forstdb.ReadOptions; +import org.forstdb.RocksDB; +import org.forstdb.RocksDBException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnegative; import javax.annotation.Nonnull; +import java.io.Closeable; +import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.net.URI; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -69,6 +86,12 @@ import static org.apache.flink.runtime.metrics.MetricNames.DOWNLOAD_STATE_DURATION; import static org.apache.flink.runtime.metrics.MetricNames.RESTORE_STATE_DURATION; +import static 
org.apache.flink.state.forst.ForStIncrementalCheckpointUtils.beforeThePrefixBytes; +import static org.apache.flink.state.forst.ForStIncrementalCheckpointUtils.checkSstDataAgainstKeyGroupRange; +import static org.apache.flink.state.forst.ForStIncrementalCheckpointUtils.clipDBWithKeyGroupRange; +import static org.apache.flink.state.forst.ForStIncrementalCheckpointUtils.exportColumnFamilies; +import static org.apache.flink.state.forst.ForStIncrementalCheckpointUtils.findTheBestStateHandleForInitial; +import static org.apache.flink.state.forst.ForStOperationUtils.createColumnFamilyOptions; import static org.apache.flink.state.forst.ForStResourceContainer.DB_DIR_STRING; /** Encapsulates the process of restoring a ForSt instance from an incremental snapshot. */ @@ -80,15 +103,25 @@ public class ForStIncrementalRestoreOperation implements ForStRestoreOperatio private final String operatorIdentifier; private final SortedMap> restoredSstFiles; private final ForStHandle forstHandle; - private final Collection restoreStateHandles; + private final List restoreStateHandles; private final CloseableRegistry cancelStreamRegistry; private final KeyGroupRange keyGroupRange; + private final int keyGroupPrefixBytes; private final ForStResourceContainer optionsContainer; private final Path forstBasePath; private final StateSerializerProvider keySerializerProvider; private final ClassLoader userCodeClassLoader; private final CustomInitializationMetrics customInitializationMetrics; private long lastCompletedCheckpointId; + + private final long writeBatchSize; + + private final double overlapFractionThreshold; + + private final boolean useIngestDbRestoreMode; + + private final boolean useDeleteFilesInRange; + private UUID backendUID; private boolean isKeySerializerCompatibilityChecked; @@ -96,6 +129,7 @@ public class ForStIncrementalRestoreOperation implements ForStRestoreOperatio public ForStIncrementalRestoreOperation( String operatorIdentifier, KeyGroupRange keyGroupRange, + int keyGroupPrefixBytes, CloseableRegistry cancelStreamRegistry, ClassLoader userCodeClassLoader, Map kvStateInformation, @@ -107,8 +141,14 @@ public ForStIncrementalRestoreOperation( Function columnFamilyOptionsFactory, ForStNativeMetricOptions nativeMetricOptions, MetricGroup metricGroup, + @Nonnull ForStDBTtlCompactFiltersManager ttlCompactFiltersManager, + @Nonnegative long writeBatchSize, + Long writeBufferManagerCapacity, CustomInitializationMetrics customInitializationMetrics, - @Nonnull Collection restoreStateHandles) { + @Nonnull Collection restoreStateHandles, + double overlapFractionThreshold, + boolean useIngestDbRestoreMode, + boolean useDeleteFilesInRange) { this.forstHandle = new ForStHandle( @@ -117,19 +157,26 @@ public ForStIncrementalRestoreOperation( dbOptions, columnFamilyOptionsFactory, nativeMetricOptions, - metricGroup); + metricGroup, + ttlCompactFiltersManager, + writeBufferManagerCapacity); this.operatorIdentifier = operatorIdentifier; this.restoredSstFiles = new TreeMap<>(); this.lastCompletedCheckpointId = -1L; this.backendUID = UUID.randomUUID(); this.customInitializationMetrics = customInitializationMetrics; - this.restoreStateHandles = restoreStateHandles; + this.restoreStateHandles = restoreStateHandles.stream().collect(Collectors.toList()); this.cancelStreamRegistry = cancelStreamRegistry; this.keyGroupRange = keyGroupRange; + this.keyGroupPrefixBytes = keyGroupPrefixBytes; this.optionsContainer = optionsContainer; this.forstBasePath = forstBasePath; this.keySerializerProvider = keySerializerProvider; 
this.userCodeClassLoader = userCodeClassLoader; + this.writeBatchSize = writeBatchSize; + this.overlapFractionThreshold = overlapFractionThreshold; + this.useIngestDbRestoreMode = useIngestDbRestoreMode; + this.useDeleteFilesInRange = useDeleteFilesInRange; } /** @@ -144,40 +191,40 @@ public ForStRestoreResult restore() throws Exception { } logger.info( - "Starting ForSt incremental recovery in operator {}, target key-group range {}. State Handles={}", + "Starting ForSt incremental recovery in operator {}, " + + "target key-group range {}. Use IngestDB={}, State Handles={}", operatorIdentifier, keyGroupRange.prettyPrintInterval(), + useIngestDbRestoreMode, restoreStateHandles); - List otherHandles = - new ArrayList<>(restoreStateHandles.size() - 1); - IncrementalRemoteKeyedStateHandle mainHandle = - chooseMainHandleAndCollectOthers(otherHandles); - - // Use default db directory name as in main db instance - StateHandleTransferSpec mainSpec = - new StateHandleTransferSpec(mainHandle, new Path(forstBasePath, DB_DIR_STRING)); - - List otherSpecs = - otherHandles.stream() - .map( - handle -> - new StateHandleTransferSpec( - handle, - new Path( - forstBasePath, - UUID.randomUUID().toString()))) - .collect(Collectors.toList()); + List toTransferSpecs = new ArrayList<>(); + int bestStateHandleForInit = -1; + if (!useIngestDbRestoreMode || restoreStateHandles.size() == 1) { + bestStateHandleForInit = + findTheBestStateHandleForInitial( + restoreStateHandles, keyGroupRange, overlapFractionThreshold); + toTransferSpecs.add( + new StateHandleTransferSpec( + restoreStateHandles.get(bestStateHandleForInit), + new Path(forstBasePath, DB_DIR_STRING))); + } + for (int i = 0; i < restoreStateHandles.size(); i++) { + if (i != bestStateHandleForInit) { + IncrementalRemoteKeyedStateHandle handle = restoreStateHandles.get(i); + toTransferSpecs.add( + new StateHandleTransferSpec( + handle, new Path(forstBasePath, UUID.randomUUID().toString()))); + } + } try { runAndReportDuration( - () -> transferAllStateHandles(mainSpec, otherSpecs), + () -> transferAllStateHandles(toTransferSpecs), // TODO: use new metric name, such as "TransferStateDurationMs" DOWNLOAD_STATE_DURATION); - runAndReportDuration( - () -> restoreFromTransferredHandles(mainSpec, otherSpecs), - RESTORE_STATE_DURATION); + runAndReportDuration(() -> innerRestore(toTransferSpecs), RESTORE_STATE_DURATION); return new ForStRestoreResult( this.forstHandle.getDb(), @@ -187,8 +234,11 @@ public ForStRestoreResult restore() throws Exception { backendUID, restoredSstFiles); } finally { + if (!useIngestDbRestoreMode || restoreStateHandles.size() == 1) { + toTransferSpecs.remove(0); + } // Delete the transfer destination quietly. - otherSpecs.stream() + toTransferSpecs.stream() .map(StateHandleTransferSpec::getTransferDestination) .forEach( dir -> { @@ -196,28 +246,13 @@ public ForStRestoreResult restore() throws Exception { FileSystem fs = dir.getFileSystem(); fs.delete(dir, true); } catch (IOException ignored) { - + logger.warn("Failed to delete transfer destination {}", dir); } }); } } - private IncrementalRemoteKeyedStateHandle chooseMainHandleAndCollectOthers( - final List otherHandlesCollector) { - - // TODO: When implementing rescale, refer to - // RocksDBIncrementalCheckpointUtils.findTheBestStateHandleForInitial to implement this - // method. Currently, it can be assumed that there is only one state handle to restore.
- return restoreStateHandles.iterator().next(); - } - - private void transferAllStateHandles( - StateHandleTransferSpec mainSpecs, List otherSpecs) - throws Exception { - - // TODO: Now not support rescale, so now ignore otherSpecs. Before implement transfer - // otherSpecs, we may need reconsider the implementation of ForStFlinkFileSystem. - + private void transferAllStateHandles(List specs) throws Exception { FileSystem forStFs = optionsContainer.getFileSystem(); if (forStFs == null) { forStFs = FileSystem.getLocalFileSystem(); @@ -225,35 +260,61 @@ private void transferAllStateHandles( try (ForStStateDataTransfer transfer = new ForStStateDataTransfer(ForStStateDataTransfer.DEFAULT_THREAD_NUM, forStFs)) { - transfer.transferAllStateDataToDirectory( - Collections.singleton(mainSpecs), cancelStreamRegistry); + transfer.transferAllStateDataToDirectory(specs, cancelStreamRegistry); } } - private void restoreFromTransferredHandles( - StateHandleTransferSpec mainHandle, List temporaryHandles) - throws Exception { - - restoreFromMainTransferredHandle(mainHandle); - - mergeOtherTransferredHandles(temporaryHandles); + private void innerRestore(List stateHandles) throws Exception { + if (stateHandles.size() == 1) { + // This happens if we don't rescale and for some scale out scenarios. + initBaseDBFromSingleStateHandle(stateHandles.get(0)); + } else { + // This happens for all scale ins and some scale outs. + restoreFromMultipleStateHandles(stateHandles); + } } - private void restoreFromMainTransferredHandle(StateHandleTransferSpec mainHandle) + /** + * Initializes the base DB that we restore from a single local state handle. + * + * @param stateHandleSpec the state handle to restore the base DB from. + * @throws Exception on any error during restore. + */ + private void initBaseDBFromSingleStateHandle(StateHandleTransferSpec stateHandleSpec) throws Exception { - IncrementalRemoteKeyedStateHandle handle = mainHandle.getStateHandle(); + IncrementalRemoteKeyedStateHandle stateHandle = stateHandleSpec.getStateHandle(); + logger.info( + "Starting opening base ForSt instance in operator {} with target key-group range {} from state handle {}.", + operatorIdentifier, + keyGroupRange.prettyPrintInterval(), + stateHandleSpec); - restoreBaseDBFromMainHandle(handle); + // Restore base DB from selected initial handle + restoreBaseDBFromMainHandle(stateHandleSpec); + + KeyGroupRange stateHandleKeyGroupRange = stateHandle.getKeyGroupRange(); // Check if the key-groups range has changed. - if (Objects.equals(handle.getKeyGroupRange(), keyGroupRange)) { + if (Objects.equals(stateHandleKeyGroupRange, keyGroupRange)) { // This is the case if we didn't rescale, so we can restore all the info from the // previous backend instance (backend id and incremental checkpoint history). - restorePreviousIncrementalFilesStatus(handle); + restorePreviousIncrementalFilesStatus(stateHandle); } else { // If the key-groups don't match, this was a scale out, and we need to clip the // key-groups range of the db to the target range for this backend. 
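+            // A rough sketch of what clipDBWithKeyGroupRange (invoked below) is expected
+            // to do, assuming it mirrors the RocksDB counterpart: serialize the target
+            // boundary key-groups into prefix bytes and range-delete everything outside
+            // of them, e.g.
+            //   byte[] targetStartPrefix = new byte[keyGroupPrefixBytes];
+            //   CompositeKeySerializationUtils.serializeKeyGroup(
+            //           keyGroupRange.getStartKeyGroup(), targetStartPrefix);
+            //   db.deleteRange(cfHandle, handleStartPrefix, targetStartPrefix);
+            // and symmetrically above keyGroupRange.getEndKeyGroup() + 1
+            // ("cfHandle" and "handleStartPrefix" are illustrative names).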
- throw new UnsupportedOperationException("ForStateBackend not support rescale yet."); + try { + clipDBWithKeyGroupRange( + this.forstHandle.getDb(), + this.forstHandle.getColumnFamilyHandles(), + keyGroupRange, + stateHandleKeyGroupRange, + keyGroupPrefixBytes, + useDeleteFilesInRange); + } catch (RocksDBException e) { + String errMsg = "Failed to clip DB after initialization."; + logger.error(errMsg, e); + throw new BackendBuildingException(errMsg, e); + } } logger.info( "Finished opening base ForSt instance in operator {} with target key-group range {}.", @@ -261,23 +322,56 @@ private void restoreFromMainTransferredHandle(StateHandleTransferSpec mainHandle keyGroupRange.prettyPrintInterval()); } - private void mergeOtherTransferredHandles(List otherHandles) { - if (otherHandles != null && !otherHandles.isEmpty()) { - // TODO: implement rescale - throw new UnsupportedOperationException("ForStateBackend not support rescale yet."); + /** + * Initializes the base DB that we restore from a list of multiple local state handles. + * + * @param stateHandles the list of state handles to restore the base DB from. + * @throws Exception on any error during restore. + */ + private void restoreFromMultipleStateHandles(List stateHandles) + throws Exception { + + logger.info( + "Starting to restore backend with range {} in operator {} from multiple state handles {} with useIngestDbRestoreMode = {}.", + keyGroupRange.prettyPrintInterval(), + operatorIdentifier, + stateHandles, + useIngestDbRestoreMode); + + byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes]; + CompositeKeySerializationUtils.serializeKeyGroup( + keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes); + + byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes]; + CompositeKeySerializationUtils.serializeKeyGroup( + keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes); + + if (useIngestDbRestoreMode) { + // Optimized path for merging multiple handles with Ingest/Clip + mergeStateHandlesWithClipAndIngest( + stateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes); + } else { + // Legacy path that merges multiple handles by copying through temporary instances.
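+            // Note on the prefix bytes computed above: serializeKeyGroup writes the
+            // key-group id big-endian into keyGroupPrefixBytes bytes, so with a 2-byte
+            // prefix a start key-group of 5 becomes {0x00, 0x05}, while the exclusive
+            // stop prefix for an end key-group of 9 becomes {0x00, 0x0A}.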
+ StateHandleTransferSpec baseSpec = stateHandles.remove(0); + mergeStateHandlesWithCopyFromTemporaryInstance( + baseSpec, stateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes); } + logger.info( + "Completed restoring backend with range {} in operator {} from multiple state handles with useIngestDbRestoreMode = {}.", + keyGroupRange.prettyPrintInterval(), + operatorIdentifier, + useIngestDbRestoreMode); } - private void restoreBaseDBFromMainHandle(IncrementalRemoteKeyedStateHandle handle) - throws Exception { + private void restoreBaseDBFromMainHandle(StateHandleTransferSpec handleSpec) throws Exception { KeyedBackendSerializationProxy serializationProxy = - readMetaData(handle.getMetaDataStateHandle()); + readMetaData(handleSpec.getStateHandle().getMetaDataStateHandle()); List stateMetaInfoSnapshots = serializationProxy.getStateMetaInfoSnapshots(); - this.forstHandle.openDB( createColumnFamilyDescriptors(stateMetaInfoSnapshots, true), - stateMetaInfoSnapshots); + stateMetaInfoSnapshots, + cancelStreamRegistry); } /** @@ -317,8 +411,12 @@ private List createColumnFamilyDescriptors( ColumnFamilyDescriptor columnFamilyDescriptor = ForStOperationUtils.createColumnFamilyDescriptor( - metaInfoBase.getName(), - this.forstHandle.getColumnFamilyOptionsFactory()); + metaInfoBase, + this.forstHandle.getColumnFamilyOptionsFactory(), + registerTtlCompactFilter + ? this.forstHandle.getTtlCompactFiltersManager() + : null, + this.forstHandle.getWriteBufferManagerCapacity()); columnFamilyDescriptors.add(columnFamilyDescriptor); } @@ -390,4 +488,478 @@ KeyedBackendSerializationProxy readMetaData(DataInputView dataInputView) public void close() throws Exception { this.forstHandle.close(); } + + /** + * Helper method to copy all data from an open temporary DB to the base DB. + * + * @param tmpRestoreDBInfo the temporary instance. + * @param writeBatchWrapper write batch wrapper for writes against the base DB. + * @param startKeyGroupPrefixBytes the min/start key of the key groups range as bytes. + * @param stopKeyGroupPrefixBytes the max+1/end key of the key groups range as bytes. + * @throws Exception on any copy error.
+ */ + private void copyTempDbIntoBaseDb( + RestoredDBInstance tmpRestoreDBInfo, + ForStDBWriteBatchWrapper writeBatchWrapper, + byte[] startKeyGroupPrefixBytes, + byte[] stopKeyGroupPrefixBytes) + throws Exception { + logger.debug( + "Starting copy of state handle {} for backend with range {} in operator {} to base DB using temporary instance.", + tmpRestoreDBInfo.srcStateHandle, + keyGroupRange.prettyPrintInterval(), + operatorIdentifier); + + List tmpColumnFamilyDescriptors = + tmpRestoreDBInfo.columnFamilyDescriptors; + List tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles; + + // iterating only the requested descriptors automatically skips the default column family + // handle + for (int descIdx = 0; descIdx < tmpColumnFamilyDescriptors.size(); ++descIdx) { + ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(descIdx); + + ColumnFamilyHandle targetColumnFamilyHandle = + this.forstHandle.getOrRegisterStateColumnFamilyHandle( + null, + tmpRestoreDBInfo.stateMetaInfoSnapshots.get(descIdx), + cancelStreamRegistry) + .columnFamilyHandle; + + try (ForStIteratorWrapper iterator = + ForStOperationUtils.getForStIterator( + tmpRestoreDBInfo.db, + tmpColumnFamilyHandle, + tmpRestoreDBInfo.readOptions)) { + + iterator.seek(startKeyGroupPrefixBytes); + + while (iterator.isValid()) { + + if (beforeThePrefixBytes(iterator.key(), stopKeyGroupPrefixBytes)) { + writeBatchWrapper.put( + targetColumnFamilyHandle, iterator.key(), iterator.value()); + } else { + // Since the iterator will visit the record according to the sorted order, + // we can just break here. + break; + } + + iterator.next(); + } + } // releases native iterator resources + } + logger.debug( + "Finished copy of state handle {} for backend with range {} in operator {} using temporary instance.", + tmpRestoreDBInfo.srcStateHandle, + keyGroupRange.prettyPrintInterval(), + operatorIdentifier); + } + + private void cleanUpPathQuietly(@Nonnull Path path) { + try { + path.getFileSystem().delete(path, true); + } catch (IOException ex) { + logger.warn("Failed to clean up path " + path, ex); + } + } + + /** + * Helper method to copy all data from the given local state handles to the base DB by using + * temporary DB instances. + * + * @param toImportSpecs the state handles to import. + * @param startKeyGroupPrefixBytes the min/start key of the key groups range as bytes. + * @param stopKeyGroupPrefixBytes the max+1/end key of the key groups range as bytes. + * @throws Exception on any copy error. 
+ */ + private void copyToBaseDBUsingTempDBs( + List toImportSpecs, + byte[] startKeyGroupPrefixBytes, + byte[] stopKeyGroupPrefixBytes) + throws Exception { + if (toImportSpecs.isEmpty()) { + return; + } + + logger.info( + "Starting to copy state handles for backend with range {} in operator {} using temporary instances.", + keyGroupRange.prettyPrintInterval(), + operatorIdentifier); + + try (ForStDBWriteBatchWrapper writeBatchWrapper = + new ForStDBWriteBatchWrapper(this.forstHandle.getDb(), writeBatchSize); + Closeable ignored = + cancelStreamRegistry.registerCloseableTemporarily( + writeBatchWrapper.getCancelCloseable())) { + for (StateHandleTransferSpec handleToCopy : toImportSpecs) { + try (RestoredDBInstance restoredDBInstance = restoreTempDBInstance(handleToCopy)) { + copyTempDbIntoBaseDb( + restoredDBInstance, + writeBatchWrapper, + startKeyGroupPrefixBytes, + stopKeyGroupPrefixBytes); + } + } + } + + logger.info( + "Completed copying state handles for backend with range {} in operator {} using temporary instances.", + keyGroupRange.prettyPrintInterval(), + operatorIdentifier); + } + + /** + * Helper method that merges the data from multiple state handles into the restoring base DB with + * the help of copying through temporary RocksDB instances. + * + * @param baseSpec the state handle to initialize the base DB from. + * @param keyedStateHandles the state handles to merge into the base DB. + * @param startKeyGroupPrefixBytes the min/start key of the key groups range as bytes. + * @param stopKeyGroupPrefixBytes the max+1/end key of the key groups range as bytes. + * @throws Exception on any merge error. + */ + private void mergeStateHandlesWithCopyFromTemporaryInstance( + StateHandleTransferSpec baseSpec, + List keyedStateHandles, + byte[] startKeyGroupPrefixBytes, + byte[] stopKeyGroupPrefixBytes) + throws Exception { + + logger.info( + "Starting to merge state for backend with range {} in operator {} from multiple state handles using temporary instances.", + keyGroupRange.prettyPrintInterval(), + operatorIdentifier); + + // Init the base DB instance with the initial state + initBaseDBFromSingleStateHandle(baseSpec); + + // Copy remaining handles using temporary RocksDB instances + copyToBaseDBUsingTempDBs( + keyedStateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes); + + logger.info( + "Completed merging state for backend with range {} in operator {} from multiple state handles using temporary instances.", + keyGroupRange.prettyPrintInterval(), + operatorIdentifier); + } + + // --------------------------------------------------------------------------- + // ingestDB related + // --------------------------------------------------------------------------- + + /** + * Restores the base DB by merging multiple state handles into one. This method first checks if + * all data to import is in the expected key-groups range and then uses import/export. + * Otherwise, this method falls back to copying the data using a temporary DB. + * + * @param keyedStateHandles the list of state handles to restore the base DB from. + * @param startKeyGroupPrefixBytes the min/start key of the key groups range as bytes. + * @param stopKeyGroupPrefixBytes the max+1/end key of the key groups range as bytes. + * @throws Exception on any restore error.
+ */ + public void mergeStateHandlesWithClipAndIngest( + List keyedStateHandles, + byte[] startKeyGroupPrefixBytes, + byte[] stopKeyGroupPrefixBytes) + throws Exception { + + Path exportCfBasePath = new Path(forstBasePath.toString(), "export-cfs"); + forstBasePath.getFileSystem().mkdirs(exportCfBasePath); + + final Map> + exportedColumnFamilyMetaData = new HashMap<>(keyedStateHandles.size()); + + final List notImportableHandles = + new ArrayList<>(keyedStateHandles.size()); + try { + KeyGroupRange exportedSstKeyGroupsRange = + exportColumnFamiliesWithSstDataInKeyGroupsRange( + new File( + new URI( + exportCfBasePath.getFileSystem().getUri() + + exportCfBasePath.getPath())) + .toPath(), + keyedStateHandles, + exportedColumnFamilyMetaData, + notImportableHandles); + + if (exportedColumnFamilyMetaData.isEmpty()) { + // Nothing could be exported, so we fall back to + // #mergeStateHandlesWithCopyFromTemporaryInstance + int bestStateHandleForInit = + findTheBestStateHandleForInitial( + restoreStateHandles, keyGroupRange, overlapFractionThreshold); + notImportableHandles.remove(bestStateHandleForInit); + StateHandleTransferSpec baseSpec = + new StateHandleTransferSpec( + restoreStateHandles.get(bestStateHandleForInit), + new Path(forstBasePath, DB_DIR_STRING)); + transferAllStateHandles(Collections.singletonList(baseSpec)); + mergeStateHandlesWithCopyFromTemporaryInstance( + baseSpec, + notImportableHandles, + startKeyGroupPrefixBytes, + stopKeyGroupPrefixBytes); + } else { + // We initialize the base DB by importing all the exported data. + initBaseDBFromColumnFamilyImports( + exportedColumnFamilyMetaData, exportedSstKeyGroupsRange); + // Copy data from handles that we couldn't directly import using temporary + // instances. + copyToBaseDBUsingTempDBs( + notImportableHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes); + } + } finally { + // Close native RocksDB objects + exportedColumnFamilyMetaData.values().forEach(IOUtils::closeAllQuietly); + // Cleanup export base directory + cleanUpPathQuietly(exportCfBasePath); + } + } + + /** + * Initializes the base DB by importing from previously exported data. + * + * @param exportedColumnFamilyMetaData the export (meta) data. + * @param exportKeyGroupRange the total key-groups range of the exported data. + * @throws Exception on import error. + */ + private void initBaseDBFromColumnFamilyImports( + Map> + exportedColumnFamilyMetaData, + KeyGroupRange exportKeyGroupRange) + throws Exception { + + // We initialize the base DB by importing all the exported data.
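+        // Assuming forstdb mirrors the RocksDB Java API, each import below is expected
+        // to boil down to something like
+        //   db.createColumnFamilyWithImport(
+        //           new ColumnFamilyDescriptor(name, options),
+        //           new ImportColumnFamilyOptions(),
+        //           exportedMetaDataList);
+        // which adopts the exported SST files directly instead of re-writing them.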
+ logger.info( + "Starting to import exported state handles for backend with range {} in operator {} using Clip/Ingest DB with exported range {}.", + keyGroupRange.prettyPrintInterval(), + operatorIdentifier, + exportKeyGroupRange.prettyPrintInterval()); + forstHandle.openDB(); + for (Map.Entry> entry : + exportedColumnFamilyMetaData.entrySet()) { + forstHandle.registerStateColumnFamilyHandleWithImport( + entry.getKey(), entry.getValue(), cancelStreamRegistry); + } + + // Use Range delete to clip the temp db to the target range of the backend + clipDBWithKeyGroupRange( + forstHandle.getDb(), + forstHandle.getColumnFamilyHandles(), + keyGroupRange, + exportKeyGroupRange, + keyGroupPrefixBytes, + useDeleteFilesInRange); + + logger.info( + "Completed importing exported state handles for backend with range {} in operator {} using Clip/Ingest DB.", + keyGroupRange.prettyPrintInterval(), + operatorIdentifier); + } + + /** + * Prepares the data for importing by exporting from temporary RocksDB instances. We can only + * import data that does not exceed the target key-groups range and skip state handles that + * exceed their range. + * + * @param exportCfBasePath the base path for the export files. + * @param stateHandleSpecs the state handles to prepare for import. + * @param exportedColumnFamiliesOut output parameter for the metadata of completed exports. + * @param skipped output parameter for state handles that could not be exported because the data + * exceeded the proclaimed range. + * @return the total key-groups range of the exported data. + * @throws Exception on any export error. + */ + private KeyGroupRange exportColumnFamiliesWithSstDataInKeyGroupsRange( + java.nio.file.Path exportCfBasePath, + List stateHandleSpecs, + Map> + exportedColumnFamiliesOut, + List skipped) + throws Exception { + + logger.info( + "Starting restore export for backend with range {} in operator {}.", + keyGroupRange.prettyPrintInterval(), + operatorIdentifier); + + int minExportKeyGroup = Integer.MAX_VALUE; + int maxExportKeyGroup = Integer.MIN_VALUE; + int index = 0; + for (StateHandleTransferSpec stateHandleSpec : stateHandleSpecs) { + IncrementalRemoteKeyedStateHandle stateHandle = stateHandleSpec.getStateHandle(); + + final String logLineSuffix = + " for state handle at index " + + index + + " with proclaimed key-group range " + + stateHandle.getKeyGroupRange().prettyPrintInterval() + + " for backend with range " + + keyGroupRange.prettyPrintInterval() + + " in operator " + + operatorIdentifier + + "."; + + logger.debug("Opening temporary database" + logLineSuffix); + try (ForStIncrementalRestoreOperation.RestoredDBInstance tmpRestoreDBInfo = + restoreTempDBInstance(stateHandleSpec)) { + + List tmpColumnFamilyHandles = + tmpRestoreDBInfo.columnFamilyHandles; + + logger.debug("Checking actual keys of sst files" + logLineSuffix); + + // Check if the data in all SST files referenced in the handle is within the + // proclaimed key-groups range of the handle. 
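+                // Roughly: the check compares the smallest and largest keys actually
+                // present in the temporary DB's files against the serialized prefixes of
+                // the proclaimed range, i.e. allInRange holds when
+                //   prefix(startKeyGroup) <= minKey && maxKey < prefix(endKeyGroup + 1)
+                // (a sketch; the exact logic lives in ForStIncrementalCheckpointUtils).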
+ ForStIncrementalCheckpointUtils.RangeCheckResult rangeCheckResult = + checkSstDataAgainstKeyGroupRange( + tmpRestoreDBInfo.db, + keyGroupPrefixBytes, + stateHandle.getKeyGroupRange()); + + logger.info("{}" + logLineSuffix, rangeCheckResult); + + if (rangeCheckResult.allInRange()) { + + logger.debug("Start exporting" + logLineSuffix); + + List registeredStateMetaInfoBases = + tmpRestoreDBInfo.stateMetaInfoSnapshots.stream() + .map(RegisteredStateMetaInfoBase::fromMetaInfoSnapshot) + .collect(Collectors.toList()); + + // Export all the Column Families and store the result in + // exportedColumnFamiliesOut + exportColumnFamilies( + tmpRestoreDBInfo.db, + tmpColumnFamilyHandles, + registeredStateMetaInfoBases, + exportCfBasePath, + exportedColumnFamiliesOut); + + minExportKeyGroup = + Math.min( + minExportKeyGroup, + stateHandle.getKeyGroupRange().getStartKeyGroup()); + maxExportKeyGroup = + Math.max( + maxExportKeyGroup, + stateHandle.getKeyGroupRange().getEndKeyGroup()); + + logger.debug("Done exporting" + logLineSuffix); + } else { + // Actual key range in files exceeds proclaimed range, cannot import. We + // will copy this handle using a temporary DB later. + skipped.add(stateHandleSpec); + logger.debug("Skipped export" + logLineSuffix); + } + } + ++index; + } + + KeyGroupRange exportedKeyGroupsRange = + minExportKeyGroup <= maxExportKeyGroup + ? new KeyGroupRange(minExportKeyGroup, maxExportKeyGroup) + : KeyGroupRange.EMPTY_KEY_GROUP_RANGE; + + logger.info( + "Completed restore export for backend with range {} in operator {}. {} exported handles with overall exported range {}. {} Skipped handles: {}.", + keyGroupRange.prettyPrintInterval(), + operatorIdentifier, + stateHandleSpecs.size() - skipped.size(), + exportedKeyGroupsRange.prettyPrintInterval(), + skipped.size(), + skipped); + + return exportedKeyGroupsRange; + } + + private RestoredDBInstance restoreTempDBInstance(StateHandleTransferSpec stateHandleSpec) + throws Exception { + KeyedBackendSerializationProxy serializationProxy = + readMetaData(stateHandleSpec.getStateHandle().getMetaDataStateHandle()); + // read meta data + List stateMetaInfoSnapshots = + serializationProxy.getStateMetaInfoSnapshots(); + + List columnFamilyDescriptors = + createColumnFamilyDescriptors(stateMetaInfoSnapshots, false); + + List columnFamilyHandles = + new ArrayList<>(stateMetaInfoSnapshots.size() + 1); + + String dbName = + optionsContainer.getRemoteBasePath() != null + ? "/" + stateHandleSpec.getTransferDestination().getName() + : stateHandleSpec.getTransferDestination().toString(); + + RocksDB restoreDb = + ForStOperationUtils.openDB( + dbName, + columnFamilyDescriptors, + columnFamilyHandles, + createColumnFamilyOptions( + this.forstHandle.getColumnFamilyOptionsFactory(), "default"), + this.forstHandle.getDbOptions()); + + return new RestoredDBInstance( + restoreDb, + columnFamilyHandles, + columnFamilyDescriptors, + stateMetaInfoSnapshots, + stateHandleSpec.getStateHandle(), + stateHandleSpec.getTransferDestination().toString()); + } + + /** Entity to hold the temporary RocksDB instance created for restore. 
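+     * Closing it releases, in order, the default column family handle, the remaining
+     * column family handles, the DB itself, the column family options, and the read
+     * options.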
*/ + public static class RestoredDBInstance implements AutoCloseable { + + @Nonnull final RocksDB db; + + @Nonnull final ColumnFamilyHandle defaultColumnFamilyHandle; + + @Nonnull final List columnFamilyHandles; + + @Nonnull final List columnFamilyDescriptors; + + @Nonnull final List stateMetaInfoSnapshots; + + final ReadOptions readOptions; + + final IncrementalRemoteKeyedStateHandle srcStateHandle; + + final String instancePath; + + RestoredDBInstance( + @Nonnull RocksDB db, + @Nonnull List columnFamilyHandles, + @Nonnull List columnFamilyDescriptors, + @Nonnull List stateMetaInfoSnapshots, + @Nonnull IncrementalRemoteKeyedStateHandle srcStateHandle, + @Nonnull String instancePath) { + this.db = db; + this.defaultColumnFamilyHandle = columnFamilyHandles.remove(0); + this.columnFamilyHandles = columnFamilyHandles; + this.columnFamilyDescriptors = columnFamilyDescriptors; + this.stateMetaInfoSnapshots = stateMetaInfoSnapshots; + this.readOptions = new ReadOptions(); + this.srcStateHandle = srcStateHandle; + this.instancePath = instancePath; + } + + @Override + public void close() { + List columnFamilyOptions = + new ArrayList<>(columnFamilyDescriptors.size() + 1); + columnFamilyDescriptors.forEach((cfd) -> columnFamilyOptions.add(cfd.getOptions())); + ForStOperationUtils.addColumnFamilyOptionsToCloseLater( + columnFamilyOptions, defaultColumnFamilyHandle); + IOUtils.closeQuietly(defaultColumnFamilyHandle); + IOUtils.closeAllQuietly(columnFamilyHandles); + IOUtils.closeQuietly(db); + IOUtils.closeAllQuietly(columnFamilyOptions); + IOUtils.closeQuietly(readOptions); + } + } } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStNoneRestoreOperation.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStNoneRestoreOperation.java index 372e885af685a..18ac1dabb2661 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStNoneRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStNoneRestoreOperation.java @@ -20,12 +20,16 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.state.forst.ForStDBTtlCompactFiltersManager; import org.apache.flink.state.forst.ForStNativeMetricOptions; import org.apache.flink.state.forst.ForStOperationUtils; import org.forstdb.ColumnFamilyOptions; import org.forstdb.DBOptions; +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; + import java.util.Map; import java.util.function.Function; @@ -39,7 +43,10 @@ public ForStNoneRestoreOperation( DBOptions dbOptions, Function columnFamilyOptionsFactory, ForStNativeMetricOptions nativeMetricOptions, - MetricGroup metricGroup) { + MetricGroup metricGroup, + @Nonnull ForStDBTtlCompactFiltersManager ttlCompactFiltersManager, + @Nonnegative long writeBatchSize, + Long writeBufferManagerCapacity) { this.rocksHandle = new ForStHandle( kvStateInformation, @@ -47,7 +54,9 @@ public ForStNoneRestoreOperation( dbOptions, columnFamilyOptionsFactory, nativeMetricOptions, - metricGroup); + metricGroup, + ttlCompactFiltersManager, + writeBufferManagerCapacity); } @Override diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java index 16dea8a9548a9..a88e27512d5d7 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java @@ -90,6 +90,9 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Function; +import static org.apache.flink.state.forst.ForStConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD; +import static org.apache.flink.state.forst.ForStConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING; +import static org.apache.flink.state.forst.ForStConfigurableOptions.USE_INGEST_DB_RESTORE_MODE; import static org.apache.flink.util.Preconditions.checkArgument; /** @@ -100,7 +103,7 @@ */ public class ForStSyncKeyedStateBackendBuilder extends AbstractKeyedStateBackendBuilder { - static final String DB_INSTANCE_DIR_STRING = "db"; + private static final String DB_INSTANCE_DIR_STRING = "db"; /** String that identifies the operator that owns this backend. */ private final String operatorIdentifier; @@ -132,17 +135,16 @@ public class ForStSyncKeyedStateBackendBuilder extends AbstractKeyedStateBack /** RocksDB property-based and statistics-based native metrics options. */ private ForStNativeMetricOptions nativeMetricOptions; - private int numberOfTransferingThreads; private long writeBatchSize = ForStConfigurableOptions.WRITE_BATCH_SIZE.defaultValue().getBytes(); private RocksDB injectedTestDB; // for testing - // todo: checkpoint/restore related private boolean incrementalRestoreAsyncCompactAfterRescale = false; - private boolean rescalingUseDeleteFilesInRange = false; - private double overlapFractionThreshold = 0.5; - private boolean useIngestDbRestoreMode = false; + private double overlapFractionThreshold = RESTORE_OVERLAP_FRACTION_THRESHOLD.defaultValue(); + private boolean useIngestDbRestoreMode = USE_INGEST_DB_RESTORE_MODE.defaultValue(); + private boolean rescalingUseDeleteFilesInRange = + USE_DELETE_FILES_IN_RANGE_DURING_RESCALING.defaultValue(); private ColumnFamilyHandle injectedDefaultColumnFamilyHandle; // for testing private AsyncExceptionHandler asyncExceptionHandler; @@ -166,7 +168,6 @@ public ForStSyncKeyedStateBackendBuilder( @Nonnull Collection stateHandles, StreamCompressionDecorator keyGroupCompressionDecorator, CloseableRegistry cancelStreamRegistry) { - super( kvStateRegistry, keySerializer, @@ -192,7 +193,6 @@ public ForStSyncKeyedStateBackendBuilder( this.customInitializationMetrics = customInitializationMetrics; this.enableIncrementalCheckpointing = false; this.nativeMetricOptions = new ForStNativeMetricOptions(); - this.numberOfTransferingThreads = 4; } @VisibleForTesting @@ -304,7 +304,12 @@ public ForStSyncKeyedStateBackend build() throws BackendBuildingException { SortedMap> materializedSstFiles = new TreeMap<>(); long lastCompletedCheckpointId = -1L; prepareDirectories(); - restoreOperation = getForStDBRestoreOperation(kvStateInformation, registeredPQStates); + restoreOperation = + getForStDBRestoreOperation( + keyGroupPrefixBytes, + kvStateInformation, + registeredPQStates, + ttlCompactFiltersManager); ForStRestoreResult restoreResult = restoreOperation.restore(); db = restoreResult.getDb(); defaultColumnFamilyHandle = restoreResult.getDefaultColumnFamilyHandle(); @@ -415,6 +420,24 @@ public 
ForStSyncKeyedStateBackend build() throws BackendBuildingException { asyncCompactAfterRestoreFuture); } + public ForStSyncKeyedStateBackendBuilder setOverlapFractionThreshold( + double overlapFractionThreshold) { + this.overlapFractionThreshold = overlapFractionThreshold; + return this; + } + + public ForStSyncKeyedStateBackendBuilder setUseIngestDbRestoreMode( + boolean useIngestDbRestoreMode) { + this.useIngestDbRestoreMode = useIngestDbRestoreMode; + return this; + } + + public ForStSyncKeyedStateBackendBuilder setRescalingUseDeleteFilesInRange( + boolean rescalingUseDeleteFilesInRange) { + this.rescalingUseDeleteFilesInRange = rescalingUseDeleteFilesInRange; + return this; + } + private ForStSnapshotStrategyBase initializeSnapshotStrategy( @Nonnull RocksDB db, @Nonnull ResourceGuard forstResourceGuard, @@ -431,9 +454,7 @@ public ForStSyncKeyedStateBackend build() throws BackendBuildingException { ForStFlinkFileSystem forStFs = optionsContainer.getRemoteForStPath() != null - ? (ForStFlinkFileSystem) - ForStFlinkFileSystem.get( - optionsContainer.getRemoteForStPath().toUri()) + ? ForStFlinkFileSystem.get(optionsContainer.getRemoteForStPath().toUri()) : null; ForStStateDataTransfer stateTransfer = new ForStStateDataTransfer(ForStStateDataTransfer.DEFAULT_THREAD_NUM, forStFs); @@ -466,8 +487,10 @@ public ForStSyncKeyedStateBackend build() throws BackendBuildingException { } private ForStRestoreOperation getForStDBRestoreOperation( + int keyGroupPrefixBytes, LinkedHashMap kvStateInformation, - LinkedHashMap> registeredPQStates) { + LinkedHashMap> registeredPQStates, + ForStDBTtlCompactFiltersManager ttlCompactFiltersManager) { // Currently, ForStDB does not support mixing local-dir and remote-dir, and ForStDB will // concatenate the dfs directory with the local directory as working dir when using flink // env.
We expect to directly use the dfs directory in flink env or local directory as @@ -481,13 +504,17 @@ private ForStRestoreOperation getForStDBRestoreOperation( optionsContainer.getDbOptions(), columnFamilyOptionsFactory, nativeMetricOptions, - metricGroup); + metricGroup, + ttlCompactFiltersManager, + writeBatchSize, + optionsContainer.getWriteBufferManagerCapacity()); } KeyedStateHandle firstStateHandle = restoreStateHandles.iterator().next(); if (firstStateHandle instanceof IncrementalRemoteKeyedStateHandle) { return new ForStIncrementalRestoreOperation<>( operatorIdentifier, keyGroupRange, + keyGroupPrefixBytes, cancelStreamRegistry, userCodeClassLoader, kvStateInformation, @@ -499,9 +526,15 @@ private ForStRestoreOperation getForStDBRestoreOperation( columnFamilyOptionsFactory, nativeMetricOptions, metricGroup, + ttlCompactFiltersManager, + writeBatchSize, + optionsContainer.getWriteBufferManagerCapacity(), customInitializationMetrics, CollectionUtil.checkedSubTypeCast( - restoreStateHandles, IncrementalRemoteKeyedStateHandle.class)); + restoreStateHandles, IncrementalRemoteKeyedStateHandle.class), + overlapFractionThreshold, + useIngestDbRestoreMode, + rescalingUseDeleteFilesInRange); } else if (priorityQueueConfig.getPriorityQueueStateType() == ForStStateBackend.PriorityQueueStateType.HEAP) { // Note: This branch can be reached once ForSt supports canonical savepoints, @@ -519,6 +552,9 @@ private ForStRestoreOperation getForStDBRestoreOperation( columnFamilyOptionsFactory, nativeMetricOptions, metricGroup, + ttlCompactFiltersManager, + writeBatchSize, + optionsContainer.getWriteBufferManagerCapacity(), restoreStateHandles, cancelStreamRegistry); } diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStIncrementalCheckpointRescalingTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStIncrementalCheckpointRescalingTest.java new file mode 100644 index 0000000000000..94b7186dcf8e3 --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStIncrementalCheckpointRescalingTest.java @@ -0,0 +1,510 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.state.forst; + +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.StateAssignmentOperation; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.streaming.api.operators.KeyedProcessOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.util.Collector; +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import static org.apache.flink.state.forst.ForStConfigurableOptions.USE_INGEST_DB_RESTORE_MODE; + +/** Tests to guard rescaling from checkpoint. */ +@RunWith(Parameterized.class) +public class ForStIncrementalCheckpointRescalingTest extends TestLogger { + + @Rule public TemporaryFolder rootFolder = new TemporaryFolder(); + + @Parameterized.Parameters(name = "useIngestDbRestoreMode: {0}") + public static Collection parameters() { + return Arrays.asList(false, true); + } + + @Parameterized.Parameter public boolean useIngestDbRestoreMode; + + private final int maxParallelism = 10; + + private final KeySelector keySelector = new TestKeySelector(); + + private String[] records; + + @Before + public void initRecords() throws Exception { + records = new String[10]; + records[0] = "8"; + Assert.assertEquals( + 0, + KeyGroupRangeAssignment.assignToKeyGroup( + keySelector.getKey(records[0]), maxParallelism)); // group 0 + + records[1] = "5"; + Assert.assertEquals( + 1, + KeyGroupRangeAssignment.assignToKeyGroup( + keySelector.getKey(records[1]), maxParallelism)); // group 1 + + records[2] = "25"; + Assert.assertEquals( + 2, + KeyGroupRangeAssignment.assignToKeyGroup( + keySelector.getKey(records[2]), maxParallelism)); // group 2 + + records[3] = "13"; + Assert.assertEquals( + 3, + KeyGroupRangeAssignment.assignToKeyGroup( + keySelector.getKey(records[3]), maxParallelism)); // group 3 + + records[4] = "4"; + Assert.assertEquals( + 4, + KeyGroupRangeAssignment.assignToKeyGroup( + keySelector.getKey(records[4]), maxParallelism)); // group 4 + + records[5] = "7"; + Assert.assertEquals( + 5, + KeyGroupRangeAssignment.assignToKeyGroup( + keySelector.getKey(records[5]), maxParallelism)); // group 5 + + records[6] = "1"; + Assert.assertEquals( + 6, + KeyGroupRangeAssignment.assignToKeyGroup( + keySelector.getKey(records[6]), maxParallelism)); // group 6 + + records[7] = "6"; + Assert.assertEquals( + 7, + KeyGroupRangeAssignment.assignToKeyGroup( + keySelector.getKey(records[7]), maxParallelism)); // group 
7 + + records[8] = "9"; + Assert.assertEquals( + 8, + KeyGroupRangeAssignment.assignToKeyGroup( + keySelector.getKey(records[8]), maxParallelism)); // group 8 + + records[9] = "3"; + Assert.assertEquals( + 9, + KeyGroupRangeAssignment.assignToKeyGroup( + keySelector.getKey(records[9]), maxParallelism)); // group 9 + } + + @Test + @SuppressWarnings("unchecked") + public void testScalingUp() throws Exception { + + // -----------------------------------------> test with initial parallelism 1 + // <--------------------------------------- + + OperatorSubtaskState snapshot; + + try (KeyedOneInputStreamOperatorTestHarness harness = + getHarnessTest(keySelector, maxParallelism, 1, 0)) { + harness.setStateBackend(getStateBackend()); + harness.setCheckpointStorage( + new FileSystemCheckpointStorage( + "file://" + rootFolder.newFolder().getAbsolutePath())); + harness.open(); + + validHarnessResult(harness, 1, records); + + snapshot = harness.snapshot(0, 0); + } + + // -----------------------------------------> test rescaling from 1 to 2 + // <--------------------------------------- + + // init state for new subtask-0 + OperatorSubtaskState initState1 = + AbstractStreamOperatorTestHarness.repartitionOperatorState( + snapshot, maxParallelism, 1, 2, 0); + + // init state for new subtask-1 + OperatorSubtaskState initState2 = + AbstractStreamOperatorTestHarness.repartitionOperatorState( + snapshot, maxParallelism, 1, 2, 1); + + KeyedOneInputStreamOperatorTestHarness[] harness2 = + new KeyedOneInputStreamOperatorTestHarness[3]; + + OperatorSubtaskState snapshot2; + + try { + List keyGroupPartitions = + StateAssignmentOperation.createKeyGroupPartitions(maxParallelism, 2); + + // task's key-group [0, 4] + KeyGroupRange localKeyGroupRange20 = keyGroupPartitions.get(0); + Assert.assertEquals(new KeyGroupRange(0, 4), localKeyGroupRange20); + harness2[0] = getHarnessTest(keySelector, maxParallelism, 2, 0); + harness2[0].setStateBackend(getStateBackend()); + harness2[0].setCheckpointStorage( + new FileSystemCheckpointStorage( + "file://" + rootFolder.newFolder().getAbsolutePath())); + harness2[0].setup(); + harness2[0].initializeState(initState1); + harness2[0].open(); + + // task's key-group [5, 9] + KeyGroupRange localKeyGroupRange21 = keyGroupPartitions.get(1); + Assert.assertEquals(new KeyGroupRange(5, 9), localKeyGroupRange21); + harness2[1] = getHarnessTest(keySelector, maxParallelism, 2, 1); + harness2[1].setStateBackend(getStateBackend()); + harness2[1].setCheckpointStorage( + new FileSystemCheckpointStorage( + "file://" + rootFolder.newFolder().getAbsolutePath())); + harness2[1].setup(); + harness2[1].initializeState(initState2); + harness2[1].open(); + + validHarnessResult( + harness2[0], 2, records[0], records[1], records[2], records[3], records[4]); + + validHarnessResult( + harness2[1], 2, records[5], records[6], records[7], records[8], records[9]); + + snapshot2 = + AbstractStreamOperatorTestHarness.repackageState( + harness2[0].snapshot(0, 0), harness2[1].snapshot(0, 0)); + } finally { + closeHarness(harness2); + } + + // -----------------------------------------> test rescaling from 2 to 3 + // <--------------------------------------- + + // init state for new subtask-0 + initState1 = + AbstractStreamOperatorTestHarness.repartitionOperatorState( + snapshot2, maxParallelism, 2, 3, 0); + + // init state for new subtask-1 + initState2 = + AbstractStreamOperatorTestHarness.repartitionOperatorState( + snapshot2, maxParallelism, 2, 3, 1); + + // init state for new subtask-2 + OperatorSubtaskState 
initState3 = + AbstractStreamOperatorTestHarness.repartitionOperatorState( + snapshot2, maxParallelism, 2, 3, 2); + + KeyedOneInputStreamOperatorTestHarness[] harness3 = + new KeyedOneInputStreamOperatorTestHarness[3]; + + try { + List keyGroupPartitions = + StateAssignmentOperation.createKeyGroupPartitions(maxParallelism, 3); + + // task's key-group [0, 3] + // this will choose the state handle generated by harness2[0] to init the target db with clipping. + KeyGroupRange localKeyGroupRange30 = keyGroupPartitions.get(0); + Assert.assertEquals(new KeyGroupRange(0, 3), localKeyGroupRange30); + harness3[0] = getHarnessTest(keySelector, maxParallelism, 3, 0); + harness3[0].setStateBackend(getStateBackend()); + harness3[0].setCheckpointStorage( + new FileSystemCheckpointStorage( + "file://" + rootFolder.newFolder().getAbsolutePath())); + harness3[0].setup(); + harness3[0].initializeState(initState1); + harness3[0].open(); + + // task's key-group [4, 6] + KeyGroupRange localKeyGroupRange31 = keyGroupPartitions.get(1); + Assert.assertEquals(new KeyGroupRange(4, 6), localKeyGroupRange31); + harness3[1] = getHarnessTest(keySelector, maxParallelism, 3, 1); + harness3[1].setStateBackend(getStateBackend()); + harness3[1].setCheckpointStorage( + new FileSystemCheckpointStorage( + "file://" + rootFolder.newFolder().getAbsolutePath())); + harness3[1].setup(); + harness3[1].initializeState(initState2); + harness3[1].open(); + + // task's key-group [7, 9] + KeyGroupRange localKeyGroupRange32 = keyGroupPartitions.get(2); + Assert.assertEquals(new KeyGroupRange(7, 9), localKeyGroupRange32); + harness3[2] = getHarnessTest(keySelector, maxParallelism, 3, 2); + harness3[2].setStateBackend(getStateBackend()); + harness3[2].setCheckpointStorage( + new FileSystemCheckpointStorage( + "file://" + rootFolder.newFolder().getAbsolutePath())); + harness3[2].setup(); + harness3[2].initializeState(initState3); + harness3[2].open(); + + validHarnessResult(harness3[0], 3, records[0], records[1], records[2], records[3]); + validHarnessResult(harness3[1], 3, records[4], records[5], records[6]); + validHarnessResult(harness3[2], 3, records[7], records[8], records[9]); + } finally { + closeHarness(harness3); + } + } + + @Test + @SuppressWarnings("unchecked") + public void testScalingDown() throws Exception { + + // -----------------------------------------> test with initial parallelism 3 + // <--------------------------------------- + + KeyedOneInputStreamOperatorTestHarness[] harness3 = + new KeyedOneInputStreamOperatorTestHarness[3]; + OperatorSubtaskState snapshot3; + + try { + List keyGroupPartitions = + StateAssignmentOperation.createKeyGroupPartitions(maxParallelism, 3); + + // task's key-group [0, 3], this should trigger the condition to use clip + KeyGroupRange localKeyGroupRange30 = keyGroupPartitions.get(0); + Assert.assertEquals(new KeyGroupRange(0, 3), localKeyGroupRange30); + harness3[0] = getHarnessTest(keySelector, maxParallelism, 3, 0); + harness3[0].setStateBackend(getStateBackend()); + harness3[0].setCheckpointStorage( + new FileSystemCheckpointStorage( + "file://" + rootFolder.newFolder().getAbsolutePath())); + harness3[0].open(); + + // task's key-group [4, 6] + KeyGroupRange localKeyGroupRange31 = keyGroupPartitions.get(1); + Assert.assertEquals(new KeyGroupRange(4, 6), localKeyGroupRange31); + harness3[1] = getHarnessTest(keySelector, maxParallelism, 3, 1); + harness3[1].setStateBackend(getStateBackend()); + harness3[1].setCheckpointStorage( + new FileSystemCheckpointStorage( + "file://" +
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testScalingDown() throws Exception {
+
+        // -----------------------------------------> test with initial parallelism 3
+        // <---------------------------------------
+
+        KeyedOneInputStreamOperatorTestHarness<String, String, Integer>[] harness3 =
+                new KeyedOneInputStreamOperatorTestHarness[3];
+        OperatorSubtaskState snapshot3;
+
+        try {
+            List<KeyGroupRange> keyGroupPartitions =
+                    StateAssignmentOperation.createKeyGroupPartitions(maxParallelism, 3);
+
+            // task's key-group [0, 3], this should trigger the condition to use clipping
+            KeyGroupRange localKeyGroupRange30 = keyGroupPartitions.get(0);
+            Assert.assertEquals(new KeyGroupRange(0, 3), localKeyGroupRange30);
+            harness3[0] = getHarnessTest(keySelector, maxParallelism, 3, 0);
+            harness3[0].setStateBackend(getStateBackend());
+            harness3[0].setCheckpointStorage(
+                    new FileSystemCheckpointStorage(
+                            "file://" + rootFolder.newFolder().getAbsolutePath()));
+            harness3[0].open();
+
+            // task's key-group [4, 6]
+            KeyGroupRange localKeyGroupRange31 = keyGroupPartitions.get(1);
+            Assert.assertEquals(new KeyGroupRange(4, 6), localKeyGroupRange31);
+            harness3[1] = getHarnessTest(keySelector, maxParallelism, 3, 1);
+            harness3[1].setStateBackend(getStateBackend());
+            harness3[1].setCheckpointStorage(
+                    new FileSystemCheckpointStorage(
+                            "file://" + rootFolder.newFolder().getAbsolutePath()));
+            harness3[1].open();
+
+            // task's key-group [7, 9]
+            KeyGroupRange localKeyGroupRange32 = keyGroupPartitions.get(2);
+            Assert.assertEquals(new KeyGroupRange(7, 9), localKeyGroupRange32);
+            harness3[2] = getHarnessTest(keySelector, maxParallelism, 3, 2);
+            harness3[2].setStateBackend(getStateBackend());
+            harness3[2].setCheckpointStorage(
+                    new FileSystemCheckpointStorage(
+                            "file://" + rootFolder.newFolder().getAbsolutePath()));
+            harness3[2].open();
+
+            validHarnessResult(harness3[0], 1, records[0], records[1], records[2], records[3]);
+            validHarnessResult(harness3[1], 1, records[4], records[5], records[6]);
+            validHarnessResult(harness3[2], 1, records[7], records[8], records[9]);
+
+            snapshot3 =
+                    AbstractStreamOperatorTestHarness.repackageState(
+                            harness3[0].snapshot(0, 0),
+                            harness3[1].snapshot(0, 0),
+                            harness3[2].snapshot(0, 0));
+
+        } finally {
+            closeHarness(harness3);
+        }
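Before the 3 to 2 rescale that follows, it helps to see which new subtask owns each key group: the owner index is keyGroup * parallelism / maxParallelism. A small sketch, outside the patch with an illustrative class name, using the public API:

    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

    public class KeyGroupOwnerDemo {
        public static void main(String[] args) {
            int maxParallelism = 10;
            int parallelism = 2;
            for (int keyGroup = 0; keyGroup < maxParallelism; keyGroup++) {
                // owner = keyGroup * parallelism / maxParallelism
                int owner =
                        KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
                                maxParallelism, parallelism, keyGroup);
                System.out.printf("key group %d -> subtask %d%n", keyGroup, owner);
            }
            // Groups 0..4 land on subtask 0 and groups 5..9 on subtask 1, which is why
            // records[0..4] are validated on harness2[0] and records[5..9] on harness2[1].
        }
    }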
+
+        // -----------------------------------------> test rescaling from 3 to 2
+        // <---------------------------------------
+
+        // init state for new subtask-0
+        OperatorSubtaskState initState1 =
+                AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                        snapshot3, maxParallelism, 3, 2, 0);
+
+        // init state for new subtask-1
+        OperatorSubtaskState initState2 =
+                AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                        snapshot3, maxParallelism, 3, 2, 1);
+
+        KeyedOneInputStreamOperatorTestHarness<String, String, Integer>[] harness2 =
+                new KeyedOneInputStreamOperatorTestHarness[3];
+
+        OperatorSubtaskState snapshot2;
+
+        try {
+            List<KeyGroupRange> keyGroupPartitions =
+                    StateAssignmentOperation.createKeyGroupPartitions(maxParallelism, 2);
+
+            // task's key-group [0, 4]
+            // this will choose the state handle generated by harness3[0] to init the target db
+            // without any clipping.
+            KeyGroupRange localKeyGroupRange20 = keyGroupPartitions.get(0);
+            Assert.assertEquals(new KeyGroupRange(0, 4), localKeyGroupRange20);
+            harness2[0] = getHarnessTest(keySelector, maxParallelism, 2, 0);
+            harness2[0].setStateBackend(getStateBackend());
+            harness2[0].setCheckpointStorage(
+                    new FileSystemCheckpointStorage(
+                            "file://" + rootFolder.newFolder().getAbsolutePath()));
+            harness2[0].setup();
+            harness2[0].initializeState(initState1);
+            harness2[0].open();
+
+            // task's key-group [5, 9], this will open an empty db, and insert records from two
+            // state handles.
+            KeyGroupRange localKeyGroupRange21 = keyGroupPartitions.get(1);
+            Assert.assertEquals(new KeyGroupRange(5, 9), localKeyGroupRange21);
+            harness2[1] = getHarnessTest(keySelector, maxParallelism, 2, 1);
+            harness2[1].setStateBackend(getStateBackend());
+            harness2[1].setCheckpointStorage(
+                    new FileSystemCheckpointStorage(
+                            "file://" + rootFolder.newFolder().getAbsolutePath()));
+            harness2[1].setup();
+            harness2[1].initializeState(initState2);
+            harness2[1].open();
+
+            validHarnessResult(
+                    harness2[0], 2, records[0], records[1], records[2], records[3], records[4]);
+
+            validHarnessResult(
+                    harness2[1], 2, records[5], records[6], records[7], records[8], records[9]);
+
+            snapshot2 =
+                    AbstractStreamOperatorTestHarness.repackageState(
+                            harness2[0].snapshot(0, 0), harness2[1].snapshot(0, 0));
+        } finally {
+            closeHarness(harness2);
+        }
+
+        // -----------------------------------------> test rescaling from 2 to 1
+        // <---------------------------------------
+
+        // init state for new subtask-0
+        initState1 =
+                AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                        snapshot2, maxParallelism, 2, 1, 0);
+
+        try (KeyedOneInputStreamOperatorTestHarness<String, String, Integer> harness =
+                getHarnessTest(keySelector, maxParallelism, 1, 0)) {
+
+            // this will choose the state handle generated by harness2[0] to init the target db
+            // without any clipping.
+            harness.setStateBackend(getStateBackend());
+            harness.setCheckpointStorage(
+                    new FileSystemCheckpointStorage(
+                            "file://" + rootFolder.newFolder().getAbsolutePath()));
+            harness.setup();
+            harness.initializeState(initState1);
+            harness.open();
+
+            validHarnessResult(harness, 3, records);
+        }
+    }
+
+    private void closeHarness(
+            KeyedOneInputStreamOperatorTestHarness<String, String, Integer>[] harnessArr)
+            throws Exception {
+        for (KeyedOneInputStreamOperatorTestHarness<String, String, Integer> harness :
+                harnessArr) {
+            if (harness != null) {
+                harness.close();
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void validHarnessResult(
+            KeyedOneInputStreamOperatorTestHarness<String, String, Integer> harness,
+            Integer expectedValue,
+            String... records)
+            throws Exception {
+        for (String record : records) {
+            harness.processElement(new StreamRecord<>(record, 1));
+            StreamRecord<Integer> outputRecord =
+                    (StreamRecord<Integer>) harness.getOutput().poll();
+            Assert.assertNotNull(outputRecord);
+            Assert.assertEquals(expectedValue, outputRecord.getValue());
+        }
+    }
+
+    private KeyedOneInputStreamOperatorTestHarness<String, String, Integer> getHarnessTest(
+            KeySelector<String, String> keySelector,
+            int maxParallelism,
+            int taskParallelism,
+            int subtaskIdx)
+            throws Exception {
+        return new KeyedOneInputStreamOperatorTestHarness<>(
+                new KeyedProcessOperator<>(new TestKeyedFunction()),
+                keySelector,
+                BasicTypeInfo.STRING_TYPE_INFO,
+                maxParallelism,
+                taskParallelism,
+                subtaskIdx);
+    }
+
+    private StateBackend getStateBackend() throws Exception {
+        ForStStateBackend forStStateBackend = new ForStStateBackend();
+        Configuration configuration = new Configuration();
+        configuration.set(USE_INGEST_DB_RESTORE_MODE, useIngestDbRestoreMode);
+        return forStStateBackend.configure(configuration, getClass().getClassLoader());
+    }
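A note on the fixture records asserted at the top of this class: TestKeySelector (defined below) is the identity function, so a record's key is the string itself, and assignToKeyGroup murmur-hashes key.hashCode() modulo maxParallelism. Strings such as "9" (group 8) and "3" (group 9) therefore have to be found by search rather than derived. A sketch, outside the patch with an illustrative class name, of how such fixtures can be generated:

    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

    public class FixtureKeyFinder {
        public static void main(String[] args) {
            int maxParallelism = 10;
            String[] fixture = new String[maxParallelism];
            // Scan small integer strings until every key group has a representative key.
            for (int candidate = 0; candidate < 1_000; candidate++) {
                String key = Integer.toString(candidate);
                int group = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
                if (fixture[group] == null) {
                    fixture[group] = key;
                }
            }
            for (int group = 0; group < maxParallelism; group++) {
                System.out.printf("group %d <- \"%s\"%n", group, fixture[group]);
            }
        }
    }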
+
+    /** A simple keyed function for tests. */
+    private class TestKeyedFunction extends KeyedProcessFunction<String, String, Integer> {
+
+        private ValueState<Integer> counterState;
+
+        @Override
+        public void open(OpenContext openContext) throws Exception {
+            super.open(openContext);
+            counterState =
+                    this.getRuntimeContext()
+                            .getState(new ValueStateDescriptor<>("counter", Integer.class));
+        }
+
+        @Override
+        public void processElement(String value, Context ctx, Collector<Integer> out)
+                throws Exception {
+            Integer oldCount = counterState.value();
+            Integer newCount = oldCount != null ? oldCount + 1 : 1;
+            counterState.update(newCount);
+            out.collect(newCount);
+        }
+    }
+
+    /** A simple key selector for tests. */
+    private class TestKeySelector implements KeySelector<String, String> {
+        @Override
+        public String getKey(String value) throws Exception {
+            return value;
+        }
+    }
+}
diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java
index 14c4ed6cd660a..d2e4cb108b77c 100644
--- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java
@@ -31,7 +31,6 @@
 import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.util.function.SupplierWithException;
 
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -39,10 +38,10 @@
 import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.flink.state.forst.ForStConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
 import static org.apache.flink.state.forst.ForStOptions.LOCAL_DIRECTORIES;
 
 /** Tests for the partitioned state part of {@link ForStStateBackendTest}. */
-@Disabled("ForStStateBackend is not support rescaling yet, some tests can't run")
 @ExtendWith(ParameterizedTestExtension.class)
 class ForStStateBackendTest extends StateBackendTestBase<ForStStateBackend> {
     @TempDir private static java.nio.file.Path tempFolder;
@@ -79,8 +78,8 @@ protected ConfigurableStateBackend getStateBackend() throws Exception {
         ForStStateBackend backend = new ForStStateBackend();
         Configuration config = new Configuration();
         config.set(LOCAL_DIRECTORIES, tempFolder.toString());
-        backend.configure(config, Thread.currentThread().getContextClassLoader());
-        return new ForStStateBackend();
+        config.set(USE_INGEST_DB_RESTORE_MODE, true);
+        return backend.configure(config, Thread.currentThread().getContextClassLoader());
     }
 
     @Override
diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTestV2.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTestV2.java
index 413f81532ff89..9bb14be1639eb 100644
--- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTestV2.java
+++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTestV2.java
@@ -94,7 +94,8 @@ protected ConfigurableStateBackend getStateBackend() throws Exception {
         if (hasRemoteDir) {
             config.set(REMOTE_DIRECTORY, tempFolderForForstRemote.toString());
         }
-        backend.configure(config, Thread.currentThread().getContextClassLoader());
-        return new ForStStateBackend();
+        // Async state + remote + ingestDB is not supported, because ingestDB needs to link files.
+        // config.set(USE_INGEST_DB_RESTORE_MODE, true);
+        return backend.configure(config, Thread.currentThread().getContextClassLoader());
     }
 }
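Apart from re-enabling the suite, the ForStStateBackendTest hunk fixes a latent bug: configure(...) follows the ConfigurableStateBackend contract of returning a new configured instance instead of mutating the receiver, so the old code returned a backend that had never seen LOCAL_DIRECTORIES. A minimal sketch of the corrected pattern, assuming configure(...) returns ForStStateBackend as the hunk above implies:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.state.forst.ForStStateBackend;

    public class ConfigureBackendDemo {
        static ForStStateBackend configuredBackend() throws Exception {
            Configuration config = new Configuration();
            // configure(...) leaves the receiver untouched and returns a configured
            // copy, so the return value is the instance that must be used afterwards.
            return new ForStStateBackend()
                    .configure(config, Thread.currentThread().getContextClassLoader());
        }
    }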