Skip to content

Commit

Permalink
Support pagination when querying a single index
Browse files Browse the repository at this point in the history
Change-Id: I15c10c702078ff4a998378633e5cbb68cf2ac5fa
  • Loading branch information
Linary committed Jan 14, 2019
1 parent e84ce7a commit 6a800ec
Show file tree
Hide file tree
Showing 25 changed files with 895 additions and 299 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@
import com.codahale.metrics.Histogram;
import com.codahale.metrics.annotation.Timed;
import com.fasterxml.jackson.annotation.JsonProperty;

import jersey.repackaged.com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap;

@Path("graphs/{graph}/jobs/gremlin")
@Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,6 @@ public int optimized() {
return this.optimizedType;
}


public void registerResultsFilter(Function<HugeElement, Boolean> filter) {
this.resultsFilter = filter;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public IdPrefixQuery(Query originQuery, Id prefix) {
this(originQuery.resultType(), originQuery, prefix, true, prefix);
}

/**
 * Creates a prefix query that scans from {@code start} (inclusive) for
 * ids beginning with {@code prefix}. Callers may pass as {@code start}
 * a position deserialized from a page token to resume a paged scan —
 * TODO confirm against callers.
 *
 * @param originQuery the query whose result type and settings are reused
 * @param start       the id (row position) to start scanning from, inclusive
 * @param prefix      the id prefix every result must match
 */
public IdPrefixQuery(Query originQuery, Id start, Id prefix) {
this(originQuery.resultType(), originQuery, start, true, prefix);
}

public IdPrefixQuery(Query originQuery,
Id start, boolean inclusive, Id prefix) {
this(originQuery.resultType(), originQuery, start, inclusive, prefix);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ public class Query implements Cloneable {
public static final long NO_LIMIT = Long.MAX_VALUE;

public static final long NO_CAPACITY = -1L;
public static final long DEFAULT_CAPACITY = 800000L; // HugeGraph-777
// TODO: decide whether to keep 800000 (80w) or switch to a smaller value
public static final long DEFAULT_CAPACITY = 1000L; // HugeGraph-777

private HugeType resultType;
private Map<HugeKeys, Order> orders;
Expand Down Expand Up @@ -265,8 +266,9 @@ public int hashCode() {

@Override
public String toString() {
return String.format("Query for %s offset=%d, limit=%d, order by %s",
return String.format("Query for %s page=%s, offset=%d, limit=%d, order by %s",
this.resultType,
this.page,
this.offset,
this.limit,
this.orders.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,12 @@

package com.baidu.hugegraph.backend.serializer;

import java.util.Base64;
import java.util.function.BiFunction;

import com.baidu.hugegraph.backend.BackendException;
import com.baidu.hugegraph.backend.query.Query;
import com.baidu.hugegraph.backend.store.BackendEntry;
import com.baidu.hugegraph.backend.store.BackendEntry.BackendIterator;
import com.baidu.hugegraph.backend.store.BackendEntryIterator;
import com.baidu.hugegraph.util.Bytes;
import com.baidu.hugegraph.util.E;

public class BinaryEntryIterator<Elem> extends BackendEntryIterator {
Expand Down Expand Up @@ -140,61 +137,4 @@ private void skipPageOffset(String page) {
this.skip(this.current, pagestate.offset());
}
}

/**
 * Holds a backend scan position plus an in-position offset, and
 * (de)serializes it to/from a Base64 page token string.
 */
public static class PageState {

// Raw backend position bytes for resuming a scan.
private final byte[] position;
// Number of entries to skip within that position.
private final int offset;

public PageState(byte[] position, int offset) {
E.checkNotNull(position, "position");
this.position = position;
this.offset = offset;
}

/** @return the raw backend position bytes */
public byte[] position() {
return this.position;
}

/** @return the number of entries to skip at the position */
public int offset() {
return this.offset;
}

/** Encodes this page state as a Base64 token. */
@Override
public String toString() {
return Base64.getEncoder().encodeToString(this.toBytes());
}

/**
 * Serializes the state: position bytes followed by the 4-byte offset.
 * NOTE(review): the "2 +" presumably accounts for a length prefix
 * written by writeBytes — confirm against BytesBuffer.
 */
public byte[] toBytes() {
int length = 2 + this.position.length + BytesBuffer.INT_LEN;
BytesBuffer buffer = BytesBuffer.allocate(length);
buffer.writeBytes(this.position);
buffer.writeInt(this.offset);
return buffer.bytes();
}

/**
 * Parses a Base64 page token.
 *
 * @throws BackendException if the token is not valid Base64
 */
public static PageState fromString(String page) {
byte[] bytes;
try {
bytes = Base64.getDecoder().decode(page);
} catch (Exception e) {
throw new BackendException("Invalid page: '%s'", e, page);
}
return fromBytes(bytes);
}

/**
 * Deserializes a page state from raw bytes; an empty array denotes
 * the first page (empty position, zero offset).
 *
 * @throws BackendException if the bytes cannot be decoded
 */
public static PageState fromBytes(byte[] bytes) {
if (bytes.length == 0) {
// The first page
return new PageState(new byte[0], 0);
}
try {
BytesBuffer buffer = BytesBuffer.wrap(bytes);
return new PageState(buffer.readBytes(), buffer.readInt());
} catch (Exception e) {
throw new BackendException("Invalid page: '0x%s'",
e, Bytes.toHex(bytes));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import com.baidu.hugegraph.backend.query.Condition.Relation;
import com.baidu.hugegraph.backend.query.ConditionQuery;
import com.baidu.hugegraph.backend.query.IdPrefixQuery;
import com.baidu.hugegraph.backend.query.IdQuery;
import com.baidu.hugegraph.backend.query.IdRangeQuery;
import com.baidu.hugegraph.backend.query.Query;
import com.baidu.hugegraph.backend.serializer.BinaryBackendEntry.BinaryId;
Expand Down Expand Up @@ -104,7 +103,6 @@ private BinaryBackendEntry newBackendEntry(HugeEdge edge) {
return new BinaryBackendEntry(edge.type(), id);
}

@SuppressWarnings("unused")
private BinaryBackendEntry newBackendEntry(SchemaElement elem) {
return newBackendEntry(elem.type(), elem.id());
}
Expand Down Expand Up @@ -633,11 +631,26 @@ private Query writeStringIndexQuery(ConditionQuery query) {
E.checkArgument(index != null, "Please specify the index label");
E.checkArgument(key != null, "Please specify the index key");

Id id = formatIndexId(query.resultType(), index, key);
IdQuery idQuery = new IdQuery(query, id);
idQuery.limit(query.limit());
idQuery.offset(query.offset());
return idQuery;
Id prefix = formatIndexId(query.resultType(), index, key);

Query newQuery;
/*
* If paging is in use and the page token is not empty, deserialize
* the token into an id and use it as the starting row for this query
*/
if (query.paging() && !query.page().isEmpty()) {
byte[] position = PageState.fromString(query.page()).position();
BinaryId start = new BinaryId(position, null);
newQuery = new IdPrefixQuery(query, start, prefix);
} else {
newQuery = new IdPrefixQuery(query, prefix);
}
if (query.paging()) {
newQuery.page(query.page());
}
newQuery.limit(query.limit());
newQuery.offset(query.offset());
return newQuery;
}

private Query writeRangeIndexQuery(ConditionQuery query) {
Expand Down Expand Up @@ -680,10 +693,11 @@ private Query writeRangeIndexQuery(ConditionQuery query) {
HugeType type = query.resultType();
if (keyEq != null) {
Id id = formatIndexId(type, index, keyEq);
IdQuery idQuery = new IdQuery(query, id);
idQuery.limit(query.limit());
idQuery.offset(query.offset());
return idQuery;
Query newQuery = new IdPrefixQuery(query, id);
newQuery.page(query.page());
newQuery.limit(query.limit());
newQuery.offset(query.offset());
return newQuery;
}

if (keyMin == null) {
Expand All @@ -704,15 +718,28 @@ private Query writeRangeIndexQuery(ConditionQuery query) {
keyMinEq = true;
}

Id start = min;
if (query.paging() && !query.page().isEmpty()) {
byte[] position = PageState.fromString(query.page()).position();
start = new BinaryId(position, null);
}

Query newQuery;
if (keyMax == null) {
Id prefix = formatIndexId(type, index, null);
// Reset the first byte to make same length-prefix
prefix.asBytes()[0] = min.asBytes()[0];
return new IdPrefixQuery(query, min, keyMinEq, prefix);
newQuery = new IdPrefixQuery(query, start, keyMinEq, prefix);
} else {
Id max = formatIndexId(type, index, keyMax);
return new IdRangeQuery(query, min, keyMinEq, max, keyMaxEq);
newQuery = new IdRangeQuery(query, start, keyMinEq, max, keyMaxEq);
}
if (query.paging()) {
newQuery.page(query.page());
}
newQuery.limit(query.limit());
newQuery.offset(query.offset());
return newQuery;
}

private BinaryBackendEntry formatILDeletion(HugeIndex index) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.backend.serializer;

import java.util.Base64;

import com.baidu.hugegraph.backend.BackendException;
import com.baidu.hugegraph.util.Bytes;
import com.baidu.hugegraph.util.E;

/**
 * Immutable holder of a backend scan position plus an in-position
 * offset, serialized to/from a Base64 string so it can be handed to
 * clients as an opaque page token.
 */
public class PageState {

    // Raw backend position bytes for resuming a scan.
    private final byte[] position;
    // Number of entries to skip within that position.
    private final int offset;

    /**
     * @param position raw backend position bytes, must not be null
     * @param offset   count of entries to skip at that position
     */
    public PageState(byte[] position, int offset) {
        E.checkNotNull(position, "position");
        this.position = position;
        this.offset = offset;
    }

    /** @return the raw backend position bytes */
    public byte[] position() {
        return this.position;
    }

    /** @return the number of entries to skip at the position */
    public int offset() {
        return this.offset;
    }

    /** Encodes this page state as a Base64 token. */
    @Override
    public String toString() {
        return Base64.getEncoder().encodeToString(this.toBytes());
    }

    /**
     * Serializes the state: position bytes followed by the 4-byte
     * offset. NOTE(review): the extra 2 bytes presumably cover a
     * length prefix written by writeBytes — confirm against
     * BytesBuffer.
     */
    public byte[] toBytes() {
        BytesBuffer buf = BytesBuffer.allocate(
                          2 + BytesBuffer.INT_LEN + this.position.length);
        buf.writeBytes(this.position);
        buf.writeInt(this.offset);
        return buf.bytes();
    }

    /**
     * Parses a Base64 page token.
     *
     * @throws BackendException if the token is not valid Base64
     */
    public static PageState fromString(String page) {
        final byte[] decoded;
        try {
            decoded = Base64.getDecoder().decode(page);
        } catch (Exception e) {
            throw new BackendException("Invalid page: '%s'", e, page);
        }
        return fromBytes(decoded);
    }

    /**
     * Deserializes a page state from raw bytes; an empty array
     * denotes the first page (empty position, zero offset).
     *
     * @throws BackendException if the bytes cannot be decoded
     */
    public static PageState fromBytes(byte[] bytes) {
        if (bytes.length == 0) {
            // An empty token means the very first page
            return new PageState(new byte[0], 0);
        }
        try {
            BytesBuffer wrapped = BytesBuffer.wrap(bytes);
            return new PageState(wrapped.readBytes(), wrapped.readInt());
        } catch (Exception e) {
            throw new BackendException("Invalid page: '0x%s'",
                                       e, Bytes.toHex(bytes));
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,11 @@ protected final long fetched() {
}

protected final void checkCapacity() throws LimitExceedException {
// Stop if reach capacity
this.query.checkCapacity(this.count);
// TODO: delete if
if (this.query.resultType().isGraph()) {
// Stop if reach capacity
this.query.checkCapacity(this.count);
}
}

protected final boolean exceedLimit() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.backend.tx;

import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;

import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.util.E;

/**
 * An {@link Ids} implementation backed by an explicit set of ids.
 */
public class EntireIds implements Ids {

    private final Set<Id> ids;

    /**
     * Wraps the given collection of ids. A {@link Set} is used directly
     * without copying (preserving the previous aliasing behavior); any
     * other collection is now copied into a {@link LinkedHashSet},
     * keeping iteration order while dropping duplicates, instead of
     * being rejected with an error as before.
     *
     * @param ids the ids to hold, must not be null
     */
    @SuppressWarnings("unchecked") // safe: guarded by instanceof, element type is Id
    public EntireIds(Collection<Id> ids) {
        E.checkNotNull(ids, "ids");
        this.ids = ids instanceof Set ? (Set<Id>) ids
                                      : new LinkedHashSet<>(ids);
    }

    /**
     * Wraps the given set of ids directly (no copy).
     *
     * @param ids the ids to hold
     */
    public EntireIds(Set<Id> ids) {
        this.ids = ids;
    }

    /** @return the backing set of ids (not a defensive copy) */
    public Set<Id> ids() {
        return this.ids;
    }
}
Loading

0 comments on commit 6a800ec

Please sign in to comment.