Skip to content
This repository has been archived by the owner on Mar 7, 2024. It is now read-only.

Commit

Permalink
fix count query bug and disable TileDB's buggy query condition
Browse files Browse the repository at this point in the history
  • Loading branch information
DimitrisStaratzis committed Jul 14, 2022
1 parent e565bda commit 058a5f6
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 91 deletions.
66 changes: 34 additions & 32 deletions src/main/java/com/facebook/presto/plugin/tiledb/TileDBMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.Varchars.isVarcharType;
Expand Down Expand Up @@ -166,41 +167,42 @@ public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session

// Predicates are fetched as summary of constraints
TupleDomain<ColumnHandle> effectivePredicate = constraint.getSummary();
Set<ColumnHandle> columnHandles = new HashSet<>();
for (ColumnHandle e : columns.values()) {
if (hasPredicate) {
columnsWithPredicates.add(e);
}
//columns that are not included in the columnHandles are filtered by presto, not tileDB. Strings and queries with 'OR' are not pushed down for now.
if ((((effectivePredicate.getDomains().get().get(e) != null) &&
(effectivePredicate.getDomains().get().get(e).getValues().getRanges().getOrderedRanges().size() > 1)))) { //having more than one range effectively means it is an OR condition, which is not yet supported by the core library.
LOG.info("Column %s has an OR condition which is not yet supported by TileDB's native QueryCondition. The filtering will happen by Presto.", ((TileDBColumnHandle) e).getColumnName());
}
else if (columnsWithPredicates.contains(e)) { //Predicates are not supported by the QueryCondition, thus we need to leave this to Presto.
LOG.info("Column %s uses a Predicate which is not yet supported by TileDB's native QueryCondition. The filtering will happen by Presto.", ((TileDBColumnHandle) e).getColumnName());
}
else {
columnHandles.add(e);
}
}

// Uncomment to include all columns
// columns that are not included in the columnHandles are filtered by presto, not tileDB
// Set<ColumnHandle> columnHandles = new HashSet<>(columns.values());
// Set<ColumnHandle> columnHandles = new HashSet<>(); //TODO check if possible in the future
// for (ColumnHandle e : columns.values()) {
// if (hasPredicate) {
// columnsWithPredicates.add(e);
// }
// //columns that are not included in the columnHandles are filtered by presto, not tileDB. Strings and queries with 'OR' are not pushed down for now.
// if ((((effectivePredicate.getDomains().get().get(e) != null) &&
// (effectivePredicate.getDomains().get().get(e).getValues().getRanges().getOrderedRanges().size() > 1)))) { //having more than one range effectively means it is an OR condition, which is not yet supported by the core library.
// LOG.info("Column %s has an OR condition which is not yet supported by TileDB's native QueryCondition. The filtering will happen by Presto.", ((TileDBColumnHandle) e).getColumnName());
// }
// else if (columnsWithPredicates.contains(e)) { //Predicates are not supported by the QueryCondition, thus we need to leave this to Presto.
// LOG.info("Column %s uses a Predicate which is not yet supported by TileDB's native QueryCondition. The filtering will happen by Presto.", ((TileDBColumnHandle) e).getColumnName());
// }
// else {
// columnHandles.add(e);
// }
// }

Set<ColumnHandle> dimensionHandles = columns.values().stream()
.filter(e -> ((TileDBColumnHandle) e).getIsDimension())
.collect(Collectors.toSet());

List<ColumnHandle> columnsInLayout;
if (desiredColumns.isPresent()) {
// Add all dimensions since dimensions will always be returned by tiledb
Set<ColumnHandle> desiredColumnsWithDimension = new HashSet<>(desiredColumns.get());
desiredColumnsWithDimension.addAll(columnHandles);
desiredColumnsWithDimension.addAll(dimensionHandles);
// desiredColumnsWithDimension.addAll(columnHandles);
columnsInLayout = new ArrayList<>(desiredColumnsWithDimension);
}
else {
columnsInLayout = new ArrayList<>(columns.values());
}

// The only enforceable constraints are ones for dimension columns
Map<ColumnHandle, Domain> enforceableDomains = new HashMap<>(Maps.filterKeys(effectivePredicate.getDomains().get(), Predicates.in(columnHandles)));
Map<ColumnHandle, Domain> enforceableDimensionDomains = new HashMap<>(Maps.filterKeys(effectivePredicate.getDomains().get(), Predicates.in(dimensionHandles)));

if (!getSplitOnlyPredicates(session)) {
try {
Expand All @@ -216,9 +218,9 @@ else if (columnsWithPredicates.contains(e)) { //Predicates are not supported by
HashMap<String, Pair> nonEmptyDomain = array.nonEmptyDomain();
// Find any dimensions which do not have predicates and add one for the entire domain.
// This is required so we can later split on the predicates
for (ColumnHandle handle : columnHandles) {
if (!enforceableDomains.containsKey(handle)) {
TileDBColumnHandle columnHandle = ((TileDBColumnHandle) handle);
for (ColumnHandle dimensionHandle : dimensionHandles) {
if (!enforceableDimensionDomains.containsKey(dimensionHandle)) {
TileDBColumnHandle columnHandle = ((TileDBColumnHandle) dimensionHandle);
if (nonEmptyDomain.containsKey(columnHandle.getColumnName())) {
Pair<Object, Object> domain = nonEmptyDomain.get(columnHandle.getColumnName());
Object nonEmptyMin = domain.getFirst();
Expand All @@ -233,7 +235,7 @@ else if (columnsWithPredicates.contains(e)) { //Predicates are not supported by
range = Range.range(type, ((Integer) floatToRawIntBits((Float) nonEmptyMin)).longValue(), true,
((Integer) floatToRawIntBits((Float) nonEmptyMax)).longValue(), true);
}
else if (isVarcharType(type)) {
else if (type instanceof VarcharType) {
range = Range.range(type, utf8Slice(nonEmptyMin.toString()), true,
utf8Slice(nonEmptyMax.toString()), true);
}
Expand All @@ -243,8 +245,8 @@ else if (isVarcharType(type)) {
ConvertUtils.convert(nonEmptyMax, type.getJavaType()), true);
}

enforceableDomains.put(
handle,
enforceableDimensionDomains.put(
dimensionHandle,
Domain.create(ValueSet.ofRanges(range), false));
}
}
Expand All @@ -256,14 +258,14 @@ else if (isVarcharType(type)) {
}
}

TupleDomain<ColumnHandle> enforceableTupleDomain = TupleDomain.withColumnDomains(enforceableDomains);
TupleDomain<ColumnHandle> enforceableTupleDomain = TupleDomain.withColumnDomains(enforceableDimensionDomains);
TupleDomain<ColumnHandle> remainingTupleDomain;

// The remaining tuple domains, which are not enforced by TileDB, are those on attributes
remainingTupleDomain = TupleDomain.withColumnDomains(Maps.filterKeys(effectivePredicate.getDomains().get(), Predicates.not(Predicates.in(columnHandles))));
remainingTupleDomain = TupleDomain.withColumnDomains(Maps.filterKeys(effectivePredicate.getDomains().get(), Predicates.not(Predicates.in(dimensionHandles))));

ConnectorTableLayout layout = new ConnectorTableLayout(
new TileDBTableLayoutHandle(tableHandle, enforceableTupleDomain, columnHandles),
new TileDBTableLayoutHandle(tableHandle, enforceableTupleDomain, dimensionHandles),
Optional.of(columnsInLayout),
TupleDomain.all(),
Optional.empty(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,19 @@ private void initializeQuery(TileDBSplit split) throws TileDBError
for (int i = 0; i < columnHandles.size(); i++) {
columnIndexLookup.put(columnHandles.get(i).getColumnName(), i);
}

try (io.tiledb.java.api.Domain domain = arraySchema.getDomain()) {
// If empty we add all dimensions and attributes. This is needed for count queries with a where clause.
if (columnIndexLookup.isEmpty()) {
for (int i = 0; i < arraySchema.getAttributeNum(); i++) {
columnIndexLookup.put(arraySchema.getAttribute(i).getName(), 0);
}
for (int i = 0; i < domain.getNDim(); i++) {
columnIndexLookup.put(domain.getDimension(i).getName(), 0);
}
}
}

HashMap<String, Pair<Long, Long>> estimations = new HashMap<>();
String name;

Expand Down Expand Up @@ -309,12 +322,6 @@ private void initializeQuery(TileDBSplit split) throws TileDBError
}

try (io.tiledb.java.api.Domain domain = arraySchema.getDomain()) {
// If we're empty let's at least select the first dimension
// this is needed for count queries
if (columnIndexLookup.isEmpty()) {
columnIndexLookup.put(domain.getDimension(0).getName(), 0);
}

for (int i = 0; i < domain.getNDim(); i++) {
try (Dimension dim = domain.getDimension(i)) {
name = dim.getName();
Expand Down Expand Up @@ -467,7 +474,7 @@ private void setRanges(TileDBSplit split) throws TileDBError
HashMap<String, Attribute> attributes = arraySchema.getAttributes();
Iterator it = attributes.entrySet().iterator();
QueryCondition finalQueryCondition = null;
while (it.hasNext()) {
while (it.hasNext()) { ////TODO the code below will never run since TileDB's Query Condition is disabled at the moment.
Map.Entry pair = (Map.Entry) it.next();
Attribute att = (Attribute) pair.getValue();
Pair attBounds = getBoundsForAttribute(split, att);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,26 +155,19 @@ public void testQueryCondition()
dropArray(arrayName);
create1D2AVector(arrayName);

MaterializedResult desc = computeActual(format("DESC %s", arrayName)).toTestTypes();
assertEquals(desc,
MaterializedResult.resultBuilder(getQueryRunner().getDefaultSession(), VARCHAR, VARCHAR, VARCHAR, VARCHAR)
.row("x", "bigint", "", "Dimension")
.row("a1", "integer", "", "Attribute")
.row("a2", "varchar", "", "Attribute")
.row("a3", "real", "", "Attribute")
.build());
String insertSql = format("INSERT INTO %s (x, a1, a2, a3) VALUES " +
"(0, 10, 'a', 300.0), (3, 13, 'b', 200.0), (5, 15, 'c', 1000.0), (6, 16, 'd', 1.0), (7, 124, 'e', 20.0)", arrayName);
"(0, 10, '', 300.0), (3, 13, 'b', 200.0), (5, 15, 'c', 1000.0), (6, 16, 'd', 1.0), (7, 124, 'e', 20.0)", arrayName);
getQueryRunner().execute(insertSql);

String selectSql1 = format("SELECT * FROM %s WHERE a1 > 100 ORDER by x ASC", arrayName); // pushed down
String selectSql2 = format("SELECT * FROM %s WHERE a2 = 'c' ORDER by x ASC", arrayName); // pushed down
String selectSql3 = format("SELECT * FROM %s WHERE a2 >= 'b' and a2 < 'e' ORDER by x ASC", arrayName); // pushed down
String selectSql4 = format("SELECT * FROM %s WHERE a1 > 15 and a1 < 100 ORDER by x ASC", arrayName); // pushed down
String selectSql5 = format("SELECT * FROM %s WHERE a1 > 15 or a1 < 12 ORDER by x ASC", arrayName); // not pushed down
String selectSql5 = format("SELECT * FROM %s WHERE a1 = 15 OR a2 = 'b' ORDER by x ASC", arrayName); // not pushed down
String selectSql6 = format("SELECT * FROM %s WHERE a3 < 300 ORDER by x ASC", arrayName); // pushed down
String selectSql7 = format("SELECT * FROM %s WHERE a3 < 30 or a3 > 500 ORDER by x ASC", arrayName); // not pushed down
String selectSql8 = format("SELECT * FROM %s WHERE a1 > 15 AND a3 > 3 ORDER by x ASC", arrayName); // pushed down
String selectSql7 = format("SELECT * FROM %s WHERE a1 > 15 AND a3 > 3 ORDER by x ASC", arrayName); // pushed down
String selectSql8 = format("SELECT * FROM %s WHERE a2 = ''", arrayName);
String selectSql9 = format("SELECT count(*) FROM %s WHERE a2 = ''", arrayName);
MaterializedResult selectResult1 = computeActual(selectSql1);
MaterializedResult selectResult2 = computeActual(selectSql2);
MaterializedResult selectResult3 = computeActual(selectSql3);
Expand All @@ -183,6 +176,7 @@ public void testQueryCondition()
MaterializedResult selectResult6 = computeActual(selectSql6);
MaterializedResult selectResult7 = computeActual(selectSql7);
MaterializedResult selectResult8 = computeActual(selectSql8);
MaterializedResult selectResult9 = computeActual(selectSql9);

assertEquals(selectResult1, MaterializedResult.resultBuilder(getQueryRunner().getDefaultSession(), BIGINT, INTEGER, VARCHAR, REAL)
.row((long) 7, 124, "e", 20.0f)
Expand All @@ -203,9 +197,8 @@ public void testQueryCondition()
.build());

assertEquals(selectResult5, MaterializedResult.resultBuilder(getQueryRunner().getDefaultSession(), BIGINT, INTEGER, VARCHAR, REAL)
.row((long) 0, 10, "a", 300.0f)
.row((long) 6, 16, "d", 1.0f)
.row((long) 7, 124, "e", 20.0f)
.row((long) 3, 13, "b", 200.0f)
.row((long) 5, 15, "c", 1000.0f)
.build());

assertEquals(selectResult6, MaterializedResult.resultBuilder(getQueryRunner().getDefaultSession(), BIGINT, INTEGER, VARCHAR, REAL)
Expand All @@ -215,13 +208,15 @@ public void testQueryCondition()
.build());

assertEquals(selectResult7, MaterializedResult.resultBuilder(getQueryRunner().getDefaultSession(), BIGINT, INTEGER, VARCHAR, REAL)
.row((long) 5, 15, "c", 1000.0f)
.row((long) 6, 16, "d", 1.0f)
.row((long) 7, 124, "e", 20.0f)
.build());

assertEquals(selectResult8, MaterializedResult.resultBuilder(getQueryRunner().getDefaultSession(), BIGINT, INTEGER, VARCHAR, REAL)
.row((long) 7, 124, "e", 20.0f)
.row((long) 0, 10, "", 300.0f)
.build());

assertEquals(selectResult9, MaterializedResult.resultBuilder(getQueryRunner().getDefaultSession(), BIGINT)
.row((long) 1)
.build());

dropArray(arrayName);
Expand Down Expand Up @@ -1307,26 +1302,18 @@ public void testDimensionFiltering()
* Reads a two-dimensional dense array with nullable attributes.
*/
@Test
public void testRead2DVectorNullableDense()
public void test2DVectorNullableDense()
{
String selectSql = format("SELECT * FROM %s", denseURI);
String selectSql = format("SELECT * FROM %s ORDER BY rows ASC", denseURI);
MaterializedResult selectResult = computeActual(selectSql);
List<MaterializedRow> resultRows = selectResult.getMaterializedRows();
MaterializedResult expected = MaterializedResult.resultBuilder(getQueryRunner().getDefaultSession(), DOUBLE, INTEGER)
.row(1, 2, 3.0, 4)
MaterializedResult expected = MaterializedResult.resultBuilder(getQueryRunner().getDefaultSession(), INTEGER, INTEGER, REAL, INTEGER)
.row(1, 1, null, 1)
.row(1, 2, 3.0, 4)
.row(2, 1, 4.0, null)
.row(2, 2, null, 2)
.build();
//using string representation because of null values
List<MaterializedRow> expectedRows = expected.getMaterializedRows();
List<String> resultRowsToString = resultRows.stream()
.map(object -> Objects.toString(object, null))
.collect(Collectors.toList());
List<String> expectedRowsToString = expectedRows.stream()
.map(object -> Objects.toString(object, null))
.collect(Collectors.toList());
assertTrue(expectedRowsToString.size() == resultRowsToString.size() && expectedRowsToString.containsAll(resultRowsToString) && resultRowsToString.containsAll(expectedRowsToString)); //presto returns rows in a different order every time.
assertEquals(expected.toString(), selectResult.toString());
}

/**
Expand Down Expand Up @@ -1400,30 +1387,19 @@ private void sparseArrayNullableWrite() throws TileDBError
* Reads a two-dimensional sparse array with nullable attributes.
*/
@Test
public void testRead2DVectorNullableSparse()
public void test2DVectorNullableSparse()
{
String selectSql = format("SELECT * FROM %s", sparseURI);
String selectSql = format("SELECT * FROM %s ORDER BY d1 ASC", sparseURI);
MaterializedResult selectResult = computeActual(selectSql);
List<MaterializedRow> resultRows = selectResult.getMaterializedRows();
// for (MaterializedRow row : resultRows) {
// System.out.println(row);
// }
MaterializedResult expected = MaterializedResult.resultBuilder(getQueryRunner().getDefaultSession(), INTEGER, VARCHAR)
MaterializedResult expected = MaterializedResult.resultBuilder(getQueryRunner().getDefaultSession(), INTEGER, INTEGER, VARCHAR)
.row(1, null, "aa")
.row(2, null, "bb")
.row(3, null, "cc")
.row(4, 4, null)
.row(5, 5, null)
.build();
//using string representation because of null values
List<MaterializedRow> expectedRows = expected.getMaterializedRows();
List<String> resultRowsToString = resultRows.stream()
.map(object -> Objects.toString(object, null))
.collect(Collectors.toList());
List<String> expectedRowsToString = expectedRows.stream()
.map(object -> Objects.toString(object, null))
.collect(Collectors.toList());
assertTrue(expectedRowsToString.size() == resultRowsToString.size() && expectedRowsToString.containsAll(resultRowsToString) && resultRowsToString.containsAll(expectedRowsToString)); //presto returns rows in a different order every time.

assertEquals(expected.toString(), selectResult.toString());
}

/**
Expand Down Expand Up @@ -1532,10 +1508,10 @@ public void test1DNoAttributeWriteRead()

String selectSql = format("SELECT * FROM %s", arrayName);
MaterializedResult selectResult = computeActual(selectSql);
List<MaterializedRow> resultRows = selectResult.getMaterializedRows();
for (MaterializedRow row : resultRows) {
System.out.println(row);
}
MaterializedResult expected = MaterializedResult.resultBuilder(getQueryRunner().getDefaultSession(), INTEGER)
.row(4)
.build();
assertEquals(expected, selectResult);
dropArray(arrayName);
}

Expand Down Expand Up @@ -1575,7 +1551,7 @@ private void create1DVectorNoAttribute(String arrayName)
{
QueryRunner queryRunner = getQueryRunner();
String createSql = format("CREATE TABLE %s(" +
"x bigint WITH (dimension=true) " +
"x integer WITH (dimension=true) " +
") WITH (uri='%s')", arrayName, arrayName);
queryRunner.execute(createSql);
}
Expand Down

0 comments on commit 058a5f6

Please sign in to comment.