diff --git a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseSessions.java b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseSessions.java
index bcd2f969b8..a3407cb914 100644
--- a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseSessions.java
+++ b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseSessions.java
@@ -29,6 +29,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
@@ -51,6 +52,10 @@
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.hbase.filter.MultiRowRangeFilter;
+import org.apache.hadoop.hbase.filter.MultiRowRangeFilter.RowRange;
 import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.filter.PrefixFilter;
 
@@ -383,6 +388,29 @@ public RowIterator get(String table, byte[] family, byte[] rowkey) {
             }
         }
 
+        /**
+         * Get multi records by rowkeys from a table
+         */
+        public RowIterator get(String table, byte[] family,
+                               Set<byte[]> rowkeys) {
+            assert !this.hasChanges();
+
+            List<Get> gets = new ArrayList<>(rowkeys.size());
+            for (byte[] rowkey : rowkeys) {
+                Get get = new Get(rowkey);
+                if (family != null) {
+                    get.addFamily(family);
+                }
+                gets.add(get);
+            }
+
+            try (Table htable = table(table)) {
+                return new RowIterator(htable.get(gets));
+            } catch (IOException e) {
+                throw new BackendException(e);
+            }
+        }
+
         /**
          * Scan all records from a table
          */
@@ -403,6 +431,27 @@ public RowIterator scan(String table, byte[] prefix) {
             return this.scan(table, prefix, true, prefix);
         }
 
+        /**
+         * Scan records by multi rowkey prefixs from a table
+         */
+        public RowIterator scan(String table, Set<byte[]> prefixs) {
+            assert !this.hasChanges();
+
+            FilterList orFilters = new FilterList(Operator.MUST_PASS_ONE);
+            for (byte[] prefix : prefixs) {
+                FilterList andFilters = new FilterList(Operator.MUST_PASS_ALL);
+                List<RowRange> ranges = new ArrayList<>();
+                ranges.add(new RowRange(prefix, true, null, true));
+                andFilters.addFilter(new MultiRowRangeFilter(ranges));
+                andFilters.addFilter(new PrefixFilter(prefix));
+
+                orFilters.addFilter(andFilters);
+            }
+
+            Scan scan = new Scan().setFilter(orFilters);
+            return this.scan(table, scan);
+        }
+
         /**
          * Scan records by rowkey start and prefix from a table
          */
diff --git a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseTable.java b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseTable.java
index d2dc1f5c72..6a62219634 100644
--- a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseTable.java
+++ b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseTable.java
@@ -22,6 +22,8 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -45,7 +47,6 @@
 import com.baidu.hugegraph.backend.store.hbase.HbaseSessions.RowIterator;
 import com.baidu.hugegraph.backend.store.hbase.HbaseSessions.Session;
 import com.baidu.hugegraph.exception.NotSupportException;
-import com.baidu.hugegraph.iterator.ExtendableIterator;
 import com.baidu.hugegraph.type.HugeType;
 import com.baidu.hugegraph.util.Bytes;
 import com.baidu.hugegraph.util.E;
@@ -133,11 +134,14 @@ public Iterator<BackendEntry> query(Session session, Query query) {
         // Query by id
         if (query.conditions().isEmpty()) {
             assert !query.ids().isEmpty();
-            ExtendableIterator<BackendEntry> rs = new ExtendableIterator<>();
-            for (Id id : query.ids()) {
-                rs.extend(newEntryIterator(this.queryById(session, id), query));
+            RowIterator rowIterator = null;
+            if (query.ids().size() == 1) {
+                Id id = query.ids().iterator().next();
+                rowIterator = this.queryById(session, id);
+            } else {
+                rowIterator = this.queryByIds(session, query.ids());
             }
-            return rs;
+            return newEntryIterator(rowIterator, query);
         }
 
         // Query by condition (or condition + id)
@@ -159,6 +163,12 @@ protected RowIterator queryById(Session session, Id id) {
         return session.get(this.table(), null, id.asBytes());
     }
 
+    protected RowIterator queryByIds(Session session, Set<Id> ids) {
+        Set<byte[]> rowkeys = ids.stream().map(Id::asBytes)
+                                 .collect(Collectors.toSet());
+        return session.get(this.table(), null, rowkeys);
+    }
+
     protected RowIterator queryByCond(Session session, ConditionQuery query) {
         if (query.containsScanCondition()) {
             E.checkArgument(query.relations().size() == 1,
diff --git a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseTables.java b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseTables.java
index 06fbc916a2..7178aec355 100644
--- a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseTables.java
+++ b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseTables.java
@@ -22,6 +22,8 @@
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -144,6 +146,13 @@ protected RowIterator queryById(Session session, Id id) {
         return session.scan(this.table(), prefix);
     }
 
+    @Override
+    protected RowIterator queryByIds(Session session, Set<Id> ids) {
+        Set<byte[]> prefixs = ids.stream().map(Id::asBytes)
+                                 .collect(Collectors.toSet());
+        return session.scan(this.table(), prefixs);
+    }
+
     @Override
     protected void parseRowColumns(Result row, BackendEntry entry, Query query)
                                    throws IOException {