Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize hbase with multi-get for g.V(ids) #279

Merged
merged 1 commit into from
Dec 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Future;

import org.apache.hadoop.conf.Configuration;
Expand All @@ -51,6 +52,10 @@
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.MultiRowRangeFilter;
import org.apache.hadoop.hbase.filter.MultiRowRangeFilter.RowRange;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.PrefixFilter;

Expand Down Expand Up @@ -383,6 +388,29 @@ public RowIterator get(String table, byte[] family, byte[] rowkey) {
}
}

/**
 * Fetch multiple rows from a table with a single batched multi-get.
 *
 * NOTE(review): byte[] does not override equals/hashCode, so the Set
 * cannot deduplicate logically-equal rowkeys — assumes callers pass
 * distinct keys; TODO confirm.
 *
 * @param table   name of the table to read from
 * @param family  column family to restrict to, or null for all families
 * @param rowkeys rowkeys of the records to fetch
 * @return iterator over the fetched rows
 */
public RowIterator get(String table, byte[] family,
                       Set<byte[]> rowkeys) {
    assert !this.hasChanges();

    List<Get> batch = new ArrayList<>(rowkeys.size());
    for (byte[] key : rowkeys) {
        Get op = new Get(key);
        if (family != null) {
            op.addFamily(family);
        }
        batch.add(op);
    }

    try (Table htable = table(table)) {
        // Table.get(List<Get>) issues the gets as one batched RPC
        return new RowIterator(htable.get(batch));
    } catch (IOException e) {
        throw new BackendException(e);
    }
}

/**
* Scan all records from a table
*/
Expand All @@ -403,6 +431,27 @@ public RowIterator scan(String table, byte[] prefix) {
return this.scan(table, prefix, true, prefix);
}

/**
 * Scan records matching any of multiple rowkey prefixes from a table.
 *
 * Each prefix is expressed as (range-filter AND prefix-filter): the
 * MultiRowRangeFilter [prefix, +inf) supplies seek hints so the scanner
 * can skip ahead, while the PrefixFilter cuts the scan off once rows no
 * longer start with the prefix. The per-prefix pairs are then OR-ed.
 *
 * @param table   name of the table to scan
 * @param prefixs set of rowkey prefixes to match
 * @return iterator over the matching rows
 */
public RowIterator scan(String table, Set<byte[]> prefixs) {
    assert !this.hasChanges();

    FilterList anyPrefix = new FilterList(Operator.MUST_PASS_ONE);
    for (byte[] prefix : prefixs) {
        List<RowRange> ranges = new ArrayList<>();
        ranges.add(new RowRange(prefix, true, null, true));

        FilterList allOf = new FilterList(Operator.MUST_PASS_ALL);
        allOf.addFilter(new MultiRowRangeFilter(ranges));
        allOf.addFilter(new PrefixFilter(prefix));

        anyPrefix.addFilter(allOf);
    }

    Scan scan = new Scan();
    scan.setFilter(anyPrefix);
    return this.scan(table, scan);
}

/**
* Scan records by rowkey start and prefix from a table
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
Expand All @@ -45,7 +47,6 @@
import com.baidu.hugegraph.backend.store.hbase.HbaseSessions.RowIterator;
import com.baidu.hugegraph.backend.store.hbase.HbaseSessions.Session;
import com.baidu.hugegraph.exception.NotSupportException;
import com.baidu.hugegraph.iterator.ExtendableIterator;
import com.baidu.hugegraph.type.HugeType;
import com.baidu.hugegraph.util.Bytes;
import com.baidu.hugegraph.util.E;
Expand Down Expand Up @@ -133,11 +134,14 @@ public Iterator<BackendEntry> query(Session session, Query query) {
// Query by id
if (query.conditions().isEmpty()) {
assert !query.ids().isEmpty();
ExtendableIterator<BackendEntry> rs = new ExtendableIterator<>();
for (Id id : query.ids()) {
rs.extend(newEntryIterator(this.queryById(session, id), query));
RowIterator rowIterator = null;
if (query.ids().size() == 1) {
Id id = query.ids().iterator().next();
rowIterator = this.queryById(session, id);
} else {
rowIterator = this.queryByIds(session, query.ids());
}
return rs;
return newEntryIterator(rowIterator, query);
}

// Query by condition (or condition + id)
Expand All @@ -159,6 +163,12 @@ protected RowIterator queryById(Session session, Id id) {
return session.get(this.table(), null, id.asBytes());
}

/**
 * Query multiple records by their ids in one batched multi-get.
 *
 * @param session the session to query through
 * @param ids     the ids of the records to fetch
 * @return iterator over the fetched rows
 */
protected RowIterator queryByIds(Session session, Set<Id> ids) {
    // Serialize every id to its rowkey bytes, then fetch them all at once
    Set<byte[]> rowkeys = ids.stream()
                             .map(id -> id.asBytes())
                             .collect(Collectors.toSet());
    return session.get(this.table(), null, rowkeys);
}

protected RowIterator queryByCond(Session session, ConditionQuery query) {
if (query.containsScanCondition()) {
E.checkArgument(query.relations().size() == 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
Expand Down Expand Up @@ -144,6 +146,13 @@ protected RowIterator queryById(Session session, Id id) {
return session.scan(this.table(), prefix);
}

/**
 * Query multiple records by their ids via a multi-prefix scan.
 *
 * NOTE(review): unlike the base implementation, each id's bytes are used
 * as a rowkey prefix rather than an exact key — presumably because this
 * table stores several rows per id; TODO confirm against the row layout.
 *
 * @param session the session to query through
 * @param ids     the ids whose rows should be scanned
 * @return iterator over the matching rows
 */
@Override
protected RowIterator queryByIds(Session session, Set<Id> ids) {
    // Each id's serialized bytes become one scan prefix
    Set<byte[]> prefixs = ids.stream()
                             .map(id -> id.asBytes())
                             .collect(Collectors.toSet());
    return session.scan(this.table(), prefixs);
}

@Override
protected void parseRowColumns(Result row, BackendEntry entry,
Query query) throws IOException {
Expand Down