Skip to content

Commit

Permalink
[FLINK-36110][snapshot] Store periodic snapshot trigger timestamps in memory
Browse files Browse the repository at this point in the history
  • Loading branch information
mateczagany committed Aug 20, 2024
1 parent c8a6ca8 commit 4811b19
Show file tree
Hide file tree
Showing 8 changed files with 265 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,10 @@ Set<FlinkStateSnapshot> getFlinkStateSnapshotsToCleanUp(

var lastCompleteSnapshot =
snapshotList.stream()
.filter(s -> COMPLETED.equals(s.getStatus().getState()))
.filter(
s ->
s.getStatus() != null
&& COMPLETED.equals(s.getStatus().getState()))
.max(Comparator.comparing(EXTRACT_SNAPSHOT_TIME))
.orElse(null);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package org.apache.flink.kubernetes.operator.reconciler;

import org.apache.flink.autoscaler.utils.DateTimeUtils;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.CrdConstants;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.status.SnapshotInfo;

import io.fabric8.kubernetes.api.model.HasMetadata;
import lombok.RequiredArgsConstructor;

import javax.annotation.Nullable;

import java.time.Instant;
import java.util.Comparator;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.COMPLETED;
import static org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType.PERIODIC;
import static org.apache.flink.kubernetes.operator.reconciler.SnapshotType.SAVEPOINT;

/**
 * Class used to store latest timestamps of periodic checkpoint/savepoint. One instance tracks a
 * single {@link SnapshotType}; timestamps are kept in operator memory only (keyed by resource UID)
 * and are reconstructed from FlinkStateSnapshot CRs or legacy status fields after operator restart.
 */
@RequiredArgsConstructor
public class SnapshotTriggerTimestampStore {
    private final SnapshotType snapshotType;

    // Keyed by Kubernetes resource UID. ConcurrentHashMap never stores null values,
    // so a plain get() returning null reliably means "no entry".
    private final ConcurrentHashMap<String, Instant> lastTriggered = new ConcurrentHashMap<>();

    /**
     * Returns the time a periodic snapshot was last triggered for this resource. This is stored in
     * memory, on operator start it will use the latest completed FlinkStateSnapshot CR creation
     * timestamp. If none found, the return value will be the max of the resource creation timestamp
     * and the latest triggered legacy snapshot.
     *
     * @param resource Flink resource
     * @param snapshots related snapshot resources
     * @return instant of last trigger
     */
    public Instant getLastPeriodicTriggerInstant(
            AbstractFlinkResource<?, ?> resource, @Nullable Set<FlinkStateSnapshot> snapshots) {
        // Single atomic get() instead of containsKey()+get(), which is a non-atomic
        // double lookup on a concurrent map.
        var cached = lastTriggered.get(resource.getMetadata().getUid());
        if (cached != null) {
            return cached;
        }

        return Optional.ofNullable(snapshots).orElse(Set.of()).stream()
                .filter(s -> s.getStatus() != null && COMPLETED.equals(s.getStatus().getState()))
                // Keep only snapshots matching this store's type (savepoint vs checkpoint).
                .filter(s -> (snapshotType == SAVEPOINT) == s.getSpec().isSavepoint())
                // Labels can be absent on a resource; guard against a null map before lookup.
                .filter(
                        s ->
                                s.getMetadata().getLabels() != null
                                        && PERIODIC.name()
                                                .equals(
                                                        s.getMetadata()
                                                                .getLabels()
                                                                .get(
                                                                        CrdConstants
                                                                                .LABEL_SNAPSHOT_TYPE)))
                .map(s -> DateTimeUtils.parseKubernetes(s.getMetadata().getCreationTimestamp()))
                .max(Comparator.naturalOrder())
                .orElseGet(
                        () -> {
                            // Fall back to the newer of the legacy status timestamp and the
                            // resource creation timestamp.
                            var legacyTs = getLegacyTimestamp(resource);
                            var creationTs =
                                    Instant.parse(resource.getMetadata().getCreationTimestamp());
                            return legacyTs.isAfter(creationTs) ? legacyTs : creationTs;
                        });
    }

    /**
     * Updates the time a periodic snapshot was last triggered for this resource.
     *
     * @param resource Kubernetes resource
     * @param instant new timestamp
     */
    public void updateLastPeriodicTriggerTimestamp(HasMetadata resource, Instant instant) {
        lastTriggered.put(resource.getMetadata().getUid(), instant);
    }

    /**
     * Reads the last periodic trigger timestamp from the legacy status fields of the resource.
     *
     * @param resource Flink resource whose status is inspected
     * @return instant built from the legacy epoch-millis field (epoch 0 when never triggered)
     */
    private Instant getLegacyTimestamp(AbstractFlinkResource<?, ?> resource) {
        SnapshotInfo snapshotInfo;
        switch (snapshotType) {
            case SAVEPOINT:
                snapshotInfo = resource.getStatus().getJobStatus().getSavepointInfo();
                break;
            case CHECKPOINT:
                snapshotInfo = resource.getStatus().getJobStatus().getCheckpointInfo();
                break;
            default:
                throw new IllegalArgumentException("Unsupported snapshot type: " + snapshotType);
        }

        return Instant.ofEpochMilli(snapshotInfo.getLastPeriodicTriggerTimestamp());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.diff.DiffType;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
Expand All @@ -37,6 +38,7 @@
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.exception.RecoveryFailureException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.SnapshotTriggerTimestampStore;
import org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
import org.apache.flink.kubernetes.operator.service.CheckpointHistoryWrapper;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
Expand All @@ -53,6 +55,7 @@

import java.time.Instant;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_RESTART_FAILED;
Expand All @@ -74,6 +77,11 @@ public abstract class AbstractJobReconciler<

public static final String LAST_STATE_DUMMY_SP_PATH = "KUBERNETES_OPERATOR_LAST_STATE";

private final SnapshotTriggerTimestampStore checkpointTriggerTimestamps =
new SnapshotTriggerTimestampStore(CHECKPOINT);
private final SnapshotTriggerTimestampStore savepointTriggerTimestamps =
new SnapshotTriggerTimestampStore(SAVEPOINT);

public AbstractJobReconciler(
EventRecorder eventRecorder,
StatusRecorder<CR, STATUS> statusRecorder,
Expand Down Expand Up @@ -374,13 +382,27 @@ private boolean triggerSnapshotIfNeeded(FlinkResourceContext<CR> ctx, SnapshotTy
var resource = ctx.getResource();
var conf = ctx.getObserveConfig();

Optional<SnapshotTriggerType> triggerOpt =
SnapshotUtils.shouldTriggerSnapshot(resource, conf, snapshotType);
var timestampStore =
snapshotType == SAVEPOINT
? savepointTriggerTimestamps
: checkpointTriggerTimestamps;
Set<FlinkStateSnapshot> snapshots = Set.of();
if (FlinkStateSnapshotUtils.isSnapshotResourceEnabled(ctx.getOperatorConfig(), conf)) {
snapshots = ctx.getJosdkContext().getSecondaryResources(FlinkStateSnapshot.class);
}

var lastTrigger = timestampStore.getLastPeriodicTriggerInstant(resource, snapshots);
var triggerOpt =
SnapshotUtils.shouldTriggerSnapshot(resource, conf, snapshotType, lastTrigger);
if (triggerOpt.isEmpty()) {
return false;
}
var triggerType = triggerOpt.get();

if (SnapshotTriggerType.PERIODIC.equals(triggerType)) {
timestampStore.updateLastPeriodicTriggerTimestamp(resource, Instant.now());
}

var createSnapshotResource =
FlinkStateSnapshotUtils.isSnapshotResourceEnabled(ctx.getOperatorConfig(), conf);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,10 @@ public static SnapshotStatus getLastSnapshotStatus(
*/
@VisibleForTesting
public static Optional<SnapshotTriggerType> shouldTriggerSnapshot(
AbstractFlinkResource<?, ?> resource, Configuration conf, SnapshotType snapshotType) {
AbstractFlinkResource<?, ?> resource,
Configuration conf,
SnapshotType snapshotType,
Instant lastTrigger) {

var status = resource.getStatus();
var jobStatus = status.getJobStatus();
Expand All @@ -153,23 +156,20 @@ public static Optional<SnapshotTriggerType> shouldTriggerSnapshot(
Long triggerNonce;
Long reconciledTriggerNonce;
boolean inProgress;
SnapshotInfo snapshotInfo;
String automaticTriggerExpression;

switch (snapshotType) {
case SAVEPOINT:
triggerNonce = jobSpec.getSavepointTriggerNonce();
reconciledTriggerNonce = reconciledJobSpec.getSavepointTriggerNonce();
inProgress = savepointInProgress(jobStatus);
snapshotInfo = jobStatus.getSavepointInfo();
automaticTriggerExpression =
conf.get(KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL);
break;
case CHECKPOINT:
triggerNonce = jobSpec.getCheckpointTriggerNonce();
reconciledTriggerNonce = reconciledJobSpec.getCheckpointTriggerNonce();
inProgress = checkpointInProgress(jobStatus);
snapshotInfo = jobStatus.getCheckpointInfo();
automaticTriggerExpression =
conf.get(KubernetesOperatorConfigOptions.PERIODIC_CHECKPOINT_INTERVAL);
break;
Expand All @@ -193,14 +193,6 @@ public static Optional<SnapshotTriggerType> shouldTriggerSnapshot(
}
}

var lastTriggerTs = snapshotInfo.getLastPeriodicTriggerTimestamp();
// When the resource is first created/periodic snapshotting enabled we have to compare
// against the creation timestamp for triggering the first periodic savepoint
var lastTrigger =
lastTriggerTs == 0
? Instant.parse(resource.getMetadata().getCreationTimestamp())
: Instant.ofEpochMilli(lastTriggerTs);

if (shouldTriggerAutomaticSnapshot(snapshotType, automaticTriggerExpression, lastTrigger)) {
if (snapshotType == CHECKPOINT && !isSnapshotTriggeringSupported(conf)) {
LOG.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobReference;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.Checkpoint;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import org.apache.flink.kubernetes.operator.api.utils.BaseTestUtils;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.health.CanaryResourceManager;
Expand Down Expand Up @@ -408,33 +405,17 @@ public static void reconcileSpec(FlinkDeployment deployment) {
* Sets up an active cron trigger by ensuring that the latest successful snapshot happened
* earlier than the scheduled trigger.
*/
public static void setupCronTrigger(SnapshotType snapshotType, FlinkDeployment deployment) {
public static Instant setupCronTrigger(SnapshotType snapshotType, FlinkDeployment deployment) {

Calendar calendar = Calendar.getInstance();
calendar.set(2022, Calendar.JUNE, 5, 11, 0);
long lastCheckpointTimestamp = calendar.getTimeInMillis();

String cronOptionKey;

switch (snapshotType) {
case SAVEPOINT:
Savepoint lastSavepoint =
Savepoint.of("", lastCheckpointTimestamp, SnapshotTriggerType.PERIODIC);
deployment
.getStatus()
.getJobStatus()
.getSavepointInfo()
.updateLastSavepoint(lastSavepoint);
cronOptionKey = KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL.key();
break;
case CHECKPOINT:
Checkpoint lastCheckpoint =
Checkpoint.of(lastCheckpointTimestamp, SnapshotTriggerType.PERIODIC);
deployment
.getStatus()
.getJobStatus()
.getCheckpointInfo()
.updateLastCheckpoint(lastCheckpoint);
cronOptionKey = KubernetesOperatorConfigOptions.PERIODIC_CHECKPOINT_INTERVAL.key();
break;
default:
Expand All @@ -443,6 +424,7 @@ public static void setupCronTrigger(SnapshotType snapshotType, FlinkDeployment d

deployment.getSpec().getFlinkConfiguration().put(cronOptionKey, "0 0 12 5 6 ? 2022");
reconcileSpec(deployment);
return calendar.toInstant();
}

/** Testing ResponseProvider. */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package org.apache.flink.kubernetes.operator.reconciler;

import org.apache.flink.autoscaler.utils.DateTimeUtils;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.CrdConstants;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.spec.CheckpointSpec;
import org.apache.flink.kubernetes.operator.api.spec.SavepointSpec;
import org.apache.flink.kubernetes.operator.api.status.Checkpoint;
import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;

import org.junit.jupiter.api.Test;

import java.time.Instant;
import java.util.Set;

import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.COMPLETED;
import static org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType.PERIODIC;
import static org.apache.flink.kubernetes.operator.reconciler.SnapshotType.CHECKPOINT;
import static org.apache.flink.kubernetes.operator.reconciler.SnapshotType.SAVEPOINT;
import static org.junit.jupiter.api.Assertions.assertEquals;

/** Tests for {@link SnapshotTriggerTimestampStore}. */
class SnapshotTriggerTimestampStoreTest {

    @Test
    public void testCheckpointTimestampStore() {
        testTimestampStore(CHECKPOINT);
    }

    @Test
    public void testSavepointTimestampStore() {
        testTimestampStore(SAVEPOINT);
    }

    /**
     * Exercises the fallback order of the store: resource creation timestamp, then legacy status
     * timestamp, then latest completed periodic FlinkStateSnapshot CR, then the in-memory value.
     */
    private void testTimestampStore(SnapshotType snapshotType) {
        var deployment = TestUtils.buildApplicationCluster();
        var timestampStore = new SnapshotTriggerTimestampStore(snapshotType);

        // No snapshots, no legacy info -> the resource creation timestamp is returned.
        var creationInstant = Instant.ofEpochMilli(1);
        deployment.getMetadata().setCreationTimestamp(DateTimeUtils.kubernetes(creationInstant));
        assertEquals(
                creationInstant, timestampStore.getLastPeriodicTriggerInstant(deployment, Set.of()));

        // A newer legacy (status-based) periodic trigger wins over the creation timestamp.
        var legacyInstant = Instant.ofEpochMilli(2);
        if (snapshotType == SAVEPOINT) {
            deployment
                    .getStatus()
                    .getJobStatus()
                    .getSavepointInfo()
                    .updateLastSavepoint(new Savepoint(2L, "", PERIODIC, null, null));
        } else {
            deployment
                    .getStatus()
                    .getJobStatus()
                    .getCheckpointInfo()
                    .updateLastCheckpoint(new Checkpoint(2L, PERIODIC, null, null));
        }
        assertEquals(
                legacyInstant, timestampStore.getLastPeriodicTriggerInstant(deployment, Set.of()));

        // A completed periodic snapshot CR takes precedence over the legacy timestamp.
        var singleSnapshot = Set.of(newSnapshot(snapshotType, SnapshotTriggerType.PERIODIC, 3L));
        assertEquals(
                Instant.ofEpochMilli(3),
                timestampStore.getLastPeriodicTriggerInstant(deployment, singleSnapshot));

        // The newest PERIODIC snapshot wins; MANUAL snapshots are ignored even if newer.
        var manySnapshots =
                Set.of(
                        newSnapshot(snapshotType, SnapshotTriggerType.PERIODIC, 200L),
                        newSnapshot(snapshotType, SnapshotTriggerType.PERIODIC, 300L),
                        newSnapshot(snapshotType, SnapshotTriggerType.MANUAL, 10000L),
                        newSnapshot(snapshotType, SnapshotTriggerType.PERIODIC, 0L));
        assertEquals(
                Instant.ofEpochMilli(300),
                timestampStore.getLastPeriodicTriggerInstant(deployment, manySnapshots));

        // Once an in-memory value is recorded it always wins, even when older than the CRs.
        var storedInstant = Instant.ofEpochMilli(111L);
        timestampStore.updateLastPeriodicTriggerTimestamp(deployment, storedInstant);
        assertEquals(
                storedInstant,
                timestampStore.getLastPeriodicTriggerInstant(deployment, manySnapshots));

        storedInstant = Instant.ofEpochMilli(11L);
        timestampStore.updateLastPeriodicTriggerTimestamp(deployment, storedInstant);
        assertEquals(
                storedInstant,
                timestampStore.getLastPeriodicTriggerInstant(deployment, manySnapshots));
    }

    /** Builds a COMPLETED FlinkStateSnapshot CR of the given kind with the given creation time. */
    private FlinkStateSnapshot newSnapshot(
            SnapshotType snapshotType, SnapshotTriggerType triggerType, Long timestamp) {
        var snapshot = new FlinkStateSnapshot();
        var metadata = snapshot.getMetadata();
        metadata.setCreationTimestamp(DateTimeUtils.kubernetes(Instant.ofEpochMilli(timestamp)));
        if (snapshotType == SAVEPOINT) {
            snapshot.getSpec().setSavepoint(new SavepointSpec());
        } else {
            snapshot.getSpec().setCheckpoint(new CheckpointSpec());
        }
        metadata.getLabels().put(CrdConstants.LABEL_SNAPSHOT_TYPE, triggerType.name());
        var status = new FlinkStateSnapshotStatus();
        status.setState(COMPLETED);
        snapshot.setStatus(status);
        return snapshot;
    }
}
Loading

0 comments on commit 4811b19

Please sign in to comment.