From 372657d070f5e257588993544c0ee60aa02f2afd Mon Sep 17 00:00:00 2001 From: liningrui Date: Wed, 24 Mar 2021 15:42:25 +0800 Subject: [PATCH] Use checkpoint when resume snapshot Change-Id: Iebf742426aa6b696b2397ec90368a13d04c04b56 --- hugegraph-api/pom.xml | 2 +- .../hugegraph/api/profile/GraphsAPI.java | 4 +- .../baidu/hugegraph/version/ApiVersion.java | 3 +- .../baidu/hugegraph/StandardHugeGraph.java | 1 + .../hugegraph/backend/store/BackendStore.java | 4 +- .../store/raft/RaftBackendStoreProvider.java | 4 +- .../backend/store/raft/StoreSnapshotFile.java | 27 ++------ .../store/rocksdb/RocksDBSessions.java | 4 ++ .../store/rocksdb/RocksDBStdSessions.java | 50 ++++++++------- .../backend/store/rocksdb/RocksDBStore.java | 62 ++++++++++++------- .../store/rocksdbsst/RocksDBSstSessions.java | 8 ++- .../baidu/hugegraph/core/EdgeCoreTest.java | 4 +- 12 files changed, 96 insertions(+), 77 deletions(-) diff --git a/hugegraph-api/pom.xml b/hugegraph-api/pom.xml index b687b64ac2..ee70e207aa 100644 --- a/hugegraph-api/pom.xml +++ b/hugegraph-api/pom.xml @@ -171,7 +171,7 @@ - 0.59.0.0 + 0.60.0.0 diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/profile/GraphsAPI.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/profile/GraphsAPI.java index 5003001ca8..eccb92c45c 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/profile/GraphsAPI.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/profile/GraphsAPI.java @@ -140,7 +140,7 @@ public void clear(@Context GraphManager manager, @PUT @Timed - @Path("{name}/create-snapshot") + @Path("{name}/snapshot_create") @Produces(APPLICATION_JSON_WITH_CHARSET) @RolesAllowed({"admin", "$owner=$name"}) public Object createSnapshot(@Context GraphManager manager, @@ -154,7 +154,7 @@ public Object createSnapshot(@Context GraphManager manager, @PUT @Timed - @Path("{name}/resume-snapshot") + @Path("{name}/snapshot_resume") @Produces(APPLICATION_JSON_WITH_CHARSET) @RolesAllowed({"admin", "$owner=$name"}) public Object resumeSnapshot(@Context GraphManager manager, diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/version/ApiVersion.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/version/ApiVersion.java index 4062ef3534..8b8babce64 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/version/ApiVersion.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/version/ApiVersion.java @@ -109,10 +109,11 @@ public final class ApiVersion { * [0.58] Issue-1173: Supports customized kout/kneighbor, * multi-node-shortest-path, jaccard-similar and template-paths * [0.59] Issue-1333: Support graph read mode for olap property + * [0.60] Issue-1392: Support create and resume snapshot */ // The second parameter of Version.of() is for IDE running without JAR - public static final Version VERSION = Version.of(ApiVersion.class, "0.59"); + public static final Version VERSION = Version.of(ApiVersion.class, "0.60"); public static final void check() { // Check version of hugegraph-core. Firstly do check from version 0.3 diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java index a627b73b79..3a16deb59e 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java @@ -368,6 +368,7 @@ public void truncateBackend() { LOG.info("Graph '{}' has been truncated", this.name); } + @Override public void createSnapshot() { LockUtil.lock(this.name, LockUtil.GRAPH_LOCK); try { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStore.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStore.java index 42257fcdbb..f92d8a5b51 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStore.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStore.java @@ -118,11 +118,11 @@ public default void setCounterLowest(HugeType type, long lowest) { public long getCounter(HugeType type); public default Set createSnapshot(String snapshotDir) { - throw new UnsupportedOperationException("writeSnapshot"); + throw new UnsupportedOperationException("createSnapshot"); } public default Set resumeSnapshot(String snapshotDir) { - throw new UnsupportedOperationException("readSnapshot"); + throw new UnsupportedOperationException("resumeSnapshot"); } static enum TxState { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStoreProvider.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStoreProvider.java index 2f55018563..cabbd14d7f 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStoreProvider.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStoreProvider.java @@ -209,8 +209,8 @@ public void createSnapshot() { @Override public void resumeSnapshot() { - // jraft doesn't expose API to load snapshot - throw new UnsupportedOperationException(); + // Jraft doesn't expose API to load snapshot + throw new UnsupportedOperationException("resumeSnapshot"); } @Override diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreSnapshotFile.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreSnapshotFile.java index 96e8833c0c..52132943be 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreSnapshotFile.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreSnapshotFile.java @@ -25,7 +25,6 @@ import java.io.IOException; import java.nio.file.Paths; import java.util.HashSet; -import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.zip.Checksum; @@ -65,11 +64,8 @@ public void save(SnapshotWriter writer, Closure done, Set snapshotDirs = this.doSnapshotSave(); executor.execute(() -> { long begin = System.currentTimeMillis(); - Set tarSnapshotFiles = - this.compressSnapshotDir(snapshotDirs, done); String jraftSnapshotPath = - this.writeManifest(writer, tarSnapshotFiles, done); - this.deleteSnapshotDir(snapshotDirs, done); + this.writeManifest(writer, snapshotDirs, done); this.compressJraftSnapshotDir(writer, jraftSnapshotPath, done); LOG.info("Compress snapshot cost {}ms", System.currentTimeMillis() - begin); @@ -93,7 +89,7 @@ public boolean load(SnapshotReader reader) { .toString(); try { // decompress manifest and data directory - this.decompressSnapshot(readerPath, jraftSnapshotPath, meta); + this.decompressSnapshot(readerPath, meta); this.doSnapshotLoad(); File tmp = new File(jraftSnapshotPath); // Delete the decompressed temporary file. If the deletion fails @@ -159,7 +155,7 @@ private void deleteSnapshotDir(Set snapshotDirs, } private String writeManifest(SnapshotWriter writer, - Set tarSnapshotFiles, + Set snapshotFiles, Closure done) { String writerPath = writer.getPath(); // Write all backend compressed snapshot file path to manifest @@ -167,7 +163,7 @@ private String writeManifest(SnapshotWriter writer, .toString(); File snapshotManifest = new File(jraftSnapshotPath, MANIFEST); try { - FileUtils.writeLines(snapshotManifest, tarSnapshotFiles); + FileUtils.writeLines(snapshotManifest, snapshotFiles); } catch (IOException e) { done.run(new Status(RaftError.EIO, "Failed to write backend snapshot file path " + @@ -202,9 +198,8 @@ private void compressJraftSnapshotDir(SnapshotWriter writer, } } - private void decompressSnapshot(String readerPath, - String jraftSnapshotPath, - LocalFileMeta meta) throws IOException { + private void decompressSnapshot(String readerPath, LocalFileMeta meta) + throws IOException { String archiveFile = Paths.get(readerPath, SNAPSHOT_ARCHIVE).toString(); Checksum checksum = new CRC64(); CompressUtil.decompressTar(archiveFile, readerPath, checksum); @@ -213,15 +208,5 @@ private void decompressSnapshot(String readerPath, Long.toHexString(checksum.getValue())), "Snapshot checksum failed"); } - - File snapshotManifest = new File(jraftSnapshotPath, MANIFEST); - List compressedSnapshotFiles = FileUtils.readLines( - snapshotManifest); - for (String compressedSnapshotFile : compressedSnapshotFiles) { - String targetDir = Paths.get(compressedSnapshotFile).getParent() - .toString(); - CompressUtil.decompressTar(compressedSnapshotFile, targetDir, - new CRC64()); - } } } diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBSessions.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBSessions.java index 1889344ed5..4e81cc4d30 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBSessions.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBSessions.java @@ -23,6 +23,7 @@ import java.util.Set; import org.apache.commons.lang3.tuple.Pair; +import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import com.baidu.hugegraph.backend.store.BackendEntry.BackendColumnIterator; @@ -47,6 +48,9 @@ public RocksDBSessions(HugeConfig config, String database, String store) { public abstract RocksDBSessions copy(HugeConfig config, String database, String store); + public abstract RocksDB createSnapshotRocksDB(String snapshotPath) + throws RocksDBException; + public abstract void createSnapshot(String parentPath); public abstract void reload() throws RocksDBException; diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java index 48191965c8..dc976bf668 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java @@ -20,8 +20,6 @@ package com.baidu.hugegraph.backend.store.rocksdb; import java.io.Closeable; -import java.io.File; -import java.io.IOException; import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; @@ -35,11 +33,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.tuple.Pair; import org.rocksdb.BlockBasedTableConfig; import org.rocksdb.BloomFilter; -import org.rocksdb.Checkpoint; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; @@ -324,27 +320,33 @@ public RocksDBSessions copy(HugeConfig config, } @Override - public void createSnapshot(String snapshotPath) { - // https://github.com/facebook/rocksdb/wiki/Checkpoints - try (Checkpoint checkpoint = Checkpoint.create(this.rocksdb)) { - String tempPath = snapshotPath + "_temp"; - File tempFile = new File(tempPath); - FileUtils.deleteDirectory(tempFile); - LOG.debug("Deleted temp directory {}", tempFile); - - FileUtils.forceMkdir(tempFile.getParentFile()); - checkpoint.createCheckpoint(tempPath); - File snapshotFile = new File(snapshotPath); - FileUtils.deleteDirectory(snapshotFile); - LOG.debug("Deleted stale directory {}", snapshotFile); - if (!tempFile.renameTo(snapshotFile)) { - throw new IOException(String.format("Failed to rename %s to %s", - tempFile, snapshotFile)); - } - } catch (Exception e) { - throw new BackendException("Failed to write snapshot at path %s", - e, snapshotPath); + public RocksDB createSnapshotRocksDB(String snapshotPath) + throws RocksDBException { + // Init CFs options + Set mergedCFs = this.mergeOldCFs(snapshotPath, new ArrayList<>( + this.cfs.keySet())); + List cfNames = ImmutableList.copyOf(mergedCFs); + + List cfds = new ArrayList<>(cfNames.size()); + for (String cf : cfNames) { + ColumnFamilyDescriptor cfd = new ColumnFamilyDescriptor(encode(cf)); + ColumnFamilyOptions options = cfd.getOptions(); + RocksDBStdSessions.initOptions(this.config, null, null, + options, options); + cfds.add(cfd); } + List cfhs = new ArrayList<>(); + + // Init DB options + DBOptions options = new DBOptions(); + RocksDBStdSessions.initOptions(this.config, options, options, + null, null); + return RocksDB.open(options, snapshotPath, cfds, cfhs); + } + + @Override + public void createSnapshot(String snapshotPath) { + RocksDBStore.createCheckpoint(this.rocksdb, snapshotPath); } @Override diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java index f73899bdf9..0d7112e766 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java @@ -44,6 +44,8 @@ import java.util.stream.Collectors; import org.apache.commons.io.FileUtils; +import org.rocksdb.Checkpoint; +import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.slf4j.Logger; @@ -65,7 +67,6 @@ import com.baidu.hugegraph.util.Consumers; import com.baidu.hugegraph.util.E; import com.baidu.hugegraph.util.ExecutorUtil; -import com.baidu.hugegraph.util.GZipUtil; import com.baidu.hugegraph.util.InsertionOrderUtil; import com.baidu.hugegraph.util.Log; import com.google.common.collect.ImmutableList; @@ -664,11 +665,19 @@ public Set resumeSnapshot(String snapshotPrefix) { Path snapshotPath = parentParentPath.resolve(snapshotPrefix + "_" + pureDataPath); E.checkState(snapshotPath.toFile().exists(), - "The snapshot path '%s' doesn't exist"); + "The snapshot path '%s' doesn't exist", + snapshotPath); LOG.debug("The origin data path: {}", originDataPath); LOG.debug("The snapshot data path: {}", snapshotPath); - fileNameMaps.put(originDataPath, snapshotPath); + RocksDBSessions sessions = entry.getValue(); + RocksDB rocksdb = sessions.createSnapshotRocksDB( + snapshotPath.toString()); + Path snapshotLinkPath = Paths.get(originDataPath + "_link"); + createCheckpoint(rocksdb, snapshotLinkPath.toString()); + rocksdb.close(); + + fileNameMaps.put(originDataPath, snapshotLinkPath); uniqueParents.add(snapshotPath.getParent().toString()); } E.checkState(!fileNameMaps.isEmpty(), @@ -684,19 +693,19 @@ public Set resumeSnapshot(String snapshotPrefix) { // Move snapshot file to origin data path for (Map.Entry entry : fileNameMaps.entrySet()) { File originDataDir = entry.getKey().toFile(); - File snapshotDir = entry.getValue().toFile(); + File snapshotLinkDir = entry.getValue().toFile(); try { if (originDataDir.exists()) { LOG.info("Delete origin data directory {}", originDataDir); FileUtils.deleteDirectory(originDataDir); } - FileUtils.moveDirectory(snapshotDir, originDataDir); + FileUtils.moveDirectory(snapshotLinkDir, originDataDir); LOG.info("Move snapshot directory {} to {}", - snapshotDir, originDataDir); - } catch (IOException e) { - throw new BackendException("Failed to move %s to %s", - e, snapshotDir, originDataDir); + snapshotLinkDir, originDataDir); + } catch (Exception e) { + throw new BackendException("Failed to move %s to %s", e, + snapshotLinkDir, originDataDir); } } // Reload rocksdb instance @@ -731,18 +740,6 @@ private final void closeSessions() { } } - private Session findMatchedSession(File snapshotFile) { - String fileName = snapshotFile.getName(); - for (Session session : this.session()) { - String md5 = GZipUtil.md5(session.dataPath()); - if (fileName.equals(md5)) { - return session; - } - } - throw new BackendException("Can't find matched session for " + - "snapshot file %s", snapshotFile); - } - private final List session() { this.checkOpened(); @@ -828,6 +825,29 @@ private static boolean existsOtherKeyspace(String dataPath) { return false; } + public static void createCheckpoint(RocksDB rocksdb, String targetPath) { + // https://github.com/facebook/rocksdb/wiki/Checkpoints + try (Checkpoint checkpoint = Checkpoint.create(rocksdb)) { + String tempPath = targetPath + "_temp"; + File tempFile = new File(tempPath); + FileUtils.deleteDirectory(tempFile); + LOG.debug("Deleted temp directory {}", tempFile); + + FileUtils.forceMkdir(tempFile.getParentFile()); + checkpoint.createCheckpoint(tempPath); + File snapshotFile = new File(targetPath); + FileUtils.deleteDirectory(snapshotFile); + LOG.debug("Deleted stale directory {}", snapshotFile); + if (!tempFile.renameTo(snapshotFile)) { + throw new IOException(String.format("Failed to rename %s to %s", + tempFile, snapshotFile)); + } + } catch (Exception e) { + throw new BackendException("Failed to create checkpoint at path %s", + e, targetPath); + } + } + /***************************** Store defines *****************************/ public static class RocksDBSchemaStore extends RocksDBStore { diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java index 1e5cce927e..f2a4db506f 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java @@ -33,6 +33,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.rocksdb.EnvOptions; import org.rocksdb.Options; +import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.SstFileWriter; @@ -134,7 +135,7 @@ public boolean existsTable(String table) { @Override public List property(String property) { - throw new NotSupportException("RocksDBSstStore property()"); + throw new UnsupportedOperationException("RocksDBSstStore property()"); } @Override @@ -143,6 +144,11 @@ public RocksDBSessions copy(HugeConfig config, return new RocksDBSstSessions(config, database, store, this); } + @Override + public RocksDB createSnapshotRocksDB(String snapshotPath) { + throw new UnsupportedOperationException("createSnapshotRocksDB"); + } + @Override public void createSnapshot(String snapshotPath) { throw new UnsupportedOperationException("createSnapshot"); diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeCoreTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeCoreTest.java index af6609c8db..adf12ce989 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeCoreTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeCoreTest.java @@ -5491,8 +5491,8 @@ public void testRemoveEdgesOfSuperVertex() { // Clear all try { graph.truncateBackend(); - } catch (UnsupportedOperationException ignored) { - // pass + } catch (UnsupportedOperationException e) { + LOG.warn("Failed to create snapshot", e); } }