diff --git a/hugegraph-api/pom.xml b/hugegraph-api/pom.xml
index b687b64ac2..ee70e207aa 100644
--- a/hugegraph-api/pom.xml
+++ b/hugegraph-api/pom.xml
@@ -171,7 +171,7 @@
- 0.59.0.0
+ 0.60.0.0
diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/profile/GraphsAPI.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/profile/GraphsAPI.java
index 970edd3420..76fd64e7b7 100644
--- a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/profile/GraphsAPI.java
+++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/profile/GraphsAPI.java
@@ -138,6 +138,34 @@ public void clear(@Context GraphManager manager,
g.truncateBackend();
}
+ @PUT
+ @Timed
+ @Path("{name}/snapshot_create")
+ @Produces(APPLICATION_JSON_WITH_CHARSET)
+ @RolesAllowed({"admin", "$owner=$name"})
+ public Object createSnapshot(@Context GraphManager manager,
+ @PathParam("name") String name) {
+ LOG.debug("Create snapshot for graph '{}'", name);
+
+ HugeGraph g = graph(manager, name);
+ g.createSnapshot();
+ return ImmutableMap.of(name, "snapshot_created");
+ }
+
+ @PUT
+ @Timed
+ @Path("{name}/snapshot_resume")
+ @Produces(APPLICATION_JSON_WITH_CHARSET)
+ @RolesAllowed({"admin", "$owner=$name"})
+ public Object resumeSnapshot(@Context GraphManager manager,
+ @PathParam("name") String name) {
+ LOG.debug("Resume snapshot for graph '{}'", name);
+
+ HugeGraph g = graph(manager, name);
+ g.resumeSnapshot();
+ return ImmutableMap.of(name, "snapshot_resumed");
+ }
+
@PUT
@Timed
@Path("{name}/mode")
diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java
index bb495d7619..34fcc6d495 100644
--- a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java
+++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java
@@ -700,6 +700,18 @@ public void truncateBackend() {
}
}
+ @Override
+ public void createSnapshot() {
+ this.verifyPermission(HugePermission.WRITE, ResourceType.STATUS);
+ this.hugegraph.createSnapshot();
+ }
+
+ @Override
+ public void resumeSnapshot() {
+ this.verifyPermission(HugePermission.WRITE, ResourceType.STATUS);
+ this.hugegraph.resumeSnapshot();
+ }
+
private void verifyAdminPermission() {
verifyPermission(HugePermission.ANY, ResourceType.ROOT);
}
diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/version/ApiVersion.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/version/ApiVersion.java
index 4062ef3534..8b8babce64 100644
--- a/hugegraph-api/src/main/java/com/baidu/hugegraph/version/ApiVersion.java
+++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/version/ApiVersion.java
@@ -109,10 +109,11 @@ public final class ApiVersion {
* [0.58] Issue-1173: Supports customized kout/kneighbor,
* multi-node-shortest-path, jaccard-similar and template-paths
* [0.59] Issue-1333: Support graph read mode for olap property
+ * [0.60] Issue-1392: Support create and resume snapshot
*/
// The second parameter of Version.of() is for IDE running without JAR
- public static final Version VERSION = Version.of(ApiVersion.class, "0.59");
+ public static final Version VERSION = Version.of(ApiVersion.class, "0.60");
public static final void check() {
// Check version of hugegraph-core. Firstly do check from version 0.3
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java
index f9dd690144..ed26bbb5d0 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java
@@ -153,6 +153,9 @@ public interface HugeGraph extends Graph {
public void clearBackend();
public void truncateBackend();
+ public void createSnapshot();
+ public void resumeSnapshot();
+
@Override
public HugeFeatures features();
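Note for embedded callers: backends that cannot snapshot (see the BackendStore defaults later in this patch) throw UnsupportedOperationException from these methods, mirroring how the test changes at the end of this patch wrap truncateBackend(). A minimal guard sketch, where hugegraph stands for any HugeGraph instance the caller already holds:

import com.baidu.hugegraph.HugeGraph;

public final class SnapshotCaller {
    public static void trySnapshot(HugeGraph hugegraph) {
        try {
            hugegraph.createSnapshot();
        } catch (UnsupportedOperationException e) {
            // The configured backend doesn't support snapshots; ignore or log
        }
    }
}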
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java
index b6d4821119..3a16deb59e 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java
@@ -360,7 +360,7 @@ public void truncateBackend() {
* When restarting, load the snapshot first and then read backend,
* will not encounter such an intermediate state.
*/
- this.storeProvider.writeSnapshot();
+ this.storeProvider.createSnapshot();
} finally {
LockUtil.unlock(this.name, LockUtil.GRAPH_LOCK);
}
@@ -368,6 +368,28 @@ public void truncateBackend() {
LOG.info("Graph '{}' has been truncated", this.name);
}
+ @Override
+ public void createSnapshot() {
+ LockUtil.lock(this.name, LockUtil.GRAPH_LOCK);
+ try {
+ this.storeProvider.createSnapshot();
+ } finally {
+ LockUtil.unlock(this.name, LockUtil.GRAPH_LOCK);
+ }
+ LOG.info("Graph '{}' has created snapshot", this.name);
+ }
+
+ @Override
+ public void resumeSnapshot() {
+ LockUtil.lock(this.name, LockUtil.GRAPH_LOCK);
+ try {
+ this.storeProvider.resumeSnapshot();
+ } finally {
+ LockUtil.unlock(this.name, LockUtil.GRAPH_LOCK);
+ }
+ LOG.info("Graph '{}' has resumed from snapshot", this.name);
+ }
+
private SchemaTransaction openSchemaTransaction() throws HugeException {
this.checkGraphNotClosed();
try {
@@ -1424,7 +1446,7 @@ public void invalid2(HugeType type, Object[] ids) {
@Override
public void clear(HugeType type) {
- this.hub.notify(Events.CACHE, Cache.ACTION_CLEAR, type, null);
+ this.hub.notify(Events.CACHE, Cache.ACTION_CLEAR, type);
}
@Override
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/AbstractBackendStoreProvider.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/AbstractBackendStoreProvider.java
index 7f5a44920c..fc35b7fb84 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/AbstractBackendStoreProvider.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/AbstractBackendStoreProvider.java
@@ -27,6 +27,7 @@
import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.backend.BackendException;
+import com.baidu.hugegraph.backend.store.raft.StoreSnapshotFile;
import com.baidu.hugegraph.event.EventHub;
import com.baidu.hugegraph.event.EventListener;
import com.baidu.hugegraph.util.E;
@@ -151,13 +152,19 @@ public void initSystemInfo(HugeGraph graph) {
}
@Override
- public void writeSnapshot() {
- // TODO: to be implemented
+ public void createSnapshot() {
+ String snapshotPrefix = StoreSnapshotFile.SNAPSHOT_DIR;
+ for (BackendStore store : this.stores.values()) {
+ store.createSnapshot(snapshotPrefix);
+ }
}
@Override
- public void readSnapshot() {
- // TODO: to be implemented
+ public void resumeSnapshot() {
+ String snapshotPrefix = StoreSnapshotFile.SNAPSHOT_DIR;
+ for (BackendStore store : this.stores.values()) {
+ store.resumeSnapshot(snapshotPrefix, true);
+ }
}
@Override
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendFeatures.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendFeatures.java
index 49b83a3e36..e67c6d7beb 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendFeatures.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendFeatures.java
@@ -29,6 +29,10 @@ public default boolean supportsSharedStorage() {
return true;
}
+ public default boolean supportsSnapshot() {
+ return false;
+ }
+
public boolean supportsScanToken();
public boolean supportsScanKeyPrefix();
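The new flag lets callers choose between a real snapshot and a fallback before touching the store. A rough sketch of such a guard, with the BackendFeatures instance simply passed in (how it is obtained is left to the caller):

import com.baidu.hugegraph.backend.store.BackendFeatures;
import com.baidu.hugegraph.backend.store.BackendStore;

public final class SnapshotGuard {
    // Create a snapshot only when the backend advertises support for it
    public static void createIfSupported(BackendStore store,
                                         BackendFeatures features,
                                         String snapshotDir) {
        if (features.supportsSnapshot()) {
            store.createSnapshot(snapshotDir);
        }
    }
}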
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStore.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStore.java
index 209ed10cf7..7c55da1a3f 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStore.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStore.java
@@ -20,6 +20,7 @@
package com.baidu.hugegraph.backend.store;
import java.util.Iterator;
+import java.util.Set;
import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.backend.id.IdGenerator;
@@ -116,12 +117,13 @@ public default void setCounterLowest(HugeType type, long lowest) {
// Get current counter for a specific type
public long getCounter(HugeType type);
- public default void writeSnapshot(String snapshotPath) {
- throw new UnsupportedOperationException("writeSnapshot");
+ public default Set<String> createSnapshot(String snapshotDir) {
+ throw new UnsupportedOperationException("createSnapshot");
}
- public default void readSnapshot(String snapshotPath) {
- throw new UnsupportedOperationException("readSnapshot");
+ public default void resumeSnapshot(String snapshotDir,
+ boolean deleteSnapshot) {
+ throw new UnsupportedOperationException("resumeSnapshot");
}
static enum TxState {
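Compared with the old writeSnapshot/readSnapshot pair, createSnapshot now reports the snapshot directories it produced, and resumeSnapshot takes a flag saying whether the snapshot may be deleted once applied. A rough sketch of what an override in a snapshot-capable store is expected to look like, loosely modeled on the RocksDB implementation further down; copyDataTo() and loadDataFrom() are placeholders for backend-specific logic:

// Fragment of a hypothetical BackendStore implementation
@Override
public Set<String> createSnapshot(String snapshotDir) {
    String path = Paths.get(snapshotDir, this.store()).toString();
    this.copyDataTo(path);        // placeholder: write a consistent copy under path
    return ImmutableSet.of(path); // report the snapshot directories produced
}

@Override
public void resumeSnapshot(String snapshotDir, boolean deleteSnapshot) {
    String path = Paths.get(snapshotDir, this.store()).toString();
    this.loadDataFrom(path, deleteSnapshot); // placeholder: swap live data for the snapshot
}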
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStoreProvider.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStoreProvider.java
index e7f815b64e..12058d7b73 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStoreProvider.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStoreProvider.java
@@ -54,9 +54,9 @@ public interface BackendStoreProvider {
public void initSystemInfo(HugeGraph graph);
- public void writeSnapshot();
+ public void createSnapshot();
- public void readSnapshot();
+ public void resumeSnapshot();
public void listen(EventListener listener);
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStoreProvider.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStoreProvider.java
index 0e465da9ec..cabbd14d7f 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStoreProvider.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStoreProvider.java
@@ -199,7 +199,7 @@ public void initSystemInfo(HugeGraph graph) {
}
@Override
- public void writeSnapshot() {
+ public void createSnapshot() {
StoreCommand command = new StoreCommand(StoreType.ALL,
StoreAction.SNAPSHOT, null);
StoreClosure closure = new StoreClosure(command);
@@ -208,8 +208,9 @@ public void writeSnapshot() {
}
@Override
- public void readSnapshot() {
- // How to read snapshot by jraft explicity?
+ public void resumeSnapshot() {
+ // JRaft doesn't expose an API to explicitly load a snapshot
+ throw new UnsupportedOperationException("resumeSnapshot");
}
@Override
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java
index 5b2815fd08..e4c7f44385 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java
@@ -22,6 +22,7 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
+import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@@ -43,6 +44,7 @@
import com.baidu.hugegraph.HugeException;
import com.baidu.hugegraph.HugeGraphParams;
import com.baidu.hugegraph.backend.cache.Cache;
+import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.backend.store.BackendStore;
import com.baidu.hugegraph.backend.store.raft.rpc.ListPeersProcessor;
import com.baidu.hugegraph.backend.store.raft.rpc.RaftRequests.StoreType;
@@ -259,7 +261,7 @@ public void clearCache() {
this.notifyCache(Cache.ACTION_CLEAR, HugeType.VERTEX, null);
}
- protected void notifyCache(String action, HugeType type, Object id) {
+ protected void notifyCache(String action, HugeType type, List<Id> ids) {
EventHub eventHub;
if (type.isGraph()) {
eventHub = this.params.graphEventHub();
@@ -270,7 +272,15 @@ protected void notifyCache(String action, HugeType type, Object id) {
}
try {
// How to avoid update cache from server info
- eventHub.notify(Events.CACHE, action, type, id);
+ if (ids == null) {
+ eventHub.call(Events.CACHE, action, type);
+ } else {
+ if (ids.size() == 1) {
+ eventHub.call(Events.CACHE, action, type, ids.get(0));
+ } else {
+ eventHub.call(Events.CACHE, action, type, ids.toArray());
+ }
+ }
} catch (RejectedExecutionException e) {
LOG.warn("Can't update cache due to EventHub is too busy");
}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreSnapshotFile.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreSnapshotFile.java
index ff1a9f4a21..af9e35c4ee 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreSnapshotFile.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreSnapshotFile.java
@@ -24,7 +24,8 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
-import java.util.concurrent.CompletableFuture;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.zip.Checksum;
@@ -45,8 +46,10 @@ public class StoreSnapshotFile {
private static final Logger LOG = Log.logger(StoreSnapshotFile.class);
- private static final String SNAPSHOT_DIR = "ss";
- private static final String SNAPSHOT_ARCHIVE = "ss.zip";
+ public static final String SNAPSHOT_DIR = "snapshot";
+ private static final String ARCHIVE_FORMAT = ".tar";
+ private static final String SNAPSHOT_ARCHIVE = SNAPSHOT_DIR + ARCHIVE_FORMAT;
+ private static final String MANIFEST = "manifest";
private final RaftBackendStore[] stores;
@@ -56,27 +59,20 @@ public StoreSnapshotFile(RaftBackendStore[] stores) {
public void save(SnapshotWriter writer, Closure done,
ExecutorService executor) {
- String writerPath = writer.getPath();
- String snapshotPath = Paths.get(writerPath, SNAPSHOT_DIR).toString();
try {
- this.doSnapshotSave(snapshotPath).whenComplete((metaBuilder, t) -> {
- if (t == null) {
- executor.execute(() -> compressSnapshot(writer, metaBuilder,
- done));
- } else {
- LOG.error("Failed to save snapshot, path={}, files={}",
- writerPath, writer.listFiles(), t);
- done.run(new Status(RaftError.EIO,
- "Failed to save snapshot at %s, error is %s",
- writerPath, t.getMessage()));
- }
+ // Write snapshot to real directory
+ Set<String> snapshotDirs = this.doSnapshotSave();
+ executor.execute(() -> {
+ String jraftSnapshotPath = this.writeManifest(writer,
+ snapshotDirs,
+ done);
+ this.compressJraftSnapshotDir(writer, jraftSnapshotPath, done);
});
} catch (Throwable t) {
- LOG.error("Failed to save snapshot, path={}, files={}, {}.",
- writerPath, writer.listFiles(), t);
+ LOG.error("Failed to save snapshot", t);
done.run(new Status(RaftError.EIO,
- "Failed to save snapshot at %s, error is %s",
- writerPath, t.getMessage()));
+ "Failed to save snapshot, error is %s",
+ t.getMessage()));
}
}
@@ -84,14 +80,16 @@ public boolean load(SnapshotReader reader) {
LocalFileMeta meta = (LocalFileMeta) reader.getFileMeta(SNAPSHOT_ARCHIVE);
String readerPath = reader.getPath();
if (meta == null) {
- LOG.error("Can't find snapshot archive file, path={}.", readerPath);
+ LOG.error("Can't find snapshot archive file, path={}", readerPath);
return false;
}
- String snapshotPath = Paths.get(readerPath, SNAPSHOT_DIR).toString();
+ String jraftSnapshotPath = Paths.get(readerPath, SNAPSHOT_DIR)
+ .toString();
try {
+ // Decompress manifest and data directory
this.decompressSnapshot(readerPath, meta);
- this.doSnapshotLoad(snapshotPath);
- File tmp = new File(snapshotPath);
+ this.doSnapshotLoad();
+ File tmp = new File(jraftSnapshotPath);
// Delete the decompressed temporary file. If the deletion fails
// (although it is a small probability event), it may affect the
// next snapshot decompression result. Therefore, the safest way
@@ -102,39 +100,85 @@ public boolean load(SnapshotReader reader) {
}
return true;
} catch (Throwable t) {
- LOG.error("Failed to load snapshot, path={}, file list={}, {}.",
- readerPath, reader.listFiles(), t);
+ LOG.error("Failed to load snapshot", t);
return false;
}
}
- private CompletableFuture<LocalFileMeta.Builder> doSnapshotSave(
- String snapshotPath) {
+ private Set<String> doSnapshotSave() {
+ Set<String> snapshotDirs = new HashSet<>();
for (RaftBackendStore store : this.stores) {
- String parentPath = Paths.get(snapshotPath, store.store())
- .toString();
- store.originStore().writeSnapshot(parentPath);
+ snapshotDirs.addAll(store.originStore().createSnapshot(SNAPSHOT_DIR));
}
- return CompletableFuture.completedFuture(LocalFileMeta.newBuilder());
+ LOG.info("All snapshot dirs: {}", snapshotDirs);
+ return snapshotDirs;
}
- private void doSnapshotLoad(String snapshotPath) {
+ private void doSnapshotLoad() {
for (RaftBackendStore store : this.stores) {
- String parentPath = Paths.get(snapshotPath, store.store())
- .toString();
- store.originStore().readSnapshot(parentPath);
+ store.originStore().resumeSnapshot(SNAPSHOT_DIR, false);
+ }
+ }
+
+ private Set<String> compressSnapshotDir(Set<String> snapshotDirs,
+ Closure done) {
+ // Compress all backend snapshot dirs
+ Set<String> tarSnapshotFiles = new HashSet<>();
+ for (String snapshotDir : snapshotDirs) {
+ String outputFile = snapshotDir + ARCHIVE_FORMAT;
+ try {
+ CompressUtil.compressTar(snapshotDir, outputFile, new CRC64());
+ } catch (IOException e) {
+ done.run(new Status(RaftError.EIO,
+ "Failed to compress backend snapshot dir " +
+ snapshotDir));
+ }
+ tarSnapshotFiles.add(outputFile);
+ }
+ return tarSnapshotFiles;
+ }
+
+ private void deleteSnapshotDir(Set<String> snapshotDirs,
+ Closure done) {
+ // Delete all backend snapshot dirs
+ for (String snapshotDir : snapshotDirs) {
+ try {
+ FileUtils.deleteDirectory(new File(snapshotDir));
+ } catch (IOException e) {
+ done.run(new Status(RaftError.EIO,
+ "Failed to delete backend snapshot dir " +
+ snapshotDir));
+ }
+ }
+ }
+
+ private String writeManifest(SnapshotWriter writer,
+ Set<String> snapshotFiles,
+ Closure done) {
+ String writerPath = writer.getPath();
+ // Write all backend compressed snapshot file paths to the manifest
+ String jraftSnapshotPath = Paths.get(writerPath, SNAPSHOT_DIR)
+ .toString();
+ File snapshotManifestFile = new File(jraftSnapshotPath, MANIFEST);
+ try {
+ FileUtils.writeLines(snapshotManifestFile, snapshotFiles);
+ } catch (IOException e) {
+ done.run(new Status(RaftError.EIO,
+ "Failed to write backend snapshot file path " +
+ "to manifest"));
}
+ return jraftSnapshotPath;
}
- private void compressSnapshot(SnapshotWriter writer,
- LocalFileMeta.Builder metaBuilder,
- Closure done) {
+ private void compressJraftSnapshotDir(SnapshotWriter writer,
+ String jraftSnapshotPath,
+ Closure done) {
String writerPath = writer.getPath();
String outputFile = Paths.get(writerPath, SNAPSHOT_ARCHIVE).toString();
try {
+ LocalFileMeta.Builder metaBuilder = LocalFileMeta.newBuilder();
Checksum checksum = new CRC64();
- CompressUtil.compressTar(writerPath, SNAPSHOT_DIR,
- outputFile, checksum);
+ CompressUtil.compressTar(jraftSnapshotPath, outputFile, checksum);
metaBuilder.setChecksum(Long.toHexString(checksum.getValue()));
if (writer.addFile(SNAPSHOT_ARCHIVE, metaBuilder.build())) {
done.run(Status.OK());
@@ -154,9 +198,9 @@ private void compressSnapshot(SnapshotWriter writer,
private void decompressSnapshot(String readerPath, LocalFileMeta meta)
throws IOException {
- String sourceFile = Paths.get(readerPath, SNAPSHOT_ARCHIVE).toString();
+ String archiveFile = Paths.get(readerPath, SNAPSHOT_ARCHIVE).toString();
Checksum checksum = new CRC64();
- CompressUtil.decompressTar(sourceFile, readerPath, checksum);
+ CompressUtil.decompressTar(archiveFile, readerPath, checksum);
if (meta.hasChecksum()) {
E.checkArgument(meta.getChecksum().equals(
Long.toHexString(checksum.getValue())),
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java
index a5697209a6..2b8981015c 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java
@@ -40,7 +40,6 @@
import com.baidu.hugegraph.backend.query.Query;
import com.baidu.hugegraph.backend.serializer.BytesBuffer;
import com.baidu.hugegraph.backend.store.BackendAction;
-import com.baidu.hugegraph.backend.store.BackendEntry;
import com.baidu.hugegraph.backend.store.BackendMutation;
import com.baidu.hugegraph.backend.store.BackendStore;
import com.baidu.hugegraph.backend.store.raft.RaftBackendStore.IncrCounter;
@@ -86,21 +85,13 @@ private void updateCacheIfNeeded(BackendMutation mutation,
return;
}
for (HugeType type : mutation.types()) {
- if (type.isSchema()) {
- java.util.Iterator it = mutation.mutation(type);
- while (it.hasNext()) {
- BackendEntry entry = it.next().entry();
- this.context.notifyCache(Cache.ACTION_INVALID, type,
- entry.originId());
- }
- } else if (type.isGraph()) {
- List<Id> ids = new ArrayList<>((int) Query.COMMIT_BATCH);
+ List<Id> ids = new ArrayList<>((int) Query.COMMIT_BATCH);
+ if (type.isSchema() || type.isGraph()) {
java.util.Iterator<BackendAction> it = mutation.mutation(type);
while (it.hasNext()) {
ids.add(it.next().entry().originId());
}
- this.context.notifyCache(Cache.ACTION_INVALID, type,
- ids.toArray());
+ this.context.notifyCache(Cache.ACTION_INVALID, type, ids);
} else {
// Ignore other types due to not cached
}
@@ -229,8 +220,8 @@ public boolean onSnapshotLoad(SnapshotReader reader) {
@Override
public void onLeaderStart(long term) {
- LOG.info("The node {} become to leader", this.node().nodeId());
- this.node().onLeaderInfoChange(this.node().nodeId(), true);
+ LOG.info("The node {} become to leader", this.context.endpoint());
+ this.node().onLeaderInfoChange(this.context.endpoint(), true);
super.onLeaderStart(term);
}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/security/HugeSecurityManager.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/security/HugeSecurityManager.java
index a97f385bea..de90486ecb 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/security/HugeSecurityManager.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/security/HugeSecurityManager.java
@@ -95,6 +95,13 @@ public class HugeSecurityManager extends SecurityManager {
ImmutableSet.of("execute")
);
+ private static final Map<String, Set<String>> BACKEND_SNAPSHOT = ImmutableMap.of(
+ "com.baidu.hugegraph.backend.store.AbstractBackendStoreProvider",
+ ImmutableSet.of("createSnapshot", "resumeSnapshot"),
+ "com.baidu.hugegraph.backend.store.raft.RaftBackendStoreProvider",
+ ImmutableSet.of("createSnapshot", "resumeSnapshot")
+ );
+
private static final Set<String> HBASE_CLASSES = ImmutableSet.of(
// Fixed #758
"com.baidu.hugegraph.backend.store.hbase.HbaseStore",
@@ -110,7 +117,8 @@ public class HugeSecurityManager extends SecurityManager {
);
private static final Set<String> SOFA_RPC_CLASSES = ImmutableSet.of(
- "com.alipay.sofa.rpc.tracer.sofatracer.RpcSofaTracer"
+ "com.alipay.sofa.rpc.tracer.sofatracer.RpcSofaTracer",
+ "com.alipay.sofa.rpc.client.AbstractCluster"
);
@Override
@@ -204,7 +212,8 @@ public void checkRead(FileDescriptor fd) {
public void checkRead(String file) {
if (callFromGremlin() && !callFromCaffeine() &&
!readGroovyInCurrentDir(file) && !callFromBackendHbase() &&
- !callFromRaft() && !callFromSofaRpc()) {
+ !callFromSnapshot() && !callFromRaft() &&
+ !callFromSofaRpc()) {
throw newSecurityException(
"Not allowed to read file via Gremlin: %s", file);
}
@@ -231,7 +240,8 @@ public void checkWrite(FileDescriptor fd) {
@Override
public void checkWrite(String file) {
- if (callFromGremlin() && !callFromRaft() && !callFromSofaRpc()) {
+ if (callFromGremlin() && !callFromSnapshot() &&
+ !callFromRaft() && !callFromSofaRpc()) {
throw newSecurityException("Not allowed to write file via Gremlin");
}
super.checkWrite(file);
@@ -239,7 +249,7 @@ public void checkWrite(String file) {
@Override
public void checkDelete(String file) {
- if (callFromGremlin()) {
+ if (callFromGremlin() && !callFromSnapshot()) {
throw newSecurityException(
"Not allowed to delete file via Gremlin");
}
@@ -322,7 +332,8 @@ public void checkPropertiesAccess() {
public void checkPropertyAccess(String key) {
if (!callFromAcceptClassLoaders() && callFromGremlin() &&
!WHITE_SYSTEM_PROPERTYS.contains(key) && !callFromBackendHbase() &&
- !callFromRaft() && !callFromSofaRpc()) {
+ !callFromSnapshot() && !callFromRaft() &&
+ !callFromSofaRpc()) {
throw newSecurityException(
"Not allowed to access system property(%s) via Gremlin", key);
}
@@ -442,6 +453,10 @@ private static boolean callFromBackendHbase() {
return callFromWorkerWithClass(HBASE_CLASSES);
}
+ private static boolean callFromSnapshot() {
+ return callFromMethods(BACKEND_SNAPSHOT);
+ }
+
private static boolean callFromRaft() {
return callFromWorkerWithClass(RAFT_CLASSES);
}
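The existing callFromMethods helper (not shown in this patch) is what matches the current stack against this class-to-methods map. Purely to illustrate that idea, and not as the project's actual implementation, such a check can be written with StackWalker:

import java.util.Map;
import java.util.Set;

public final class StackCheckExample {
    // True if any frame on the current call stack matches an entry of the map
    public static boolean calledFrom(Map<String, Set<String>> methodsByClass) {
        return StackWalker.getInstance().walk(frames ->
                frames.anyMatch(frame -> {
                    Set<String> methods = methodsByClass.get(frame.getClassName());
                    return methods != null && methods.contains(frame.getMethodName());
                }));
    }
}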
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/util/CompressUtil.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/util/CompressUtil.java
index 032f932ae7..c1903f069a 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/util/CompressUtil.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/util/CompressUtil.java
@@ -62,6 +62,12 @@ public final class CompressUtil {
public static void compressTar(String rootDir, String sourceDir,
String outputFile, Checksum checksum)
throws IOException {
+ Path source = Paths.get(rootDir, sourceDir);
+ compressTar(source.toString(), outputFile, checksum);
+ }
+
+ public static void compressTar(String inputDir, String outputFile,
+ Checksum checksum) throws IOException {
LZ4Factory factory = LZ4Factory.fastestInstance();
LZ4Compressor compressor = factory.fastCompressor();
int blockSize = RaftSharedContext.BLOCK_SIZE;
@@ -72,7 +78,7 @@ public static void compressTar(String rootDir, String sourceDir,
blockSize,
compressor);
TarArchiveOutputStream tos = new TarArchiveOutputStream(lz4os)) {
- Path source = Paths.get(rootDir, sourceDir);
+ Path source = Paths.get(inputDir);
CompressUtil.tarDir(source, tos);
tos.flush();
fos.getFD().sync();
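This new overload is what StoreSnapshotFile calls for each backend snapshot directory. A minimal usage sketch; the paths are illustrative, and CRC64 is JRaft's implementation as used elsewhere in this patch (import path assumed):

import java.util.zip.Checksum;

import com.alipay.sofa.jraft.util.CRC64;
import com.baidu.hugegraph.util.CompressUtil;

public class CompressExample {
    public static void main(String[] args) throws Exception {
        Checksum checksum = new CRC64();
        // Pack the whole snapshot directory into a single LZ4-compressed tar
        CompressUtil.compressTar("/data/snapshot_rocksdb-data/g",
                                 "/data/snapshot_rocksdb-data/g.tar", checksum);
        System.out.println("checksum=" + Long.toHexString(checksum.getValue()));
    }
}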
diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBFeatures.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBFeatures.java
index cdbb00a8aa..a133845a9d 100644
--- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBFeatures.java
+++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBFeatures.java
@@ -28,6 +28,11 @@ public boolean supportsSharedStorage() {
return false;
}
+ @Override
+ public boolean supportsSnapshot() {
+ return true;
+ }
+
@Override
public boolean supportsScanToken() {
return false;
diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBSessions.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBSessions.java
index 1889344ed5..e92f358a4d 100644
--- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBSessions.java
+++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBSessions.java
@@ -25,6 +25,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.rocksdb.RocksDBException;
+import com.alipay.sofa.jraft.storage.snapshot.remote.Session;
import com.baidu.hugegraph.backend.store.BackendEntry.BackendColumnIterator;
import com.baidu.hugegraph.backend.store.BackendSession.AbstractBackendSession;
import com.baidu.hugegraph.backend.store.BackendSessionPool;
@@ -49,7 +50,14 @@ public abstract RocksDBSessions copy(HugeConfig config,
public abstract void createSnapshot(String parentPath);
- public abstract void reload() throws RocksDBException;
+ public abstract void resumeSnapshot(String snapshotPath);
+
+ public abstract String buildSnapshotPath(String snapshotPrefix);
+
+ public abstract String hardLinkSnapshot(String snapshotPath)
+ throws RocksDBException;
+
+ public abstract void reloadRocksDB() throws RocksDBException;
public abstract void forceCloseRocksDB();
diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java
index 3bbfd9237d..3e98cda349 100644
--- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java
+++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java
@@ -59,6 +59,7 @@
import org.rocksdb.SstFileManager;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
import com.baidu.hugegraph.backend.BackendException;
import com.baidu.hugegraph.backend.serializer.BinarySerializer;
@@ -69,20 +70,19 @@
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.util.Bytes;
import com.baidu.hugegraph.util.E;
-import com.baidu.hugegraph.util.GZipUtil;
+import com.baidu.hugegraph.util.Log;
import com.baidu.hugegraph.util.StringEncoding;
import com.google.common.collect.ImmutableList;
public class RocksDBStdSessions extends RocksDBSessions {
+ private static final Logger LOG = Log.logger(RocksDBStdSessions.class);
+
private final HugeConfig config;
private final String dataPath;
private final String walPath;
- private volatile RocksDB rocksdb;
- private final SstFileManager sstFileManager;
-
- private final Map cfs;
+ private volatile OpenedRocksDB rocksdb;
private final AtomicInteger refCount;
public RocksDBStdSessions(HugeConfig config, String database, String store,
@@ -92,22 +92,8 @@ public RocksDBStdSessions(HugeConfig config, String database, String store,
this.config = config;
this.dataPath = dataPath;
this.walPath = walPath;
- // Init options
- Options options = new Options();
- RocksDBStdSessions.initOptions(config, options, options,
- options, options);
- options.setWalDir(walPath);
-
- this.sstFileManager = new SstFileManager(Env.getDefault());
- options.setSstFileManager(this.sstFileManager);
-
- /*
- * Open RocksDB at the first time
- * Don't merge old CFs, we expect a clear DB when using this one
- */
- this.rocksdb = RocksDB.open(options, dataPath);
-
- this.cfs = new ConcurrentHashMap<>();
+ this.rocksdb = RocksDBStdSessions.openRocksDB(config, dataPath,
+ walPath);
this.refCount = new AtomicInteger(1);
}
@@ -118,43 +104,11 @@ public RocksDBStdSessions(HugeConfig config, String database, String store,
this.config = config;
this.dataPath = dataPath;
this.walPath = walPath;
- // Old CFs should always be opened
- Set mergedCFs = this.mergeOldCFs(dataPath, cfNames);
- List cfs = ImmutableList.copyOf(mergedCFs);
-
- // Init CFs options
- List cfds = new ArrayList<>(cfs.size());
- for (String cf : cfs) {
- ColumnFamilyDescriptor cfd = new ColumnFamilyDescriptor(encode(cf));
- ColumnFamilyOptions options = cfd.getOptions();
- RocksDBStdSessions.initOptions(config, null, null,
- options, options);
- cfds.add(cfd);
- }
-
- // Init DB options
- DBOptions options = new DBOptions();
- RocksDBStdSessions.initOptions(config, options, options, null, null);
- options.setWalDir(walPath);
-
- this.sstFileManager = new SstFileManager(Env.getDefault());
- options.setSstFileManager(this.sstFileManager);
-
- // Open RocksDB with CFs
- List cfhs = new ArrayList<>();
- this.rocksdb = RocksDB.open(options, dataPath, cfds, cfhs);
- E.checkState(cfhs.size() == cfs.size(),
- "Expect same size of cf-handles and cf-names");
-
- // Collect CF Handles
- this.cfs = new ConcurrentHashMap<>();
- for (int i = 0; i < cfs.size(); i++) {
- this.cfs.put(cfs.get(i), new CFHandle(cfhs.get(i)));
- }
-
+ this.rocksdb = RocksDBStdSessions.openRocksDB(config, cfNames,
+ dataPath, walPath);
this.refCount = new AtomicInteger(1);
- ingestExternalFile();
+ this.ingestExternalFile();
}
private RocksDBStdSessions(HugeConfig config, String database, String store,
@@ -164,10 +118,7 @@ private RocksDBStdSessions(HugeConfig config, String database, String store,
this.dataPath = origin.dataPath;
this.walPath = origin.walPath;
this.rocksdb = origin.rocksdb;
- this.sstFileManager = origin.sstFileManager;
- this.cfs = origin.cfs;
this.refCount = origin.refCount;
-
this.refCount.incrementAndGet();
}
@@ -183,7 +134,7 @@ protected boolean opened() {
@Override
public Set<String> openedTables() {
- return this.cfs.keySet();
+ return this.rocksdb.cfs();
}
@Override
@@ -193,7 +144,7 @@ public synchronized void createTable(String... tables)
List<ColumnFamilyDescriptor> cfds = new ArrayList<>();
for (String table : tables) {
- if (this.cfs.containsKey(table)) {
+ if (this.rocksdb.existCf(table)) {
continue;
}
ColumnFamilyDescriptor cfd = new ColumnFamilyDescriptor(
@@ -207,14 +158,14 @@ public synchronized void createTable(String... tables)
* To speed up the creation of tables, like truncate() for tinkerpop
* test, we call createColumnFamilies instead of createColumnFamily.
*/
- List<ColumnFamilyHandle> cfhs = this.rocksdb.createColumnFamilies(cfds);
+ List<ColumnFamilyHandle> cfhs = this.rocksdb().createColumnFamilies(cfds);
for (ColumnFamilyHandle cfh : cfhs) {
String table = decode(cfh.getName());
- this.cfs.put(table, new CFHandle(cfh));
+ this.rocksdb.addCf(table, new CFHandle(cfh));
}
- ingestExternalFile();
+ this.ingestExternalFile();
}
@Override
@@ -229,7 +180,7 @@ public synchronized void dropTable(String... tables)
*/
List<ColumnFamilyHandle> cfhs = new ArrayList<>();
for (String table : tables) {
- CFHandle cfh = this.cfs.get(table);
+ CFHandle cfh = this.rocksdb.cf(table);
if (cfh == null) {
continue;
}
@@ -240,49 +191,32 @@ public synchronized void dropTable(String... tables)
* To speed up the creation of tables, like truncate() for tinkerpop
* test, we call dropColumnFamilies instead of dropColumnFamily.
*/
- this.rocksdb.dropColumnFamilies(cfhs);
+ this.rocksdb().dropColumnFamilies(cfhs);
for (String table : tables) {
- CFHandle cfh = this.cfs.get(table);
+ CFHandle cfh = this.rocksdb.cf(table);
if (cfh == null) {
continue;
}
cfh.destroy();
- this.cfs.remove(table);
+ this.rocksdb.removeCf(table);
}
}
@Override
- public void reload() throws RocksDBException {
+ public boolean existsTable(String table) {
+ return this.rocksdb.existCf(table);
+ }
+
+ @Override
+ public void reloadRocksDB() throws RocksDBException {
if (this.rocksdb.isOwningHandle()) {
this.rocksdb.close();
}
- this.cfs.values().forEach(CFHandle::destroy);
- // Init CFs options
- Set mergedCFs = this.mergeOldCFs(this.dataPath, new ArrayList<>(
- this.cfs.keySet()));
- List cfNames = ImmutableList.copyOf(mergedCFs);
-
- List cfds = new ArrayList<>(cfNames.size());
- for (String cf : cfNames) {
- ColumnFamilyDescriptor cfd = new ColumnFamilyDescriptor(encode(cf));
- ColumnFamilyOptions options = cfd.getOptions();
- RocksDBStdSessions.initOptions(this.config, null, null,
- options, options);
- cfds.add(cfd);
- }
- List cfhs = new ArrayList<>();
-
- // Init DB options
- DBOptions options = new DBOptions();
- RocksDBStdSessions.initOptions(this.config, options, options,
- null, null);
- options.setWalDir(this.walPath);
- options.setSstFileManager(this.sstFileManager);
- this.rocksdb = RocksDB.open(options, this.dataPath, cfds, cfhs);
- for (int i = 0; i < cfNames.size(); i++) {
- this.cfs.put(cfNames.get(i), new CFHandle(cfhs.get(i)));
- }
+ this.rocksdb = RocksDBStdSessions.openRocksDB(this.config,
+ ImmutableList.of(),
+ this.dataPath,
+ this.walPath);
}
@Override
@@ -290,22 +224,17 @@ public void forceCloseRocksDB() {
this.rocksdb().close();
}
- @Override
- public boolean existsTable(String table) {
- return this.cfs.containsKey(table);
- }
-
@Override
public List<String> property(String property) {
try {
if (property.equals(RocksDBMetrics.DISK_USAGE)) {
- long size = this.sstFileManager.getTotalSize();
+ long size = this.rocksdb.sstFileManager.getTotalSize();
return ImmutableList.of(String.valueOf(size));
}
List<String> values = new ArrayList<>();
for (String cf : this.openedTables()) {
- try (CFHandle cfh = cf(cf)) {
- values.add(rocksdb().getProperty(cfh.get(), property));
+ try (CFHandle cfh = this.cf(cf)) {
+ values.add(this.rocksdb().getProperty(cfh.get(), property));
}
}
return values;
@@ -321,29 +250,67 @@ public RocksDBSessions copy(HugeConfig config,
}
@Override
- public void createSnapshot(String parentPath) {
- String md5 = GZipUtil.md5(this.dataPath);
- String snapshotPath = Paths.get(parentPath, md5).toString();
- // https://github.com/facebook/rocksdb/wiki/Checkpoints
- try (Checkpoint checkpoint = Checkpoint.create(this.rocksdb)) {
- String tempPath = snapshotPath + "_temp";
- File tempFile = new File(tempPath);
- FileUtils.deleteDirectory(tempFile);
+ public void createSnapshot(String snapshotPath) {
+ RocksDBStdSessions.createCheckpoint(this.rocksdb(), snapshotPath);
+ }
- FileUtils.forceMkdir(tempFile.getParentFile());
- checkpoint.createCheckpoint(tempPath);
- File snapshotFile = new File(snapshotPath);
- FileUtils.deleteDirectory(snapshotFile);
- if (!tempFile.renameTo(snapshotFile)) {
- throw new IOException(String.format("Failed to rename %s to %s",
- tempFile, snapshotFile));
+ @Override
+ public void resumeSnapshot(String snapshotPath) {
+ File originDataDir = new File(this.dataPath);
+ File snapshotDir = new File(snapshotPath);
+ try {
+ /*
+ * Close the current instance first.
+ * NOTE: the rocksdb instance must be closed before its data
+ * directory is deleted; if it is closed only after the snapshot
+ * directory has been moved into place, dirty data may be produced.
+ */
+ this.forceCloseRocksDB();
+ // Delete origin data directory
+ if (originDataDir.exists()) {
+ LOG.info("Delete origin data directory {}", originDataDir);
+ FileUtils.deleteDirectory(originDataDir);
}
+ // Move snapshot directory to origin data directory
+ FileUtils.moveDirectory(snapshotDir, originDataDir);
+ LOG.info("Move snapshot directory {} to {}",
+ snapshotDir, originDataDir);
+ // Reload rocksdb instance
+ this.reloadRocksDB();
} catch (Exception e) {
- throw new BackendException("Failed to write snapshot at path %s",
- e, snapshotPath);
+ throw new BackendException("Failed to resume snapshot '%s' to' %s'",
+ e, snapshotDir, this.dataPath);
}
}
+ @Override
+ public String buildSnapshotPath(String snapshotPrefix) {
+ // Like: parent_path/rocksdb-data/*, * can be g,m,s
+ Path originDataPath = Paths.get(this.dataPath);
+ Path parentParentPath = originDataPath.getParent().getParent();
+ // Like: rocksdb-data/*
+ Path pureDataPath = parentParentPath.relativize(originDataPath);
+ // Like: parent_path/snapshot_rocksdb-data/*
+ Path snapshotPath = parentParentPath.resolve(snapshotPrefix + "_" +
+ pureDataPath);
+ E.checkArgument(snapshotPath.toFile().exists(),
+ "The snapshot path '%s' doesn't exist",
+ snapshotPath);
+ return snapshotPath.toString();
+ }
+
+ @Override
+ public String hardLinkSnapshot(String snapshotPath) throws RocksDBException {
+ String snapshotLinkPath = this.dataPath + "_temp";
+ try (RocksDB rocksdb = openRocksDB(this.config, ImmutableList.of(),
+ snapshotPath, null).rocksdb) {
+ RocksDBStdSessions.createCheckpoint(rocksdb, snapshotLinkPath);
+ }
+ LOG.debug("The snapshot {} has been hard linked to {}",
+ snapshotPath, snapshotLinkPath);
+ return snapshotLinkPath;
+ }
+
@Override
public final Session session() {
return (Session) super.getOrNewSession();
@@ -364,12 +331,6 @@ protected synchronized void doClose() {
return;
}
assert this.refCount.get() == 0;
-
- for (CFHandle cf : this.cfs.values()) {
- cf.close();
- }
- this.cfs.clear();
-
this.rocksdb.close();
}
@@ -380,42 +341,128 @@ private void checkValid() {
private RocksDB rocksdb() {
this.checkValid();
- return this.rocksdb;
+ return this.rocksdb.rocksdb;
}
- private CFHandle cf(String cf) {
- CFHandle cfh = this.cfs.get(cf);
+ private CFHandle cf(String cfName) {
+ CFHandle cfh = this.rocksdb.cf(cfName);
if (cfh == null) {
- throw new BackendException("Table '%s' is not opened", cf);
+ throw new BackendException("Table '%s' is not opened", cfName);
}
cfh.open();
return cfh;
}
- private Set mergeOldCFs(String path, List cfNames)
- throws RocksDBException {
- Set cfs = listCFs(path);
- cfs.addAll(cfNames);
- return cfs;
- }
-
private void ingestExternalFile() throws RocksDBException {
String directory = this.config().get(RocksDBOptions.SST_PATH);
if (directory == null || directory.isEmpty()) {
return;
}
- RocksDBIngester ingester = new RocksDBIngester(this.rocksdb);
+ RocksDBIngester ingester = new RocksDBIngester(this.rocksdb());
// Ingest all *.sst files in `directory`
- for (String cf : this.cfs.keySet()) {
+ for (String cf : this.rocksdb.cfs()) {
Path path = Paths.get(directory, cf);
if (path.toFile().isDirectory()) {
- try (CFHandle cfh = cf(cf)) {
+ try (CFHandle cfh = this.cf(cf)) {
ingester.ingest(path, cfh.get());
}
}
}
}
+ private static OpenedRocksDB openRocksDB(HugeConfig config,
+ String dataPath, String walPath)
+ throws RocksDBException {
+ // Init options
+ Options options = new Options();
+ RocksDBStdSessions.initOptions(config, options, options,
+ options, options);
+ options.setWalDir(walPath);
+ SstFileManager sstFileManager = new SstFileManager(Env.getDefault());
+ options.setSstFileManager(sstFileManager);
+ /*
+ * Open RocksDB at the first time
+ * Don't merge old CFs, we expect a clear DB when using this one
+ */
+ RocksDB rocksdb = RocksDB.open(options, dataPath);
+ Map<String, CFHandle> cfs = new ConcurrentHashMap<>();
+ return new OpenedRocksDB(rocksdb, cfs, sstFileManager);
+ }
+
+ private static OpenedRocksDB openRocksDB(HugeConfig config,
+ List<String> cfNames,
+ String dataPath, String walPath)
+ throws RocksDBException {
+ // Old CFs should always be opened
+ Set<String> mergedCFs = RocksDBStdSessions.mergeOldCFs(dataPath,
+ cfNames);
+ List<String> cfs = ImmutableList.copyOf(mergedCFs);
+
+ // Init CFs options
+ List<ColumnFamilyDescriptor> cfds = new ArrayList<>(cfs.size());
+ for (String cf : cfs) {
+ ColumnFamilyDescriptor cfd = new ColumnFamilyDescriptor(encode(cf));
+ ColumnFamilyOptions options = cfd.getOptions();
+ RocksDBStdSessions.initOptions(config, null, null,
+ options, options);
+ cfds.add(cfd);
+ }
+
+ // Init DB options
+ DBOptions options = new DBOptions();
+ RocksDBStdSessions.initOptions(config, options, options, null, null);
+ if (walPath != null) {
+ options.setWalDir(walPath);
+ }
+ SstFileManager sstFileManager = new SstFileManager(Env.getDefault());
+ options.setSstFileManager(sstFileManager);
+
+ // Open RocksDB with CFs
+ List<ColumnFamilyHandle> cfhs = new ArrayList<>();
+ RocksDB rocksdb = RocksDB.open(options, dataPath, cfds, cfhs);
+
+ E.checkState(cfhs.size() == cfs.size(),
+ "Expect same size of cf-handles and cf-names");
+ // Collect CF Handles
+ Map<String, CFHandle> cfHandles = new ConcurrentHashMap<>();
+ for (int i = 0; i < cfs.size(); i++) {
+ cfHandles.put(cfs.get(i), new CFHandle(cfhs.get(i)));
+ }
+ return new OpenedRocksDB(rocksdb, cfHandles, sstFileManager);
+ }
+
+ private static Set<String> mergeOldCFs(String path, List<String> cfNames)
+ throws RocksDBException {
+ Set<String> cfs = listCFs(path);
+ cfs.addAll(cfNames);
+ return cfs;
+ }
+
+ private static void createCheckpoint(RocksDB rocksdb, String targetPath) {
+ Path parentPath = Paths.get(targetPath).getParent().getFileName();
+ assert parentPath.toString().startsWith("snapshot") : targetPath;
+ // https://github.com/facebook/rocksdb/wiki/Checkpoints
+ try (Checkpoint checkpoint = Checkpoint.create(rocksdb)) {
+ String tempPath = targetPath + "_temp";
+ File tempFile = new File(tempPath);
+ FileUtils.deleteDirectory(tempFile);
+ LOG.debug("Deleted temp directory {}", tempFile);
+
+ FileUtils.forceMkdir(tempFile.getParentFile());
+ checkpoint.createCheckpoint(tempPath);
+ File snapshotFile = new File(targetPath);
+ FileUtils.deleteDirectory(snapshotFile);
+ LOG.debug("Deleted stale directory {}", snapshotFile);
+ if (!tempFile.renameTo(snapshotFile)) {
+ throw new IOException(String.format("Failed to rename %s to %s",
+ tempFile, snapshotFile));
+ }
+ } catch (Exception e) {
+ throw new BackendException("Failed to create checkpoint at path %s",
+ e, targetPath);
+ }
+ }
+
public static Set<String> listCFs(String path) throws RocksDBException {
Set<String> cfs = new HashSet<>();
@@ -621,7 +668,57 @@ public static final String decode(byte[] bytes) {
return StringEncoding.decode(bytes);
}
- private class CFHandle implements Closeable {
+ private static class OpenedRocksDB {
+
+ private final RocksDB rocksdb;
+ private final Map<String, CFHandle> cfHandles;
+ private final SstFileManager sstFileManager;
+
+ public OpenedRocksDB(RocksDB rocksdb, Map<String, CFHandle> cfHandles,
+ SstFileManager sstFileManager) {
+ this.rocksdb = rocksdb;
+ this.cfHandles = cfHandles;
+ this.sstFileManager = sstFileManager;
+ }
+
+ public Set<String> cfs() {
+ return this.cfHandles.keySet();
+ }
+
+ public CFHandle cf(String cfName) {
+ return this.cfHandles.get(cfName);
+ }
+
+ public void addCf(String cfName, CFHandle cfHandle) {
+ this.cfHandles.put(cfName, cfHandle);
+ }
+
+ public CFHandle removeCf(String cfName) {
+ return this.cfHandles.remove(cfName);
+ }
+
+ public boolean existCf(String cfName) {
+ return this.cfHandles.containsKey(cfName);
+ }
+
+ public boolean isOwningHandle() {
+ return this.rocksdb.isOwningHandle();
+ }
+
+ public void close() {
+ if (!this.isOwningHandle()) {
+ return;
+ }
+ for (CFHandle cf : this.cfHandles.values()) {
+ cf.close();
+ }
+ this.cfHandles.clear();
+
+ this.rocksdb.close();
+ }
+ }
+
+ private static class CFHandle implements Closeable {
private final ColumnFamilyHandle handle;
private final AtomicInteger refs;
@@ -1125,7 +1222,7 @@ private boolean filter(byte[] key) {
*/
assert this.keyEnd != null;
if (this.match(Session.SCAN_LTE_END)) {
- // Just compare the prefix, maybe there are excess tail
+ // Just compare the prefix, there may be an excess tail
key = Arrays.copyOfRange(key, 0, this.keyEnd.length);
return Bytes.compare(key, this.keyEnd) <= 0;
} else {
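The relativize/resolve dance used by createSnapshot and buildSnapshotPath is easier to follow with a concrete layout; a standalone sketch with example directory names only:

import java.nio.file.Path;
import java.nio.file.Paths;

public class SnapshotPathExample {
    public static void main(String[] args) {
        Path originDataPath = Paths.get("/data/rocksdb-data/g");         // one rocksdb instance
        Path parentParentPath = originDataPath.getParent().getParent();  // /data
        Path pureDataPath = parentParentPath.relativize(originDataPath); // rocksdb-data/g
        // Same prefixing rule as the store: snapshotPrefix + "_" + pureDataPath
        Path snapshotPath = parentParentPath.resolve("snapshot_" + pureDataPath);
        System.out.println(snapshotPath); // /data/snapshot_rocksdb-data/g
    }
}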
diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java
index 635e33024a..9cd31e530c 100644
--- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java
+++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java
@@ -19,8 +19,9 @@
package com.baidu.hugegraph.backend.store.rocksdb;
-import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
@@ -43,7 +44,6 @@
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.tuple.Pair;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
@@ -65,7 +65,6 @@
import com.baidu.hugegraph.util.Consumers;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.ExecutorUtil;
-import com.baidu.hugegraph.util.GZipUtil;
import com.baidu.hugegraph.util.InsertionOrderUtil;
import com.baidu.hugegraph.util.Log;
import com.google.common.collect.ImmutableList;
@@ -116,7 +115,7 @@ private void registerMetaHandlers() {
this.registerMetaHandler("metrics", (session, meta, args) -> {
List<RocksDBSessions> dbs = new ArrayList<>();
dbs.add(this.sessions);
- dbs.addAll(tableDBMapping().values());
+ dbs.addAll(this.tableDBMapping().values());
RocksDBMetrics metrics = new RocksDBMetrics(dbs, session);
return metrics.getMetrics();
@@ -200,7 +199,7 @@ public synchronized void open(HugeConfig config) {
}));
}
}
- waitOpenFinish(futures, openPool);
+ this.waitOpenFinish(futures, openPool);
}
private void waitOpenFinish(List<Future<?>> futures,
@@ -349,7 +348,7 @@ protected Map tableDBMapping() {
Map<String, RocksDBSessions> tableDBMap = InsertionOrderUtil.newMap();
for (Entry<HugeType, String> e : this.tableDiskMapping.entrySet()) {
String table = this.table(e.getKey()).table();
- RocksDBSessions db = db(e.getValue());
+ RocksDBSessions db = this.db(e.getValue());
tableDBMap.put(table, db);
}
return tableDBMap;
@@ -539,7 +538,7 @@ public synchronized void truncate() {
this.clear(false);
this.init();
// clear write batch
- dbs.values().forEach(BackendSessionPool::forceResetSessions);
+ this.dbs.values().forEach(BackendSessionPool::forceResetSessions);
LOG.debug("Store truncated: {}", this.store);
} finally {
writeLock.unlock();
@@ -601,75 +600,81 @@ protected Session session(HugeType tableType) {
// Optimized disk
String disk = this.tableDiskMapping.get(tableType);
if (disk != null) {
- return db(disk).session();
+ return this.db(disk).session();
}
return this.sessions.session();
}
@Override
- public void writeSnapshot(String parentPath) {
- Lock writeLock = this.storeLock.writeLock();
- writeLock.lock();
+ public Set<String> createSnapshot(String snapshotPrefix) {
+ Lock readLock = this.storeLock.readLock();
+ readLock.lock();
try {
+ Set<String> uniqueParents = new HashSet<>();
// Every rocksdb instance should create a snapshot
- for (RocksDBSessions sessions : this.sessions()) {
- sessions.createSnapshot(parentPath);
+ for (Map.Entry<String, RocksDBSessions> entry : this.dbs.entrySet()) {
+ // Like: parent_path/rocksdb-data/*, * may be g,m,s
+ Path originDataPath = Paths.get(entry.getKey()).toAbsolutePath();
+ Path parentParentPath = originDataPath.getParent().getParent();
+ // Like: rocksdb-data/*
+ Path pureDataPath = parentParentPath.relativize(originDataPath);
+ // Like: parent_path/snapshot_rocksdb-data/*
+ Path snapshotPath = parentParentPath.resolve(snapshotPrefix +
+ "_" + pureDataPath);
+ LOG.debug("The origin data path: {}", originDataPath);
+ LOG.debug("The snapshot data path: {}", snapshotPath);
+ RocksDBSessions sessions = entry.getValue();
+ sessions.createSnapshot(snapshotPath.toString());
+
+ uniqueParents.add(snapshotPath.getParent().toString());
}
+ LOG.info("The store '{}' create snapshot successfully", this);
+ return uniqueParents;
} finally {
- writeLock.unlock();
+ readLock.unlock();
}
}
@Override
- public void readSnapshot(String parentPath) {
- Lock writeLock = this.storeLock.writeLock();
- writeLock.lock();
+ public void resumeSnapshot(String snapshotPrefix, boolean deleteSnapshot) {
+ Lock readLock = this.storeLock.readLock();
+ readLock.lock();
try {
if (!this.opened()) {
return;
}
-
- File[] snapshotFiles = new File(parentPath).listFiles();
- E.checkNotNull(snapshotFiles, "snapshot files");
- List<Pair<File, File>> fileRenamePairs = new ArrayList<>();
- for (File snapshotFile : snapshotFiles) {
- Session session = this.findMatchedSession(snapshotFile);
- File dataFile = new File(session.dataPath());
- fileRenamePairs.add(Pair.of(snapshotFile, dataFile));
- }
- /*
- * NOTE: must close rocksdb instance before deleting file directory,
- * if close after copying the snapshot directory to origin position,
- * it may produce dirty data.
- */
- for (RocksDBSessions sessions : this.sessions()) {
- sessions.forceCloseRocksDB();
+ Map<String, RocksDBSessions> snapshotPaths = new HashMap<>();
+ for (Map.Entry<String, RocksDBSessions> entry : this.dbs.entrySet()) {
+ RocksDBSessions sessions = entry.getValue();
+ String snapshotPath = sessions.buildSnapshotPath(snapshotPrefix);
+ LOG.debug("The origin data path: {}", entry.getKey());
+ if (!deleteSnapshot) {
+ snapshotPath = sessions.hardLinkSnapshot(snapshotPath);
+ }
+ LOG.debug("The snapshot data path: {}", snapshotPath);
+ snapshotPaths.put(snapshotPath, sessions);
}
- // Copy snapshot file to dest file
- for (Pair pair : fileRenamePairs) {
- File snapshotFile = pair.getLeft();
- File dataFile = pair.getRight();
- try {
- if (dataFile.exists()) {
- LOG.warn("Delete origin data directory {}", dataFile);
- FileUtils.deleteDirectory(dataFile);
+
+ for (Map.Entry<String, RocksDBSessions> entry :
+ snapshotPaths.entrySet()) {
+ String snapshotPath = entry.getKey();
+ RocksDBSessions sessions = entry.getValue();
+ sessions.resumeSnapshot(snapshotPath);
+
+ if (deleteSnapshot) {
+ // Delete empty snapshot parent directory
+ Path parentPath = Paths.get(snapshotPath).getParent();
+ if (Files.list(parentPath).count() == 0) {
+ FileUtils.deleteDirectory(parentPath.toFile());
}
- FileUtils.moveDirectory(snapshotFile, dataFile);
- } catch (IOException e) {
- throw new BackendException("Failed to move %s to %s",
- e, snapshotFile, dataFile);
}
}
- // Reload rocksdb instance
- for (RocksDBSessions sessions : this.sessions()) {
- sessions.reload();
- }
- LOG.info("The store {} load snapshot successfully", this.store);
- } catch (RocksDBException e) {
- throw new BackendException("Failed to reload rocksdb", e);
+ LOG.info("The store '{}' resume snapshot successfully", this);
+ } catch (RocksDBException | IOException e) {
+ throw new BackendException("Failed to resume snapshot", e);
} finally {
- writeLock.unlock();
+ readLock.unlock();
}
}
@@ -692,18 +697,6 @@ private final void closeSessions() {
}
}
- private Session findMatchedSession(File snapshotFile) {
- String fileName = snapshotFile.getName();
- for (Session session : this.session()) {
- String md5 = GZipUtil.md5(session.dataPath());
- if (fileName.equals(md5)) {
- return session;
- }
- }
- throw new BackendException("Can't find matched session for " +
- "snapshot file %s", snapshotFile);
- }
-
private final List<Session> session() {
this.checkOpened();
diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java
index 1e5cce927e..b1b222cf9f 100644
--- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java
+++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java
@@ -134,7 +134,7 @@ public boolean existsTable(String table) {
@Override
public List<String> property(String property) {
- throw new NotSupportException("RocksDBSstStore property()");
+ throw new UnsupportedOperationException("RocksDBSstStore property()");
}
@Override
@@ -149,8 +149,24 @@ public void createSnapshot(String snapshotPath) {
}
@Override
- public void reload() throws RocksDBException {
- throw new UnsupportedOperationException("reload");
+ public void resumeSnapshot(String snapshotPath) {
+ throw new UnsupportedOperationException("resumeSnapshot");
+ }
+
+ @Override
+ public String buildSnapshotPath(String snapshotPrefix) {
+ throw new UnsupportedOperationException("buildSnapshotPath");
+ }
+
+ @Override
+ public String hardLinkSnapshot(String snapshotPath)
+ throws RocksDBException {
+ throw new UnsupportedOperationException("hardLinkSnapshot");
+ }
+
+ @Override
+ public void reloadRocksDB() throws RocksDBException {
+ throw new UnsupportedOperationException("reloadRocksDB");
}
@Override
diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/api/GremlinApiTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/api/GremlinApiTest.java
index c2b6486b60..19ad475add 100644
--- a/hugegraph-test/src/main/java/com/baidu/hugegraph/api/GremlinApiTest.java
+++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/api/GremlinApiTest.java
@@ -111,7 +111,8 @@ public void testClearAndInit() {
@Test
public void testTruncate() {
String body = "{"
- + "\"gremlin\":\"hugegraph.truncateBackend()\","
+ + "\"gremlin\":\"try {hugegraph.truncateBackend()} "
+ + "catch (UnsupportedOperationException e) {}\","
+ "\"bindings\":{},"
+ "\"language\":\"gremlin-groovy\","
+ "\"aliases\":{\"g\":\"__g_hugegraph\"}}";
diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeCoreTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeCoreTest.java
index 7c070da341..b76ee9a763 100644
--- a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeCoreTest.java
+++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeCoreTest.java
@@ -5489,7 +5489,11 @@ public void testRemoveEdgesOfSuperVertex() {
guido.remove();
// Clear all
- graph.truncateBackend();
+ try {
+ graph.truncateBackend();
+ } catch (UnsupportedOperationException e) {
+ LOG.warn("Failed to truncate backend", e);
+ }
}
@Test