Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DRILL-5688: Add repeated map support to column accessors #887

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.record;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;

/**
* Defines the schema of a tuple: either the top-level row or a nested
* "map" (really structure). A schema is a collection of columns (backed
* by vectors in the loader itself.) Columns are accessible by name or
* index. New columns may be added at any time; the new column takes the
* next available index.
*/

public class TupleSchema implements TupleMetadata {

public static abstract class BaseColumnMetadata implements ColumnMetadata {
private final int index;
private final TupleSchema parent;
protected final MaterializedField schema;

public BaseColumnMetadata(int index, TupleSchema parent, MaterializedField schema) {
this.index = index;
this.parent = parent;
this.schema = schema;
}

@Override
public abstract StructureType structureType();
@Override
public abstract TupleMetadata mapSchema();
@Override
public int index() { return index; }
@Override
public MaterializedField schema() { return schema; }
@Override
public String name() { return schema.getName(); }
@Override
public MajorType majorType() { return schema.getType(); }
@Override
public MinorType type() { return schema.getType().getMinorType(); }
@Override
public DataMode mode() { return schema.getDataMode(); }
@Override
public TupleMetadata parent() { return parent; }
public MapColumnMetadata parentMap() { return parent.map(); }

@Override
public String fullName( ) {
MapColumnMetadata parentMap = parentMap();
if (parentMap == null) {
return name();
} else {
return parentMap.fullName() + "." + name();
}
}

@Override
public boolean isEquivalent(ColumnMetadata other) {
return schema.isEquivalent(other.schema());
}
}

public static class PrimitiveColumnMetadata extends BaseColumnMetadata {

public PrimitiveColumnMetadata(int index, TupleSchema parent,
MaterializedField schema) {
super(index, parent, schema);
}

@Override
public StructureType structureType() { return StructureType.PRIMITIVE; }
@Override
public TupleMetadata mapSchema() { return null; }
}

public static class MapColumnMetadata extends BaseColumnMetadata {
private final TupleMetadata mapSchema;

public MapColumnMetadata(int index, TupleSchema parent, MaterializedField schema) {
super(index, parent, schema);
mapSchema = new TupleSchema(this);
for (MaterializedField child : schema.getChildren()) {
mapSchema.add(child);
}
}

@Override
public StructureType structureType() { return StructureType.TUPLE; }
@Override
public TupleMetadata mapSchema() { return mapSchema; }
}

private final MapColumnMetadata parentMap;
private final TupleNameSpace<ColumnMetadata> nameSpace = new TupleNameSpace<>();

public TupleSchema() { this((MapColumnMetadata) null); }

public TupleSchema(MapColumnMetadata parentMap) {
this.parentMap = parentMap;
}

public static TupleMetadata fromFields(MapColumnMetadata parent, Iterable<MaterializedField> fields) {
TupleMetadata tuple = new TupleSchema(parent);
for (MaterializedField field : fields) {
tuple.add(field);
}
return tuple;
}

public static TupleMetadata fromFields(Iterable<MaterializedField> fields) {
return fromFields(null, fields);
}

public TupleMetadata copy() {
TupleMetadata tuple = new TupleSchema();
for (ColumnMetadata md : this) {
tuple.add(md.schema());
}
return tuple;
}

@Override
public void add(MaterializedField field) {
int index = nameSpace.count();
ColumnMetadata md;
if (field.getType().getMinorType() == MinorType.MAP) {
md = new MapColumnMetadata(index, this, field);
} else {
md = new PrimitiveColumnMetadata(index, this, field);
}
nameSpace.add(field.getName(), md);
}

@Override
public MaterializedField column(String name) {
ColumnMetadata md = metadata(name);
return md == null ? null : md.schema();
}

@Override
public ColumnMetadata metadata(String name) {
return nameSpace.get(name);
}

@Override
public int index(String name) {
return nameSpace.indexOf(name);
}

@Override
public MaterializedField column(int index) {
ColumnMetadata md = metadata(index);
return md == null ? null : md.schema();
}

@Override
public ColumnMetadata metadata(int index) {
return nameSpace.get(index);
}

public MapColumnMetadata map() { return parentMap; }
@Override
public int size() { return nameSpace.count(); }

@Override
public boolean isEmpty() { return nameSpace.count( ) == 0; }

@Override
public Iterator<ColumnMetadata> iterator() {
return nameSpace.iterator();
}

@Override
public boolean isEquivalent(TupleMetadata other) {
TupleSchema otherSchema = (TupleSchema) other;
if (nameSpace.count() != otherSchema.nameSpace.count()) {
return false;
}
for (int i = 0; i < nameSpace.count(); i++) {
if (! nameSpace.get(i).isEquivalent(otherSchema.nameSpace.get(i))) {
return false;
}
}
return true;
}

@Override
public List<MaterializedField> toFieldList() {
List<MaterializedField> cols = new ArrayList<>();
for (ColumnMetadata md : nameSpace) {
cols.add(md.schema());
}
return cols;
}

public BatchSchema toBatchSchema(SelectionVectorMode svMode) {
return new BatchSchema(svMode, toFieldList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
import org.apache.drill.exec.util.AssertionUtil;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

Expand All @@ -51,7 +52,8 @@
public class SystemOptionManager extends BaseOptionManager implements OptionManager, AutoCloseable {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemOptionManager.class);

private static final CaseInsensitiveMap<OptionValidator> VALIDATORS;
@VisibleForTesting
public static final CaseInsensitiveMap<OptionValidator> VALIDATORS;

static {
final OptionValidator[] validators = new OptionValidator[]{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.cache;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.BufferedInputStream;
Expand All @@ -35,7 +34,7 @@
import org.apache.drill.test.OperatorFixture;
import org.apache.drill.test.rowSet.RowSet;
import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet;
import org.apache.drill.test.rowSet.RowSet.RowSetWriter;
import org.apache.drill.test.rowSet.RowSetWriter;
import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
import org.apache.drill.test.rowSet.RowSetComparison;
import org.apache.drill.test.rowSet.RowSetUtilities;
Expand Down Expand Up @@ -76,7 +75,7 @@ public SingleRowSet makeNullableRowSet(BatchSchema schema, int rowCount) {
if (i % 2 == 0) {
RowSetUtilities.setFromInt(writer, 0, i);
} else {
writer.column(0).setNull();
writer.scalar(0).setNull();
}
writer.save();
}
Expand Down Expand Up @@ -169,9 +168,9 @@ public void testTypes() throws IOException {

private SingleRowSet buildMapSet(BatchSchema schema) {
return fixture.rowSetBuilder(schema)
.add(1, 100, "first")
.add(2, 200, "second")
.add(3, 300, "third")
.add(1, new Object[] {100, "first"})
.add(2, new Object[] {200, "second"})
.add(3, new Object[] {300, "third"})
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.TupleMetadata;
import org.apache.drill.test.OperatorFixture;
import org.apache.drill.test.rowSet.DirectRowSet;
import org.apache.drill.test.rowSet.RowSet;
import org.apache.drill.test.rowSet.RowSetComparison;
import org.apache.drill.test.rowSet.RowSetSchema;
import org.apache.drill.test.rowSet.SchemaBuilder;
import org.apache.drill.test.rowSet.RowSet.SingleRowSet;

Expand Down Expand Up @@ -92,7 +92,7 @@ public void addOutput(SingleRowSet output) {
public void run() throws Exception {
PriorityQueueCopierWrapper copier = makeCopier(fixture, sortOrder, nullOrder);
List<BatchGroup> batches = new ArrayList<>();
RowSetSchema schema = null;
TupleMetadata schema = null;
for (SingleRowSet rowSet : rowSets) {
batches.add(new BatchGroup.InputBatch(rowSet.container(), rowSet.getSv2(),
fixture.allocator(), rowSet.size()));
Expand All @@ -103,7 +103,7 @@ public void run() throws Exception {
int rowCount = outputRowCount();
VectorContainer dest = new VectorContainer();
@SuppressWarnings("resource")
BatchMerger merger = copier.startMerge(schema.toBatchSchema(SelectionVectorMode.NONE),
BatchMerger merger = copier.startMerge(new BatchSchema(SelectionVectorMode.NONE, schema.toFieldList()),
batches, dest, rowCount);

verifyResults(merger, dest);
Expand All @@ -121,7 +121,7 @@ public int outputRowCount() {
protected void verifyResults(BatchMerger merger, VectorContainer dest) {
for (RowSet expectedSet : expected) {
assertTrue(merger.next());
RowSet rowSet = new DirectRowSet(fixture.allocator(), dest);
RowSet rowSet = DirectRowSet.fromContainer(fixture.allocator(), dest);
new RowSetComparison(expectedSet)
.verifyAndClearAll(rowSet);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,13 @@
import org.apache.drill.exec.physical.impl.xsort.managed.SortTestUtilities.CopierTester;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.test.DrillTest;
import org.apache.drill.test.OperatorFixture;
import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet;
import org.apache.drill.test.rowSet.RowSet.RowSetWriter;
import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
import org.apache.drill.test.rowSet.RowSetUtilities;
import org.apache.drill.test.rowSet.RowSetWriter;
import org.apache.drill.test.rowSet.SchemaBuilder;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

/**
Expand All @@ -48,19 +46,7 @@
* then additional tests should be added to re-validate the code.
*/

public class TestCopier extends DrillTest {

public static OperatorFixture fixture;

@BeforeClass
public static void setup() {
fixture = OperatorFixture.builder().build();
}

@AfterClass
public static void tearDown() throws Exception {
fixture.close();
}
public class TestCopier extends SubOperatorTest {

@Test
public void testEmptyInput() throws Exception {
Expand Down Expand Up @@ -129,7 +115,7 @@ public static SingleRowSet makeDataSet(BatchSchema schema, int first, int step,
int value = first;
for (int i = 0; i < count; i++, value += step) {
RowSetUtilities.setFromInt(writer, 0, value);
writer.column(1).setString(Integer.toString(value));
writer.scalar(1).setString(Integer.toString(value));
writer.save();
}
writer.done();
Expand Down Expand Up @@ -354,22 +340,22 @@ public void testMapType(OperatorFixture fixture) throws Exception {

CopierTester tester = new CopierTester(fixture);
tester.addInput(fixture.rowSetBuilder(schema)
.add(1, 10, 100)
.add(5, 50, 500)
.add(1, new Object[] {10, new Object[] {100}})
.add(5, new Object[] {50, new Object[] {500}})
.withSv2()
.build());

tester.addInput(fixture.rowSetBuilder(schema)
.add(2, 20, 200)
.add(6, 60, 600)
.add(2, new Object[] {20, new Object[] {200}})
.add(6, new Object[] {60, new Object[] {600}})
.withSv2()
.build());

tester.addOutput(fixture.rowSetBuilder(schema)
.add(1, 10, 100)
.add(2, 20, 200)
.add(5, 50, 500)
.add(6, 60, 600)
.add(1, new Object[] {10, new Object[] {100}})
.add(2, new Object[] {20, new Object[] {200}})
.add(5, new Object[] {50, new Object[] {500}})
.add(6, new Object[] {60, new Object[] {600}})
.build());

tester.run();
Expand Down
Loading