Skip to content

Commit

Permalink
Use checkpoint when resume snapshot
Browse files Browse the repository at this point in the history
Change-Id: Iebf742426aa6b696b2397ec90368a13d04c04b56
  • Loading branch information
Linary committed Mar 24, 2021
1 parent 5adb2c6 commit 372657d
Show file tree
Hide file tree
Showing 12 changed files with 96 additions and 77 deletions.
2 changes: 1 addition & 1 deletion hugegraph-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@
</addDefaultSpecificationEntries>
</manifest>
<manifestEntries>
<Implementation-Version>0.59.0.0</Implementation-Version>
<Implementation-Version>0.60.0.0</Implementation-Version>
</manifestEntries>
</archive>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void clear(@Context GraphManager manager,

@PUT
@Timed
@Path("{name}/create-snapshot")
@Path("{name}/snapshot_create")
@Produces(APPLICATION_JSON_WITH_CHARSET)
@RolesAllowed({"admin", "$owner=$name"})
public Object createSnapshot(@Context GraphManager manager,
Expand All @@ -154,7 +154,7 @@ public Object createSnapshot(@Context GraphManager manager,

@PUT
@Timed
@Path("{name}/resume-snapshot")
@Path("{name}/snapshot_resume")
@Produces(APPLICATION_JSON_WITH_CHARSET)
@RolesAllowed({"admin", "$owner=$name"})
public Object resumeSnapshot(@Context GraphManager manager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,11 @@ public final class ApiVersion {
* [0.58] Issue-1173: Supports customized kout/kneighbor,
* multi-node-shortest-path, jaccard-similar and template-paths
* [0.59] Issue-1333: Support graph read mode for olap property
* [0.60] Issue-1392: Support create and resume snapshot
*/

// The second parameter of Version.of() is for IDE running without JAR
public static final Version VERSION = Version.of(ApiVersion.class, "0.59");
public static final Version VERSION = Version.of(ApiVersion.class, "0.60");

public static final void check() {
// Check version of hugegraph-core. Firstly do check from version 0.3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ public void truncateBackend() {
LOG.info("Graph '{}' has been truncated", this.name);
}

@Override
public void createSnapshot() {
LockUtil.lock(this.name, LockUtil.GRAPH_LOCK);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,11 @@ public default void setCounterLowest(HugeType type, long lowest) {
public long getCounter(HugeType type);

public default Set<String> createSnapshot(String snapshotDir) {
throw new UnsupportedOperationException("writeSnapshot");
throw new UnsupportedOperationException("createSnapshot");
}

public default Set<String> resumeSnapshot(String snapshotDir) {
throw new UnsupportedOperationException("readSnapshot");
throw new UnsupportedOperationException("resumeSnapshot");
}

static enum TxState {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,8 @@ public void createSnapshot() {

@Override
public void resumeSnapshot() {
// jraft doesn't expose API to load snapshot
throw new UnsupportedOperationException();
// Jraft doesn't expose API to load snapshot
throw new UnsupportedOperationException("resumeSnapshot");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.io.IOException;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.zip.Checksum;
Expand Down Expand Up @@ -65,11 +64,8 @@ public void save(SnapshotWriter writer, Closure done,
Set<String> snapshotDirs = this.doSnapshotSave();
executor.execute(() -> {
long begin = System.currentTimeMillis();
Set<String> tarSnapshotFiles =
this.compressSnapshotDir(snapshotDirs, done);
String jraftSnapshotPath =
this.writeManifest(writer, tarSnapshotFiles, done);
this.deleteSnapshotDir(snapshotDirs, done);
this.writeManifest(writer, snapshotDirs, done);
this.compressJraftSnapshotDir(writer, jraftSnapshotPath, done);
LOG.info("Compress snapshot cost {}ms",
System.currentTimeMillis() - begin);
Expand All @@ -93,7 +89,7 @@ public boolean load(SnapshotReader reader) {
.toString();
try {
// decompress manifest and data directory
this.decompressSnapshot(readerPath, jraftSnapshotPath, meta);
this.decompressSnapshot(readerPath, meta);
this.doSnapshotLoad();
File tmp = new File(jraftSnapshotPath);
// Delete the decompressed temporary file. If the deletion fails
Expand Down Expand Up @@ -159,15 +155,15 @@ private void deleteSnapshotDir(Set<String> snapshotDirs,
}

private String writeManifest(SnapshotWriter writer,
Set<String> tarSnapshotFiles,
Set<String> snapshotFiles,
Closure done) {
String writerPath = writer.getPath();
// Write all backend compressed snapshot file path to manifest
String jraftSnapshotPath = Paths.get(writerPath, SNAPSHOT_DIR)
.toString();
File snapshotManifest = new File(jraftSnapshotPath, MANIFEST);
try {
FileUtils.writeLines(snapshotManifest, tarSnapshotFiles);
FileUtils.writeLines(snapshotManifest, snapshotFiles);
} catch (IOException e) {
done.run(new Status(RaftError.EIO,
"Failed to write backend snapshot file path " +
Expand Down Expand Up @@ -202,9 +198,8 @@ private void compressJraftSnapshotDir(SnapshotWriter writer,
}
}

private void decompressSnapshot(String readerPath,
String jraftSnapshotPath,
LocalFileMeta meta) throws IOException {
private void decompressSnapshot(String readerPath, LocalFileMeta meta)
throws IOException {
String archiveFile = Paths.get(readerPath, SNAPSHOT_ARCHIVE).toString();
Checksum checksum = new CRC64();
CompressUtil.decompressTar(archiveFile, readerPath, checksum);
Expand All @@ -213,15 +208,5 @@ private void decompressSnapshot(String readerPath,
Long.toHexString(checksum.getValue())),
"Snapshot checksum failed");
}

File snapshotManifest = new File(jraftSnapshotPath, MANIFEST);
List<String> compressedSnapshotFiles = FileUtils.readLines(
snapshotManifest);
for (String compressedSnapshotFile : compressedSnapshotFiles) {
String targetDir = Paths.get(compressedSnapshotFile).getParent()
.toString();
CompressUtil.decompressTar(compressedSnapshotFile, targetDir,
new CRC64());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Set;

import org.apache.commons.lang3.tuple.Pair;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

import com.baidu.hugegraph.backend.store.BackendEntry.BackendColumnIterator;
Expand All @@ -47,6 +48,9 @@ public RocksDBSessions(HugeConfig config, String database, String store) {
public abstract RocksDBSessions copy(HugeConfig config,
String database, String store);

public abstract RocksDB createSnapshotRocksDB(String snapshotPath)
throws RocksDBException;

public abstract void createSnapshot(String parentPath);

public abstract void reload() throws RocksDBException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
package com.baidu.hugegraph.backend.store.rocksdb;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
Expand All @@ -35,11 +33,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.Checkpoint;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
Expand Down Expand Up @@ -324,27 +320,33 @@ public RocksDBSessions copy(HugeConfig config,
}

@Override
public void createSnapshot(String snapshotPath) {
// https://github.com/facebook/rocksdb/wiki/Checkpoints
try (Checkpoint checkpoint = Checkpoint.create(this.rocksdb)) {
String tempPath = snapshotPath + "_temp";
File tempFile = new File(tempPath);
FileUtils.deleteDirectory(tempFile);
LOG.debug("Deleted temp directory {}", tempFile);

FileUtils.forceMkdir(tempFile.getParentFile());
checkpoint.createCheckpoint(tempPath);
File snapshotFile = new File(snapshotPath);
FileUtils.deleteDirectory(snapshotFile);
LOG.debug("Deleted stale directory {}", snapshotFile);
if (!tempFile.renameTo(snapshotFile)) {
throw new IOException(String.format("Failed to rename %s to %s",
tempFile, snapshotFile));
}
} catch (Exception e) {
throw new BackendException("Failed to write snapshot at path %s",
e, snapshotPath);
public RocksDB createSnapshotRocksDB(String snapshotPath)
throws RocksDBException {
// Init CFs options
Set<String> mergedCFs = this.mergeOldCFs(snapshotPath, new ArrayList<>(
this.cfs.keySet()));
List<String> cfNames = ImmutableList.copyOf(mergedCFs);

List<ColumnFamilyDescriptor> cfds = new ArrayList<>(cfNames.size());
for (String cf : cfNames) {
ColumnFamilyDescriptor cfd = new ColumnFamilyDescriptor(encode(cf));
ColumnFamilyOptions options = cfd.getOptions();
RocksDBStdSessions.initOptions(this.config, null, null,
options, options);
cfds.add(cfd);
}
List<ColumnFamilyHandle> cfhs = new ArrayList<>();

// Init DB options
DBOptions options = new DBOptions();
RocksDBStdSessions.initOptions(this.config, options, options,
null, null);
return RocksDB.open(options, snapshotPath, cfds, cfhs);
}

@Override
public void createSnapshot(String snapshotPath) {
RocksDBStore.createCheckpoint(this.rocksdb, snapshotPath);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import java.util.stream.Collectors;

import org.apache.commons.io.FileUtils;
import org.rocksdb.Checkpoint;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;

Expand All @@ -65,7 +67,6 @@
import com.baidu.hugegraph.util.Consumers;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.ExecutorUtil;
import com.baidu.hugegraph.util.GZipUtil;
import com.baidu.hugegraph.util.InsertionOrderUtil;
import com.baidu.hugegraph.util.Log;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -664,11 +665,19 @@ public Set<String> resumeSnapshot(String snapshotPrefix) {
Path snapshotPath = parentParentPath.resolve(snapshotPrefix +
"_" + pureDataPath);
E.checkState(snapshotPath.toFile().exists(),
"The snapshot path '%s' doesn't exist");
"The snapshot path '%s' doesn't exist",
snapshotPath);
LOG.debug("The origin data path: {}", originDataPath);
LOG.debug("The snapshot data path: {}", snapshotPath);
fileNameMaps.put(originDataPath, snapshotPath);

RocksDBSessions sessions = entry.getValue();
RocksDB rocksdb = sessions.createSnapshotRocksDB(
snapshotPath.toString());
Path snapshotLinkPath = Paths.get(originDataPath + "_link");
createCheckpoint(rocksdb, snapshotLinkPath.toString());
rocksdb.close();

fileNameMaps.put(originDataPath, snapshotLinkPath);
uniqueParents.add(snapshotPath.getParent().toString());
}
E.checkState(!fileNameMaps.isEmpty(),
Expand All @@ -684,19 +693,19 @@ public Set<String> resumeSnapshot(String snapshotPrefix) {
// Move snapshot file to origin data path
for (Map.Entry<Path, Path> entry : fileNameMaps.entrySet()) {
File originDataDir = entry.getKey().toFile();
File snapshotDir = entry.getValue().toFile();
File snapshotLinkDir = entry.getValue().toFile();
try {
if (originDataDir.exists()) {
LOG.info("Delete origin data directory {}",
originDataDir);
FileUtils.deleteDirectory(originDataDir);
}
FileUtils.moveDirectory(snapshotDir, originDataDir);
FileUtils.moveDirectory(snapshotLinkDir, originDataDir);
LOG.info("Move snapshot directory {} to {}",
snapshotDir, originDataDir);
} catch (IOException e) {
throw new BackendException("Failed to move %s to %s",
e, snapshotDir, originDataDir);
snapshotLinkDir, originDataDir);
} catch (Exception e) {
throw new BackendException("Failed to move %s to %s", e,
snapshotLinkDir, originDataDir);
}
}
// Reload rocksdb instance
Expand Down Expand Up @@ -731,18 +740,6 @@ private final void closeSessions() {
}
}

private Session findMatchedSession(File snapshotFile) {
String fileName = snapshotFile.getName();
for (Session session : this.session()) {
String md5 = GZipUtil.md5(session.dataPath());
if (fileName.equals(md5)) {
return session;
}
}
throw new BackendException("Can't find matched session for " +
"snapshot file %s", snapshotFile);
}

private final List<Session> session() {
this.checkOpened();

Expand Down Expand Up @@ -828,6 +825,29 @@ private static boolean existsOtherKeyspace(String dataPath) {
return false;
}

public static void createCheckpoint(RocksDB rocksdb, String targetPath) {
// https://github.com/facebook/rocksdb/wiki/Checkpoints
try (Checkpoint checkpoint = Checkpoint.create(rocksdb)) {
String tempPath = targetPath + "_temp";
File tempFile = new File(tempPath);
FileUtils.deleteDirectory(tempFile);
LOG.debug("Deleted temp directory {}", tempFile);

FileUtils.forceMkdir(tempFile.getParentFile());
checkpoint.createCheckpoint(tempPath);
File snapshotFile = new File(targetPath);
FileUtils.deleteDirectory(snapshotFile);
LOG.debug("Deleted stale directory {}", snapshotFile);
if (!tempFile.renameTo(snapshotFile)) {
throw new IOException(String.format("Failed to rename %s to %s",
tempFile, snapshotFile));
}
} catch (Exception e) {
throw new BackendException("Failed to create checkpoint at path %s",
e, targetPath);
}
}

/***************************** Store defines *****************************/

public static class RocksDBSchemaStore extends RocksDBStore {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.rocksdb.EnvOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileWriter;

Expand Down Expand Up @@ -134,7 +135,7 @@ public boolean existsTable(String table) {

@Override
public List<String> property(String property) {
throw new NotSupportException("RocksDBSstStore property()");
throw new UnsupportedOperationException("RocksDBSstStore property()");
}

@Override
Expand All @@ -143,6 +144,11 @@ public RocksDBSessions copy(HugeConfig config,
return new RocksDBSstSessions(config, database, store, this);
}

@Override
public RocksDB createSnapshotRocksDB(String snapshotPath) {
throw new UnsupportedOperationException("createSnapshotRocksDB");
}

@Override
public void createSnapshot(String snapshotPath) {
throw new UnsupportedOperationException("createSnapshot");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5491,8 +5491,8 @@ public void testRemoveEdgesOfSuperVertex() {
// Clear all
try {
graph.truncateBackend();
} catch (UnsupportedOperationException ignored) {
// pass
} catch (UnsupportedOperationException e) {
LOG.warn("Failed to create snapshot", e);
}
}

Expand Down

0 comments on commit 372657d

Please sign in to comment.