Skip to content

Commit

Permalink
Optimize hbase with multi-get for g.V(ids)
Browse files Browse the repository at this point in the history
Implement #276

Change-Id: Ia876be9129b42532b925a9ae0edab3355a065549
  • Loading branch information
Linary committed Dec 18, 2018
1 parent f2c99cf commit c2b0dee
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Future;

import org.apache.hadoop.conf.Configuration;
Expand All @@ -51,6 +52,10 @@
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.MultiRowRangeFilter;
import org.apache.hadoop.hbase.filter.MultiRowRangeFilter.RowRange;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.PrefixFilter;

Expand Down Expand Up @@ -383,6 +388,29 @@ public RowIterator get(String table, byte[] family, byte[] rowkey) {
}
}

/**
 * Fetch multiple rows from a table with one batched multi-get request.
 *
 * @param table   name of the HBase table to read from
 * @param family  column family to restrict results to, or null for all families
 * @param rowkeys set of row keys to look up
 * @return iterator over the fetched rows
 * @throws BackendException if the underlying HBase call fails
 */
public RowIterator get(String table, byte[] family,
                       Set<byte[]> rowkeys) {
    assert !this.hasChanges();

    // One Get per row key; HBase serves the whole list as a single batch
    List<Get> batch = new ArrayList<>(rowkeys.size());
    for (byte[] key : rowkeys) {
        Get op = new Get(key);
        if (family != null) {
            op.addFamily(family);
        }
        batch.add(op);
    }

    // Closing the table here is safe: results are materialized by get()
    try (Table htable = table(table)) {
        return new RowIterator(htable.get(batch));
    } catch (IOException e) {
        throw new BackendException(e);
    }
}

/**
* Scan all records from a table
*/
Expand All @@ -403,6 +431,27 @@ public RowIterator scan(String table, byte[] prefix) {
return this.scan(table, prefix, true, prefix);
}

/**
 * Scan a table for all rows whose keys start with any of the given prefixes.
 *
 * For each prefix, a MultiRowRangeFilter over [prefix, end) is AND-ed with a
 * PrefixFilter — presumably the range filter lets the scanner seek to each
 * prefix start while the prefix filter bounds the match (TODO confirm against
 * HBase filter docs). The per-prefix filter pairs are OR-ed together.
 *
 * @param table   name of the HBase table to scan
 * @param prefixs set of row-key prefixes to match
 * @return iterator over all matching rows
 */
public RowIterator scan(String table, Set<byte[]> prefixs) {
    assert !this.hasChanges();

    FilterList anyPrefix = new FilterList(Operator.MUST_PASS_ONE);
    for (byte[] prefix : prefixs) {
        List<RowRange> ranges = new ArrayList<>();
        ranges.add(new RowRange(prefix, true, null, true));

        FilterList bothMatch = new FilterList(Operator.MUST_PASS_ALL);
        bothMatch.addFilter(new MultiRowRangeFilter(ranges));
        bothMatch.addFilter(new PrefixFilter(prefix));

        anyPrefix.addFilter(bothMatch);
    }

    Scan scan = new Scan().setFilter(anyPrefix);
    return this.scan(table, scan);
}

/**
* Scan records by rowkey start and prefix from a table
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
Expand All @@ -45,7 +47,6 @@
import com.baidu.hugegraph.backend.store.hbase.HbaseSessions.RowIterator;
import com.baidu.hugegraph.backend.store.hbase.HbaseSessions.Session;
import com.baidu.hugegraph.exception.NotSupportException;
import com.baidu.hugegraph.iterator.ExtendableIterator;
import com.baidu.hugegraph.type.HugeType;
import com.baidu.hugegraph.util.Bytes;
import com.baidu.hugegraph.util.E;
Expand Down Expand Up @@ -133,11 +134,14 @@ public Iterator<BackendEntry> query(Session session, Query query) {
// Query by id
if (query.conditions().isEmpty()) {
assert !query.ids().isEmpty();
ExtendableIterator<BackendEntry> rs = new ExtendableIterator<>();
for (Id id : query.ids()) {
rs.extend(newEntryIterator(this.queryById(session, id), query));
RowIterator rowIterator = null;
if (query.ids().size() == 1) {
Id id = query.ids().iterator().next();
rowIterator = this.queryById(session, id);
} else {
rowIterator = this.queryByIds(session, query.ids());
}
return rs;
return newEntryIterator(rowIterator, query);
}

// Query by condition (or condition + id)
Expand All @@ -159,6 +163,12 @@ protected RowIterator queryById(Session session, Id id) {
return session.get(this.table(), null, id.asBytes());
}

/**
 * Query multiple entries by id using a single batched multi-get.
 *
 * @param session session used to access the backend store
 * @param ids     ids of the entries to fetch
 * @return iterator over the matching rows
 */
protected RowIterator queryByIds(Session session, Set<Id> ids) {
    Set<byte[]> keys = ids.stream()
                          .map(id -> id.asBytes())
                          .collect(Collectors.toSet());
    return session.get(this.table(), null, keys);
}

protected RowIterator queryByCond(Session session, ConditionQuery query) {
if (query.containsScanCondition()) {
E.checkArgument(query.relations().size() == 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
Expand Down Expand Up @@ -144,6 +146,13 @@ protected RowIterator queryById(Session session, Id id) {
return session.scan(this.table(), prefix);
}

/**
 * Query multiple entries by id; rows are matched by row-key prefix
 * rather than by exact key, so this scans instead of multi-getting.
 *
 * @param session session used to access the backend store
 * @param ids     ids whose serialized bytes form the row-key prefixes
 * @return iterator over the matching rows
 */
@Override
protected RowIterator queryByIds(Session session, Set<Id> ids) {
    Set<byte[]> prefixes = ids.stream()
                              .map(id -> id.asBytes())
                              .collect(Collectors.toSet());
    return session.scan(this.table(), prefixes);
}

@Override
protected void parseRowColumns(Result row, BackendEntry entry,
Query query) throws IOException {
Expand Down

0 comments on commit c2b0dee

Please sign in to comment.