DRILL-3474: Add implicit file columns support
arina-ielchiieva authored and parthchandra committed Jun 19, 2016
1 parent bd1d9c2 commit 3209886
Showing 9 changed files with 461 additions and 175 deletions.
HiveDrillNativeScanBatchCreator.java
@@ -20,10 +20,10 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.google.common.base.Functions;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
@@ -67,17 +67,15 @@ public ScanBatch getBatch(FragmentContext context, HiveDrillNativeParquetSubScan
final List<SchemaPath> columns = config.getColumns();
final String partitionDesignator = context.getOptions()
.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
List<Map<String, String>> implicitColumns = Lists.newLinkedList();
boolean selectAllQuery = AbstractRecordReader.isStarQuery(columns);

final boolean hasPartitions = (partitions != null && partitions.size() > 0);

final List<String[]> partitionColumns = Lists.newArrayList();
final List<Integer> selectedPartitionColumns = Lists.newArrayList();
List<SchemaPath> newColumns = columns;
if (AbstractRecordReader.isStarQuery(columns)) {
for (int i = 0; i < table.getPartitionKeys().size(); i++) {
selectedPartitionColumns.add(i);
}
} else {
if (!selectAllQuery) {
// Separate out the partition and non-partition columns. Non-partition columns are passed directly to the
// ParquetRecordReader. Partition columns are passed to ScanBatch.
newColumns = Lists.newArrayList();
@@ -86,7 +84,7 @@ public ScanBatch getBatch(FragmentContext context, HiveDrillNativeParquetSubScan
Matcher m = pattern.matcher(column.getAsUnescapedPath());
if (m.matches()) {
selectedPartitionColumns.add(
Integer.parseInt(column.getAsUnescapedPath().toString().substring(partitionDesignator.length())));
Integer.parseInt(column.getAsUnescapedPath().substring(partitionDesignator.length())));
} else {
newColumns.add(column);
}
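
The hunk above keeps the existing partition-column handling: a selected column whose name is the partition designator followed by digits is parsed into a partition index, while every other column is passed on to the Parquet reader. A minimal standalone sketch of that parsing follows; it is not part of the commit, and the demo class name and the exact pattern string ("dir" plus digits) are assumptions based on this hunk.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PartitionColumnParseDemo {
  public static void main(String[] args) {
    // Value of drill.exec.storage.file.partition.column.label (default "dir").
    final String partitionDesignator = "dir";
    // Assumed shape of the pattern built in getBatch(): designator followed by digits.
    final Pattern pattern = Pattern.compile(partitionDesignator + "[0-9]+");
    final String column = "dir3";
    final Matcher m = pattern.matcher(column);
    if (m.matches()) {
      // Strip the designator prefix and parse the remaining digits as the partition index.
      final int index = Integer.parseInt(column.substring(partitionDesignator.length()));
      System.out.println("selected partition column index: " + index);  // prints 3
    }
  }
}
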
@@ -103,6 +101,7 @@ public ScanBatch getBatch(FragmentContext context, HiveDrillNativeParquetSubScan
// TODO: In future we can get this cache from Metadata cached on filesystem.
final Map<String, ParquetMetadata> footerCache = Maps.newHashMap();

Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
try {
for (InputSplit split : splits) {
final FileSplit fileSplit = (FileSplit) split;
@@ -128,10 +127,19 @@ public ScanBatch getBatch(FragmentContext context, HiveDrillNativeParquetSubScan
parquetMetadata,
newColumns)
);
Map<String, String> implicitValues = Maps.newLinkedHashMap();

if (hasPartitions) {
Partition p = partitions.get(currentPartitionIndex);
partitionColumns.add(p.getValues().toArray(new String[0]));
List<String> values = partitions.get(currentPartitionIndex).getValues();
for (int i = 0; i < values.size(); i++) {
if (selectAllQuery || selectedPartitionColumns.contains(i)) {
implicitValues.put(partitionDesignator + i, values.get(i));
}
}
}
implicitColumns.add(implicitValues);
if (implicitValues.size() > mapWithMaxColumns.size()) {
mapWithMaxColumns = implicitValues;
}
}
currentPartitionIndex++;
@@ -141,14 +149,20 @@ public ScanBatch getBatch(FragmentContext context, HiveDrillNativeParquetSubScan
throw new ExecutionSetupException("Failed to create RecordReaders. " + e.getMessage(), e);
}

// all readers should have the same number of implicit columns, add missing ones with value null
mapWithMaxColumns = Maps.transformValues(mapWithMaxColumns, Functions.constant((String) null));
for (Map<String, String> map : implicitColumns) {
map.putAll(Maps.difference(map, mapWithMaxColumns).entriesOnlyOnRight());
}

// If there are no readers created (which is possible when the table is empty or no row groups are matched),
// create an empty RecordReader to output the schema
if (readers.size() == 0) {
readers.add(new HiveRecordReader(table, null, null, columns, context, conf,
ImpersonationUtil.createProxyUgi(config.getUserName(), context.getQueryUserName())));
}

return new ScanBatch(config, context, oContext, readers.iterator(), partitionColumns, selectedPartitionColumns);
return new ScanBatch(config, context, oContext, readers.iterator(), implicitColumns);
}

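
The padding step above guarantees that every reader reports the same set of implicit columns: the largest per-reader map becomes a template whose values are all null, and each smaller map copies only the keys it is missing. The standalone sketch below reproduces that idiom with the same Guava calls; the demo class name and sample directory values are invented for illustration and are not part of the commit.

import java.util.List;
import java.util.Map;

import com.google.common.base.Functions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

public class ImplicitColumnPaddingDemo {
  public static void main(String[] args) {
    // Reader A saw only one directory level, reader B saw two.
    final Map<String, String> readerA = Maps.newLinkedHashMap(ImmutableMap.of("dir0", "2016"));
    final Map<String, String> readerB = Maps.newLinkedHashMap(ImmutableMap.of("dir0", "2015", "dir1", "Q1"));
    final List<Map<String, String>> implicitColumns = Lists.newArrayList(readerA, readerB);

    // The map with the most columns defines the full implicit schema.
    final Map<String, String> mapWithMaxColumns = readerB;

    // Null out its values, then copy only the keys each reader is missing.
    final Map<String, String> template =
        Maps.transformValues(mapWithMaxColumns, Functions.constant((String) null));
    for (Map<String, String> map : implicitColumns) {
      map.putAll(Maps.difference(map, template).entriesOnlyOnRight());
    }

    System.out.println(implicitColumns);
    // [{dir0=2016, dir1=null}, {dir0=2015, dir1=Q1}]
  }
}
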
ExecConstants.java
@@ -149,6 +149,18 @@ public interface ExecConstants {
String FILESYSTEM_PARTITION_COLUMN_LABEL = "drill.exec.storage.file.partition.column.label";
OptionValidator FILESYSTEM_PARTITION_COLUMN_LABEL_VALIDATOR = new StringValidator(FILESYSTEM_PARTITION_COLUMN_LABEL, "dir");

/**
* Implicit file columns
*/
String IMPLICIT_FILENAME_COLUMN_LABEL = "drill.exec.storage.implicit.filename.column.label";
OptionValidator IMPLICIT_FILENAME_COLUMN_LABEL_VALIDATOR = new StringValidator(IMPLICIT_FILENAME_COLUMN_LABEL, "filename");
String IMPLICIT_SUFFIX_COLUMN_LABEL = "drill.exec.storage.implicit.suffix.column.label";
OptionValidator IMPLICIT_SUFFIX_COLUMN_LABEL_VALIDATOR = new StringValidator(IMPLICIT_SUFFIX_COLUMN_LABEL, "suffix");
String IMPLICIT_FQN_COLUMN_LABEL = "drill.exec.storage.implicit.fqn.column.label";
OptionValidator IMPLICIT_FQN_COLUMN_LABEL_VALIDATOR = new StringValidator(IMPLICIT_FQN_COLUMN_LABEL, "fqn");
String IMPLICIT_FILEPATH_COLUMN_LABEL = "drill.exec.storage.implicit.filepath.column.label";
OptionValidator IMPLICIT_FILEPATH_COLUMN_LABEL_VALIDATOR = new StringValidator(IMPLICIT_FILEPATH_COLUMN_LABEL, "filepath");

String JSON_READ_NUMBERS_AS_DOUBLE = "store.json.read_numbers_as_double";
BooleanValidator JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR = new BooleanValidator(JSON_READ_NUMBERS_AS_DOUBLE, false);

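
These four options make the labels of the new implicit columns (fully qualified name, directory path, file name, and file suffix) configurable. The sketch below shows how a consumer might resolve the current labels, mirroring the FILESYSTEM_PARTITION_COLUMN_LABEL lookup used elsewhere in this commit; the helper class and the returned map layout are illustrative only and are not Drill's ImplicitColumnExplorer API.

import java.util.Map;

import com.google.common.collect.ImmutableMap;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.server.options.OptionManager;

public class ImplicitColumnLabels {

  // Resolves the session-configurable labels of the four implicit file columns.
  // The getOption(...).string_val access pattern is the same one used for the
  // partition column label; the "fqn"/"filepath"/... keys here are just demo keys.
  public static Map<String, String> resolve(OptionManager options) {
    return ImmutableMap.of(
        "fqn", options.getOption(ExecConstants.IMPLICIT_FQN_COLUMN_LABEL).string_val,
        "filepath", options.getOption(ExecConstants.IMPLICIT_FILEPATH_COLUMN_LABEL).string_val,
        "filename", options.getOption(ExecConstants.IMPLICIT_FILENAME_COLUMN_LABEL).string_val,
        "suffix", options.getOption(ExecConstants.IMPLICIT_SUFFIX_COLUMN_LABEL).string_val);
  }
}
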
ScanBatch.java
@@ -29,7 +29,6 @@
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
@@ -46,7 +45,6 @@
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.server.options.OptionValue;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
@@ -56,7 +54,6 @@
import org.apache.drill.exec.vector.SchemaChangeCallBack;
import org.apache.drill.exec.vector.ValueVector;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

/**
@@ -80,20 +77,16 @@ public class ScanBatch implements CloseableRecordBatch {
private RecordReader currentReader;
private BatchSchema schema;
private final Mutator mutator = new Mutator();
private Iterator<String[]> partitionColumns;
private String[] partitionValues;
private List<ValueVector> partitionVectors;
private List<Integer> selectedPartitionColumns;
private String partitionColumnDesignator;
private boolean done = false;
private SchemaChangeCallBack callBack = new SchemaChangeCallBack();
private boolean hasReadNonEmptyFile = false;

private Map<String, ValueVector> implicitVectors;
private Iterator<Map<String, String>> implicitColumns;
private Map<String, String> implicitValues;

public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context,
OperatorContext oContext, Iterator<RecordReader> readers,
List<String[]> partitionColumns,
List<Integer> selectedPartitionColumns) throws ExecutionSetupException {
List<Map<String, String>> implicitColumns) throws ExecutionSetupException {
this.context = context;
this.readers = readers;
if (!readers.hasNext()) {
@@ -118,24 +111,18 @@ public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context,
}
oContext.getStats().stopProcessing();
}
this.partitionColumns = partitionColumns.iterator();
partitionValues = this.partitionColumns.hasNext() ? this.partitionColumns.next() : null;
this.selectedPartitionColumns = selectedPartitionColumns;

// TODO Remove null check after DRILL-2097 is resolved. That JIRA refers to test cases that do not initialize
// options; so labelValue = null.
final OptionValue labelValue = context.getOptions().getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
partitionColumnDesignator = labelValue == null ? "dir" : labelValue.string_val;
this.implicitColumns = implicitColumns.iterator();
this.implicitValues = this.implicitColumns.hasNext() ? this.implicitColumns.next() : null;

addPartitionVectors();
addImplicitVectors();
}

public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context,
Iterator<RecordReader> readers)
throws ExecutionSetupException {
this(subScanConfig, context,
context.newOperatorContext(subScanConfig),
readers, Collections.<String[]> emptyList(), Collections.<Integer> emptyList());
readers, Collections.<Map<String, String>> emptyList());
}

@Override
@@ -221,7 +208,7 @@ public IterOutcome next() {

currentReader.close();
currentReader = readers.next();
partitionValues = partitionColumns.hasNext() ? partitionColumns.next() : null;
implicitValues = implicitColumns.hasNext() ? implicitColumns.next() : null;
currentReader.setup(oContext, mutator);
try {
currentReader.allocate(fieldVectorMap);
@@ -230,8 +217,7 @@ public IterOutcome next() {
clearFieldVectorMap();
return IterOutcome.OUT_OF_MEMORY;
}
addPartitionVectors();

addImplicitVectors();
} catch (ExecutionSetupException e) {
this.context.fail(e);
releaseAssets();
@@ -241,7 +227,7 @@ public IterOutcome next() {
// At this point, the current reader has read 1 or more rows.

hasReadNonEmptyFile = true;
populatePartitionVectors();
populateImplicitVectors();

for (VectorWrapper w : container) {
w.getValueVector().getMutator().setValueCount(recordCount);
@@ -271,41 +257,43 @@ public IterOutcome next() {
}
}

private void addPartitionVectors() throws ExecutionSetupException {
private void addImplicitVectors() throws ExecutionSetupException {
try {
if (partitionVectors != null) {
for (ValueVector v : partitionVectors) {
if (implicitVectors != null) {
for (ValueVector v : implicitVectors.values()) {
v.clear();
}
}
partitionVectors = Lists.newArrayList();
for (int i : selectedPartitionColumns) {
final MaterializedField field =
MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + i).getAsUnescapedPath(),
Types.optional(MinorType.VARCHAR));
final ValueVector v = mutator.addField(field, NullableVarCharVector.class);
partitionVectors.add(v);
implicitVectors = Maps.newHashMap();

if (implicitValues != null) {
for (String column : implicitValues.keySet()) {
final MaterializedField field = MaterializedField.create(column, Types.optional(MinorType.VARCHAR));
final ValueVector v = mutator.addField(field, NullableVarCharVector.class);
implicitVectors.put(column, v);
}
}
} catch(SchemaChangeException e) {
throw new ExecutionSetupException(e);
}
}

private void populatePartitionVectors() {
for (int index = 0; index < selectedPartitionColumns.size(); index++) {
final int i = selectedPartitionColumns.get(index);
final NullableVarCharVector v = (NullableVarCharVector) partitionVectors.get(index);
if (partitionValues.length > i) {
final String val = partitionValues[i];
AllocationHelper.allocate(v, recordCount, val.length());
final byte[] bytes = val.getBytes();
for (int j = 0; j < recordCount; j++) {
v.getMutator().setSafe(j, bytes, 0, bytes.length);
private void populateImplicitVectors() {
if (implicitValues != null) {
for (Map.Entry<String, String> entry : implicitValues.entrySet()) {
final NullableVarCharVector v = (NullableVarCharVector) implicitVectors.get(entry.getKey());
String val;
if ((val = entry.getValue()) != null) {
AllocationHelper.allocate(v, recordCount, val.length());
final byte[] bytes = val.getBytes();
for (int j = 0; j < recordCount; j++) {
v.getMutator().setSafe(j, bytes, 0, bytes.length);
}
v.getMutator().setValueCount(recordCount);
} else {
AllocationHelper.allocate(v, recordCount, 0);
v.getMutator().setValueCount(recordCount);
}
v.getMutator().setValueCount(recordCount);
} else {
AllocationHelper.allocate(v, recordCount, 0);
v.getMutator().setValueCount(recordCount);
}
}
}
@@ -418,7 +406,7 @@ public WritableBatch getWritableBatch() {
@Override
public void close() throws Exception {
container.clear();
for (final ValueVector v : partitionVectors) {
for (final ValueVector v : implicitVectors.values()) {
v.clear();
}
fieldVectorMap.clear();
ProjectRecordBatch.java
@@ -60,6 +60,7 @@
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.store.ImplicitColumnExplorer;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.ValueVector;
@@ -73,7 +74,6 @@

public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class);

private Projector projector;
private List<ValueVector> allocationVectors;
private List<ComplexWriter> complexWriters;
@@ -351,6 +351,11 @@ protected boolean setupNewSchema() throws SchemaChangeException {
if (name == EMPTY_STRING) {
continue;
}

if (isImplicitFileColumn(vvIn)) {
continue;
}

final FieldReference ref = new FieldReference(name);
final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref.getAsNamePart().getName(), vvIn.getField().getType()), callBack);
final TransferPair tp = vvIn.makeTransferPair(vvOut);
@@ -369,6 +374,10 @@ protected boolean setupNewSchema() throws SchemaChangeException {
continue;
}

if (isImplicitFileColumn(vvIn)) {
continue;
}

final LogicalExpression expr = ExpressionTreeMaterializer.materialize(originalPath, incoming, collector, context.getFunctionRegistry() );
if (collector.hasErrors()) {
throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
@@ -485,6 +494,10 @@ protected boolean setupNewSchema() throws SchemaChangeException {
}
}

private boolean isImplicitFileColumn(ValueVector vvIn) {
return ImplicitColumnExplorer.initImplicitFileColumns(context.getOptions()).get(vvIn.getField().getName()) != null;
}

private List<NamedExpression> getExpressionList() {
if (popConfig.getExprs() != null) {
return popConfig.getExprs();
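
ProjectRecordBatch now skips implicit file columns during star-query projection by consulting ImplicitColumnExplorer, whose implementation is not included in this excerpt. As a rough illustration of the values those columns carry, the sketch below derives fqn, filepath, filename, and suffix from a path with plain string handling; it is an assumption about the column semantics, not the actual ImplicitColumnExplorer code.

import java.util.Map;

import com.google.common.collect.ImmutableMap;

public class ImplicitFileValuesDemo {

  // Derives the four implicit values for a single file path (illustrative only).
  public static Map<String, String> valuesFor(String fqn) {
    final int slash = fqn.lastIndexOf('/');
    final String filepath = slash > 0 ? fqn.substring(0, slash) : "/";
    final String filename = fqn.substring(slash + 1);
    final int dot = filename.lastIndexOf('.');
    final String suffix = dot >= 0 ? filename.substring(dot + 1) : "";
    return ImmutableMap.of("fqn", fqn, "filepath", filepath, "filename", filename, "suffix", suffix);
  }

  public static void main(String[] args) {
    System.out.println(valuesFor("/user/hive/warehouse/t/part-00000.parquet"));
    // {fqn=/user/hive/warehouse/t/part-00000.parquet, filepath=/user/hive/warehouse/t,
    //  filename=part-00000.parquet, suffix=parquet}
  }
}
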
SystemOptionManager.java
@@ -137,7 +137,11 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
ClassTransformer.SCALAR_REPLACEMENT_VALIDATOR,
ExecConstants.ENABLE_NEW_TEXT_READER,
ExecConstants.ENABLE_BULK_LOAD_TABLE_LIST,
ExecConstants.WEB_LOGS_MAX_LINES_VALIDATOR
ExecConstants.WEB_LOGS_MAX_LINES_VALIDATOR,
ExecConstants.IMPLICIT_FILENAME_COLUMN_LABEL_VALIDATOR,
ExecConstants.IMPLICIT_SUFFIX_COLUMN_LABEL_VALIDATOR,
ExecConstants.IMPLICIT_FQN_COLUMN_LABEL_VALIDATOR,
ExecConstants.IMPLICIT_FILEPATH_COLUMN_LABEL_VALIDATOR
};
final Map<String, OptionValidator> tmp = new HashMap<>();
for (final OptionValidator validator : validators) {