Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

endpoint to return backup queue, metadata and pending files count #1100

Open
wants to merge 2 commits into
base: 4.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

import com.google.inject.ImplementedBy;

/** Estimates the number of bytes and files remaining to upload in a snapshot/backup. */
@ImplementedBy(SnapshotDirectorySize.class)
public interface DirectorySize {
    /** Returns the total bytes of all snapshot/backup files south of {@code location} in the filesystem. */
    long getBytes(String location);

    /** Returns the total count of all snapshot/backup files south of {@code location} in the filesystem. */
    int getFiles(String location);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
import com.netflix.priam.scheduler.SimpleTimer;
import com.netflix.priam.scheduler.TaskTimer;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;
import java.util.stream.Stream;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.io.FileUtils;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.netflix.priam.backup;

import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;

/** Estimates remaining bytes or files to upload in a backup by looking at the file system */
public class IncrementalBackupDirectorySize implements DirectorySize {

public long getBytes(String location) {
SummingFileVisitor fileVisitor = new SummingFileVisitor();
try {
Files.walkFileTree(Paths.get(location), fileVisitor);
} catch (IOException e) {
// BackupFileVisitor is happy with an estimate and won't produce these in practice.
}
return fileVisitor.getTotalBytes();
}

public int getFiles(String location) {
SummingFileVisitor fileVisitor = new SummingFileVisitor();
try {
Files.walkFileTree(Paths.get(location), fileVisitor);
} catch (IOException e) {
// BackupFileVisitor is happy with an estimate and won't produce these in practice.
}
return fileVisitor.getTotalFiles();
}

private static final class SummingFileVisitor implements FileVisitor<Path> {
private long totalBytes;
private int totalFiles;

@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
if (file.toString().contains(AbstractBackup.INCREMENTAL_BACKUP_FOLDER) && attrs.isRegularFile()) {
totalBytes += attrs.size();
totalFiles += 1;
}
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult visitFileFailed(Path file, IOException exc) {
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) {
return FileVisitResult.CONTINUE;
}

long getTotalBytes() {
return totalBytes;
}

int getTotalFiles() {
return totalFiles;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;

/** Estimates remaining bytes to upload in a backup by looking at the file system */
/** Estimates remaining bytes or files to upload in a backup by looking at the file system */
public class SnapshotDirectorySize implements DirectorySize {

public long getBytes(String location) {
Expand All @@ -17,8 +17,19 @@ public long getBytes(String location) {
return fileVisitor.getTotalBytes();
}

/** Returns the number of snapshot files under {@code location}; an I/O failure yields a partial count. */
public int getFiles(String location) {
SummingFileVisitor fileVisitor = new SummingFileVisitor();
try {
Files.walkFileTree(Paths.get(location), fileVisitor);
} catch (IOException e) {
// Callers only need an estimate, so any partial sum the visitor accumulated is acceptable.
}
return fileVisitor.getTotalFiles();
}

private static final class SummingFileVisitor implements FileVisitor<Path> {
private long totalBytes;
private int totalFiles;

@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
Expand All @@ -29,6 +40,7 @@ public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
if (file.toString().contains(AbstractBackup.SNAPSHOT_FOLDER) && attrs.isRegularFile()) {
totalBytes += attrs.size();
totalFiles += 1;
}
return FileVisitResult.CONTINUE;
}
Expand All @@ -46,5 +58,9 @@ public FileVisitResult postVisitDirectory(Path dir, IOException exc) {
long getTotalBytes() {
return totalBytes;
}

int getTotalFiles() {
return totalFiles;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.netflix.priam.backupv2;

import com.netflix.priam.backup.DirectorySize;
import com.netflix.priam.backup.IncrementalBackup;
import com.netflix.priam.config.IBackupRestoreConfig;
import com.netflix.priam.config.IConfiguration;
Expand All @@ -25,8 +26,13 @@
import com.netflix.priam.scheduler.PriamScheduler;
import com.netflix.priam.scheduler.TaskTimer;
import com.netflix.priam.tuner.CassandraTunerService;
import java.util.HashMap;
import java.util.Map;
import javax.inject.Inject;
import org.apache.commons.lang3.math.Fraction;
import com.netflix.priam.backup.SnapshotDirectorySize;
import com.netflix.priam.backup.IncrementalBackupDirectorySize;


/**
* Encapsulate the backup service 2.0 - Execute all the tasks required to run backup service.
Expand All @@ -39,6 +45,8 @@ public class BackupV2Service implements IService {
private final SnapshotMetaTask snapshotMetaTask;
private final CassandraTunerService cassandraTunerService;
private final ITokenRetriever tokenRetriever;
private final DirectorySize snapshotDirectorySize = new SnapshotDirectorySize();
private final DirectorySize incrementalBackupDirectorySize = new IncrementalBackupDirectorySize();

@Inject
public BackupV2Service(
Expand Down Expand Up @@ -101,4 +109,11 @@ public void updateServicePre() throws Exception {

@Override
public void updateServicePost() throws Exception {}

/**
 * Counts files in the data directory still awaiting upload, combining snapshot and
 * incremental backup files into a single {@code "totalFiles"} entry.
 *
 * @return a map with one key, {@code "totalFiles"}, holding the combined pending-file count
 * @throws Exception if the underlying directory walk fails
 */
public Map<String, Integer> countPendingBackupFiles() throws Exception {
    // Both implementations walk the same data root; each filters for its own folder type.
    String dataLocation = configuration.getDataFileLocation();
    int totalFiles =
            snapshotDirectorySize.getFiles(dataLocation)
                    + incrementalBackupDirectorySize.getFiles(dataLocation);
    Map<String, Integer> backupFiles = new HashMap<>();
    backupFiles.put("totalFiles", totalFiles);
    return backupFiles;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package com.netflix.priam.resources;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.priam.PriamServer;
import com.netflix.priam.backup.*;
import com.netflix.priam.backupv2.BackupTTLTask;
import com.netflix.priam.backupv2.BackupV2Service;
Expand All @@ -29,7 +31,9 @@
import com.netflix.priam.utils.GsonJsonSerializer;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.inject.Inject;
Expand All @@ -55,6 +59,8 @@ public class BackupServletV2 {
private final Provider<AbstractBackupPath> pathProvider;
private final BackupV2Service backupService;
private final BackupNotificationMgr backupNotificationMgr;
private final PriamServer priamServer;

private static final String REST_SUCCESS = "[\"ok\"]";

@Inject
Expand All @@ -68,7 +74,8 @@ public BackupServletV2(
@Named("v2") IMetaProxy metaV2Proxy,
Provider<AbstractBackupPath> pathProvider,
BackupV2Service backupService,
BackupNotificationMgr backupNotificationMgr) {
BackupNotificationMgr backupNotificationMgr,
PriamServer priamServer) {
this.backupStatusMgr = backupStatusMgr;
this.backupVerification = backupVerification;
this.snapshotMetaService = snapshotMetaService;
Expand All @@ -78,6 +85,7 @@ public BackupServletV2(
this.pathProvider = pathProvider;
this.backupService = backupService;
this.backupNotificationMgr = backupNotificationMgr;
this.priamServer = priamServer;
}

@GET
Expand Down Expand Up @@ -181,4 +189,26 @@ public Response list(@PathParam("daterange") String daterange) throws Exception
.collect(Collectors.toList())))
.build();
}

@GET
@Path("/state/{hours}")
public Response backupState(@PathParam("hours") int hours) throws Exception {
Map<String, Object> responseMap = new HashMap<>();

responseMap.put("tasksQueued", fs.getUploadTasksQueued());
responseMap.put("queueSize", priamServer.getConfiguration().getBackupQueueSize());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not require information that only the local node has. Please read it from fast properties. Or better yet, don't include it in the calculations as we might move to a model where the queue is small and populated as needed and the only metric that matters is the remaining files on disk.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This FP is not set for every cluster and we need it to compute the file threshold for which we want to wait for backup upload to complete.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are already making a call to Priam what is the harm in checking this here? The downside I see to checking FPs is we now rely on yet another system and per Ayushis comment would have the default hardcoded in multiple places.

for (Map.Entry<String, Integer> entry :
backupService.countPendingBackupFiles().entrySet()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to make a distinction between snapshot and incremental backup files in this case? Prefer to combine them into a total count if we can afford to.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed this in new commit

responseMap.put(entry.getKey(), entry.getValue());
}

List<BackupMetadata> latestBackupMetadata =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can get this metadata from elsewhere. Please don't make the local node do it. Prefer creating an Antigravity endpoint that fetches this information from cass_cde.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want a node specific backup metadata in the realtime to compute if the backup was started in last 30 mins.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i would also be concerned that we are depending on an additional system as I commented above. Is there a harm in checking this here?

backupStatusMgr.getLatestBackupMetadata(
new DateRange(Instant.now().minus(hours, ChronoUnit.HOURS), Instant.now()));
responseMap.put("latestBackupMetadata", latestBackupMetadata);

ObjectMapper mapper = new ObjectMapper();
String jsonResponse = mapper.writeValueAsString(responseMap);
return Response.ok(jsonResponse).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
public class TestBackupDynamicRateLimiter {
private static final Instant NOW = Instant.ofEpochMilli(1 << 16);
private static final Instant LATER = NOW.plusMillis(Duration.ofHours(1).toMillis());
private static final int DIR_SIZE = 1 << 16;
private static final int DIR_SIZE_BYTES = 1 << 16;
private static final int DIR_SIZE_FILES = 10;

private BackupDynamicRateLimiter rateLimiter;
private FakeConfiguration config;
Expand All @@ -34,22 +35,22 @@ public void setUp() {

/** One backup thread, target an hour out: permit acquisition is throttled into the 1-2s band. */
@Test
public void sunnyDay() {
    rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES);
    Stopwatch timer = timePermitAcquisition(getBackupPath(), LATER, 21);
    Truth.assertThat(timer.elapsed(TimeUnit.MILLISECONDS)).isAtLeast(1_000);
    Truth.assertThat(timer.elapsed(TimeUnit.MILLISECONDS)).isAtMost(2_000);
}

/** A completion target at the epoch (far in the past) must not throttle uploads. */
@Test
public void targetSetToEpoch() {
    rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES);
    Stopwatch timer = timePermitAcquisition(getBackupPath(), Instant.EPOCH, 20);
    assertNoRateLimiting(timer);
}

@Test
public void pathIsNotASnapshot() {
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE);
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES);
AbstractBackupPath path =
getBackupPath(
"target/data/Keyspace1/Standard1/backups/Keyspace1-Standard1-ia-4-Data.db");
Expand All @@ -59,47 +60,47 @@ public void pathIsNotASnapshot() {

/** A completion target equal to "now" leaves no time budget, so no throttling occurs. */
@Test
public void targetIsNow() {
    rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES);
    Stopwatch timer = timePermitAcquisition(getBackupPath(), NOW, 20);
    assertNoRateLimiting(timer);
}

/** A completion target an hour in the past must not throttle uploads. */
@Test
public void targetIsInThePast() {
    rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES);
    Instant target = NOW.minus(Duration.ofHours(1L));
    Stopwatch timer = timePermitAcquisition(getBackupPath(), target, 20);
    assertNoRateLimiting(timer);
}

/** Zero configured backup threads is an invalid state and should be rejected. */
@Test
public void noBackupThreads() {
    rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 0), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES);
    assertIllegalState(() -> timePermitAcquisition(getBackupPath(), LATER, 20));
}

/** A negative backup-thread count is an invalid state and should be rejected. */
@Test
public void negativeBackupThreads() {
    rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", -1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES);
    assertIllegalState(() -> timePermitAcquisition(getBackupPath(), LATER, 20));
}

/** An empty directory (zero bytes, zero files) leaves nothing to pace, so no throttling occurs. */
@Test
public void noData() {
    rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, 0, 0);
    Stopwatch timer = timePermitAcquisition(getBackupPath(), LATER, 20);
    assertNoRateLimiting(timer);
}

/** Requesting zero permits is an invalid argument and should be rejected. */
@Test
public void noPermitsRequested() {
    rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES);
    assertIllegalArgument(() -> timePermitAcquisition(getBackupPath(), LATER, 0));
}

/** Requesting a negative number of permits is an invalid argument and should be rejected. */
@Test
public void negativePermitsRequested() {
    rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES);
    assertIllegalArgument(() -> timePermitAcquisition(getBackupPath(), LATER, -1));
}

Expand All @@ -123,12 +124,12 @@ private Stopwatch timePermitAcquisition(AbstractBackupPath path, Instant now, in
}

private BackupDynamicRateLimiter getRateLimiter(
Map<String, Object> properties, Instant now, long directorySize) {
Map<String, Object> properties, Instant now, long directorySizeBytes, int directorySizeFiles) {
properties.forEach(config::setFakeConfig);
return new BackupDynamicRateLimiter(
config,
Clock.fixed(now, ZoneId.systemDefault()),
new FakeDirectorySize(directorySize));
new FakeDirectorySize(directorySizeBytes, directorySizeFiles));
}

private void assertNoRateLimiting(Stopwatch timer) {
Expand All @@ -155,14 +156,21 @@ private void assertIllegalArgument(Runnable method) {

private static final class FakeDirectorySize implements DirectorySize {
private final long size;
private final int fileCount;

FakeDirectorySize(long size) {
FakeDirectorySize(long size, int fileCount) {
this.size = size;
this.fileCount = fileCount;
}

@Override
public long getBytes(String location) {
return size;
}

@Override
public int getFiles(String location) {
return fileCount;
}
}
}
Loading