Skip to content

Commit

Permalink
prepare for mapdb backend (#357)
Browse files Browse the repository at this point in the history
prepare for mapdb backend and add some test cases

Change-Id: I046bde4a32b3fd6693f788d27a9165e2646bc5cb
  • Loading branch information
javeme authored and zhoney committed Mar 6, 2019
1 parent 01049ec commit b673240
Show file tree
Hide file tree
Showing 44 changed files with 722 additions and 288 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import com.baidu.hugegraph.backend.cache.Cache;
import com.baidu.hugegraph.backend.cache.CacheManager;
import com.baidu.hugegraph.backend.store.BackendStoreSystemInfo;
import com.baidu.hugegraph.backend.store.memory.InMemoryDBStoreProvider;
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.config.ServerOptions;
import com.baidu.hugegraph.exception.NotSupportException;
Expand Down Expand Up @@ -189,7 +188,9 @@ private void loadGraph(String name, String path) {
private void checkBackendVersionOrExit() {
for (String graph : this.graphs()) {
HugeGraph hugegraph = this.graph(graph);
if (InMemoryDBStoreProvider.matchType(hugegraph.backend())) {
boolean persistence = hugegraph.graphTransaction().store()
.features().supportsPersistence();
if (!persistence) {
hugegraph.initBackend();
}
BackendStoreSystemInfo info = new BackendStoreSystemInfo(hugegraph);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,12 @@ public final synchronized Cluster cluster() {
}

@Override
public final synchronized Session session() {
public final Session session() {
return (Session) super.getOrNewSession();
}

@Override
protected final synchronized Session newSession() {
protected Session newSession() {
E.checkState(this.cluster != null,
"Cassandra cluster has not been initialized");
return new Session();
Expand Down Expand Up @@ -157,7 +157,7 @@ public BatchStatement add(Statement statement) {
}

@Override
public void clear() {
public void rollback() {
this.batch.clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ public void rollbackTx() {

session.txState(TxState.ROLLBACKING);
try {
session.clear();
session.rollback();
} finally {
// Assume batch commit would auto rollback
session.txState(TxState.CLEAN);
Expand Down
2 changes: 1 addition & 1 deletion hugegraph-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<dependency>
<groupId>com.baidu.hugegraph</groupId>
<artifactId>hugegraph-common</artifactId>
<version>1.5.6</version>
<version>1.5.8</version>
</dependency>

<!-- tinkerpop -->
Expand Down
12 changes: 8 additions & 4 deletions hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java
Original file line number Diff line number Diff line change
Expand Up @@ -385,10 +385,6 @@ public Iterator<Vertex> vertices(Query query) {
return this.graphTransaction().queryVertices(query);
}

public Iterator<Vertex> adjacentVertices(Iterator<Edge> edges) {
return this.graphTransaction().queryAdjacentVertices(edges);
}

@Override
public Iterator<Edge> edges(Object... objects) {
if (objects.length == 0) {
Expand All @@ -401,6 +397,14 @@ public Iterator<Edge> edges(Query query) {
return this.graphTransaction().queryEdges(query);
}

public Iterator<Vertex> adjacentVertices(Iterator<Edge> edges) {
return this.graphTransaction().queryAdjacentVertices(edges);
}

public Iterator<Edge> adjacentEdges(Id vertexId) {
return this.graphTransaction().queryEdgesByVertex(vertexId);
}

public PropertyKey propertyKey(Id id) {
PropertyKey pk = this.schemaTransaction().getPropertyKey(id);
E.checkArgument(pk != null, "Undefined property key id: '%s'", id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import com.baidu.hugegraph.type.HugeType;
import com.google.common.collect.ImmutableList;

public class CachedGraphTransaction extends GraphTransaction {
public final class CachedGraphTransaction extends GraphTransaction {

private final static int MAX_CACHE_EDGES_PER_QUERY = 100;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.backend.id.Id;
Expand All @@ -40,7 +39,7 @@
import com.baidu.hugegraph.util.Events;
import com.google.common.collect.ImmutableSet;

public class CachedSchemaTransaction extends SchemaTransaction {
public final class CachedSchemaTransaction extends SchemaTransaction {

private final Cache idCache;
private final Cache nameCache;
Expand Down Expand Up @@ -153,12 +152,26 @@ private static Id generateId(HugeType type, String name) {
return IdGenerator.of(type.string() + "-" + name);
}

private Object getOrFetch(HugeType type, Id id,
Function<Id, Object> fetcher) {
@Override
protected void addSchema(SchemaElement schema) {
super.addSchema(schema);

this.resetCachedAllIfReachedCapacity();

Id prefixedId = generateId(schema.type(), schema.id());
this.idCache.update(prefixedId, schema);

Id prefixedName = generateId(schema.type(), schema.name());
this.nameCache.update(prefixedName, schema);
}

@Override
@SuppressWarnings("unchecked")
protected <T extends SchemaElement> T getSchema(HugeType type, Id id) {
Id prefixedId = generateId(type, id);
Object value = this.idCache.get(prefixedId);
if (value == null) {
value = fetcher.apply(id);
value = super.getSchema(type, id);
if (value != null) {
this.resetCachedAllIfReachedCapacity();

Expand All @@ -169,15 +182,17 @@ private Object getOrFetch(HugeType type, Id id,
this.nameCache.update(prefixedName, schema);
}
}
return value;
return (T) value;
}

private Object getOrFetch(HugeType type, String name,
Function<String, Object> fetcher) {
@Override
@SuppressWarnings("unchecked")
protected <T extends SchemaElement> T getSchema(HugeType type,
String name) {
Id prefixedName = generateId(type, name);
Object value = this.nameCache.get(prefixedName);
if (value == null) {
value = fetcher.apply(name);
value = super.getSchema(type, name);
if (value != null) {
this.resetCachedAllIfReachedCapacity();

Expand All @@ -188,36 +203,6 @@ private Object getOrFetch(HugeType type, String name,
this.idCache.update(prefixedId, schema);
}
}
return value;
}

@Override
protected void addSchema(SchemaElement schema) {
super.addSchema(schema);

this.resetCachedAllIfReachedCapacity();

Id prefixedId = generateId(schema.type(), schema.id());
this.idCache.update(prefixedId, schema);

Id prefixedName = generateId(schema.type(), schema.name());
this.nameCache.update(prefixedName, schema);
}

@Override
@SuppressWarnings("unchecked")
protected <T extends SchemaElement> T getSchema(HugeType type, Id id) {
Object value = this.getOrFetch(type, id,
k -> super.getSchema(type, id));
return (T) value;
}

@Override
@SuppressWarnings("unchecked")
protected <T extends SchemaElement> T getSchema(HugeType type,
String name) {
Object value = this.getOrFetch(type, name,
k -> super.getSchema(type, name));
return (T) value;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class RamCache implements Cache {

// NOTE: the count in number of items, not in bytes
private final int capacity;
private final int halfCapacity;

// Implement LRU cache
private final ConcurrentMap<Id, LinkNode<Id, Object>> map;
Expand All @@ -63,14 +64,14 @@ public RamCache() {
}

public RamCache(int capacity) {
this.keyLock = new KeyLock();

if (capacity < 1) {
capacity = 1;
if (capacity < 0) {
capacity = 0;
}
this.keyLock = new KeyLock();
this.capacity = capacity;
this.halfCapacity = this.capacity >> 1;

int initialCapacity = capacity >> 3;
int initialCapacity = capacity >= MB ? capacity >> 10 : 256;
if (initialCapacity > MAX_INIT_CAP) {
initialCapacity = MAX_INIT_CAP;
}
Expand All @@ -80,9 +81,18 @@ public RamCache(int capacity) {
}

@Watched(prefix = "ramcache")
private Object access(Id id) {
private final Object access(Id id) {
assert id != null;

if (this.map.size() <= this.halfCapacity) {
LinkNode<Id, Object> node = this.map.get(id);
if (node == null) {
return null;
}
assert id.equals(node.key());
return node.value();
}

final Lock lock = this.keyLock.lock(id);
try {
LinkNode<Id, Object> node = this.map.get(id);
Expand All @@ -91,7 +101,7 @@ private Object access(Id id) {
}

// NOTE: update the queue only if the size > capacity/2
if (this.map.size() > this.capacity >> 1) {
if (this.map.size() > this.halfCapacity) {
// Move the node from mid to tail
if (this.queue.remove(node) == null) {
// The node may be removed by others through dequeue()
Expand All @@ -100,13 +110,6 @@ private Object access(Id id) {
this.queue.enqueue(node);
}

// Ignore concurrent write for hits
++this.hits;
if (LOG.isDebugEnabled()) {
LOG.debug("RamCache cached '{}' (hits={}, miss={})",
id, this.hits, this.miss);
}

assert id.equals(node.key());
return node.value();
} finally {
Expand All @@ -115,7 +118,7 @@ private Object access(Id id) {
}

@Watched(prefix = "ramcache")
private void write(Id id, Object value) {
private final void write(Id id, Object value) {
assert id != null;
assert this.capacity > 0;

Expand Down Expand Up @@ -163,14 +166,13 @@ private void write(Id id, Object value) {

// Add the new item to tail of the queue, then map it
this.map.put(id, this.queue.enqueue(id, value));

} finally {
lock.unlock();
}
}

@Watched(prefix = "ramcache")
private void remove(Id id) {
private final void remove(Id id) {
assert id != null;

final Lock lock = this.keyLock.lock(id);
Expand All @@ -192,16 +194,23 @@ private void remove(Id id) {
@Override
public Object get(Id id) {
Object value = null;
if (this.map.containsKey(id)) {
if (this.map.size() <= this.halfCapacity || this.map.containsKey(id)) {
// Maybe the id removed by other threads and returned null value
value = this.access(id);
}

if (value == null) {
++this.miss;
if (LOG.isDebugEnabled()) {
LOG.debug("RamCache missed '{}' (miss={}, hits={})",
id, this.miss, this.hits);
}
} else {
++this.hits;
if (LOG.isDebugEnabled()) {
LOG.debug("RamCache cached '{}' (hits={}, miss={})",
id, this.hits, this.miss);
}
}
return value;
}
Expand All @@ -210,10 +219,11 @@ public Object get(Id id) {
@Override
public Object getOrFetch(Id id, Function<Id, Object> fetcher) {
Object value = null;
if (this.map.containsKey(id)) {
if (this.map.size() <= this.halfCapacity || this.map.containsKey(id)) {
// Maybe the id removed by other threads and returned null value
value = this.access(id);
}

if (value == null) {
++this.miss;
if (LOG.isDebugEnabled()) {
Expand All @@ -223,6 +233,12 @@ public Object getOrFetch(Id id, Function<Id, Object> fetcher) {
// Do fetch and update the cache
value = fetcher.apply(id);
this.update(id, value);
} else {
++this.hits;
if (LOG.isDebugEnabled()) {
LOG.debug("RamCache cached '{}' (hits={}, miss={})",
id, this.hits, this.miss);
}
}
return value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ public abstract class IdGenerator {

public abstract Id generate(HugeVertex vertex);

public static Id of(String id) {
public final static Id of(String id) {
return new StringId(id);
}

public static Id of(long id) {
public final static Id of(long id) {
return new LongId(id);
}

public static Id of(byte[] bytes, boolean number) {
public final static Id of(byte[] bytes, boolean number) {
return number ? new LongId(bytes) : new StringId(bytes);
}

Expand All @@ -45,7 +45,7 @@ public static Id of(byte[] bytes, boolean number) {
* @param id original string id value
* @return wrapped id object
*/
public Id generate(String id) {
public final Id generate(String id) {
return of(id);
}

Expand All @@ -54,7 +54,7 @@ public Id generate(String id) {
* @param id original long id value
* @return wrapped id object
*/
public Id generate(long id) {
public final Id generate(long id) {
return of(id);
}

Expand Down
Loading

0 comments on commit b673240

Please sign in to comment.