DRILL-8507, DRILL-8508 Better handling of partially missing parquet columns #2937

Open · wants to merge 3 commits into base: master
@@ -144,7 +144,7 @@ public int getRecordCount() {

private IterOutcome doWork(BatchStatusWrappper batchStatus, boolean newSchema) {
Preconditions.checkArgument(batchStatus.batch.getSchema().getFieldCount() == container.getSchema().getFieldCount(),
"Input batch and output batch have different field counthas!");
"Input batch and output batch have different field counts!");

if (newSchema) {
createUnionAller(batchStatus.batch);
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.resolver;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -751,6 +752,17 @@ public static MinorType getLeastRestrictiveType(MinorType... types) {
return result;
}

public static MajorType getLeastRestrictiveMajorType(MajorType... majorTypes) {
paul-rogers marked this conversation as resolved.
MinorType[] minorTypes = Arrays.stream(majorTypes).map(MajorType::getMinorType).toArray(MinorType[]::new);
DataMode[] dataModes = Arrays.stream(majorTypes).map(MajorType::getMode).toArray(DataMode[]::new);
MinorType leastRestrictiveMinorType = getLeastRestrictiveType(minorTypes);
DataMode leastRestrictiveDataMode = getLeastRestrictiveDataMode(dataModes);
return MajorType.newBuilder()
.setMinorType(leastRestrictiveMinorType)
.setMode(leastRestrictiveDataMode)
.build();
}
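
For illustration only (not part of this diff), a hedged usage sketch of the new helper, assuming the usual Drill type-precedence and data-mode rules:

import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.resolver.TypeCastRules;

public class LeastRestrictiveMajorTypeExample {
  public static void main(String[] args) {
    // INT vs BIGINT is expected to resolve to BIGINT, and OPTIONAL vs REQUIRED to OPTIONAL,
    // so the merged type should come back as OPTIONAL BIGINT.
    MajorType merged = TypeCastRules.getLeastRestrictiveMajorType(
        Types.optional(MinorType.INT),
        Types.required(MinorType.BIGINT));
    System.out.println(merged);
  }
}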

/**
* Finds the type in a given set that has the cheapest cast from a given
* starting type.
@@ -335,7 +335,9 @@ private Map<String, String> createReaderAndImplicitColumns(ExecutorFragmentConte
ccf,
footer,
rowGroupScan.getColumns(),
containsCorruptDates);
containsCorruptDates,
// each parquet SubScan shares the same table schema constructed by a GroupScan
rowGroupScan.getSchema());
}

logger.debug("Query {} uses {}",
@@ -19,6 +19,7 @@

import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.planner.common.DrillStatsTable;
import org.apache.drill.exec.record.SchemaUtil;
import org.apache.drill.metastore.metadata.BaseTableMetadata;
@@ -52,6 +53,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.primitives.Longs;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.io.api.Binary;
@@ -661,6 +663,12 @@ static Map<SchemaPath, TypeProtos.MajorType> resolveFields(MetadataBase.ParquetT
// row groups in the file have the same schema, so using the first one
Map<SchemaPath, TypeProtos.MajorType> fileColumns = getFileFields(parquetTableMetadata, file);
fileColumns.forEach((columnPath, type) -> putType(columns, columnPath, type));
// If at least one parquet file to read doesn't contain a column, force that column's
// DataMode to OPTIONAL in the overall table schema
Contributor commented:

The general rule has to be:

  • For all columns that exist, define a common type that can hold all of the associated column types.
  • If any column is optional (or missing), assign the OPTIONAL type -- but only if the other types are REQUIRED.
  • If all columns are REPEATED, then the missing column is also REPEATED. (In Drill, a zero-length array is the same as NULL: there is no such thing as a NULL array in Drill.)
  • If any column is REPEATED, and some column is OPTIONAL or REQUIRED, then choose REPEATED as the column type. Ensure that the runtime code handles the case of writing a single value into the array when we read the file with the OPTIONAL or REQUIRED column.

IIRC, EVF handles all the above for dynamic columns. If Drill had type logic in the Calcite planner, it should handle these same rules.

Again, this kind of logic requires extensive unit tests of all the cases above, plus any others you can think up.

@ychernysh (Author) commented on Sep 2, 2024:

The first item is about resolving different data types even when there are no missing columns, which I didn't cover.
"but only if the other types are REQUIRED" - is this condition necessary?
Regarding REPEATED - I haven't covered it at all.

In theory, implementing these should not be that hard...
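
For illustration only (not part of this PR), a minimal sketch of the data-mode rules laid out in the review comment above, using a hypothetical helper name:

import org.apache.drill.common.types.TypeProtos.DataMode;

public class DataModeMergeSketch {
  // Hypothetical helper: resolve the data mode for one column across several parquet files.
  public static DataMode mergeModes(boolean missingInSomeFile, DataMode... modes) {
    boolean anyRepeated = false;
    boolean anyOptional = false;
    for (DataMode mode : modes) {
      anyRepeated |= mode == DataMode.REPEATED;
      anyOptional |= mode == DataMode.OPTIONAL;
    }
    if (anyRepeated) {
      // All-REPEATED stays REPEATED; a mix with OPTIONAL/REQUIRED also resolves to REPEATED,
      // and a missing column materializes as a zero-length array (Drill has no NULL arrays).
      return DataMode.REPEATED;
    }
    if (anyOptional || missingInSomeFile) {
      // An OPTIONAL or missing column forces OPTIONAL when the remaining modes are REQUIRED.
      return DataMode.OPTIONAL;
    }
    return DataMode.REQUIRED;
  }
}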

for (SchemaPath column: Sets.symmetricDifference(columns.keySet(), fileColumns.keySet())) {
TypeProtos.MinorType minorType = columns.get(column).getMinorType();
columns.put(column, Types.optional(minorType));
}
}
return columns;
}
@@ -680,13 +688,7 @@ private static void putType(Map<SchemaPath, TypeProtos.MajorType> columns, Schem
if (majorType == null) {
columns.put(columnPath, type);
} else if (!majorType.equals(type)) {
TypeProtos.MinorType leastRestrictiveType = TypeCastRules.getLeastRestrictiveType(
majorType.getMinorType(),
type.getMinorType()
);
if (leastRestrictiveType != majorType.getMinorType()) {
columns.put(columnPath, type);
}
columns.put(columnPath, TypeCastRules.getLeastRestrictiveMajorType(majorType, type));
}
}

@@ -18,6 +18,7 @@
package org.apache.drill.exec.store.parquet.columnreaders;

import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.vector.BigIntVector;
import org.apache.drill.exec.vector.BitVector;
@@ -73,7 +74,10 @@ static ColumnReader<?> createFixedColumnReader(ParquetRecordReader recordReader,
ConvertedType convertedType = schemaElement.getConverted_type();
// if the column is required, or repeated (in which case we just want to use this to generate our appropriate
// ColumnReader for actually transferring data into the data vector inside of our repeated vector
if (descriptor.getMaxDefinitionLevel() == 0 || descriptor.getMaxRepetitionLevel() > 0) {
// Choose a reader based on the ValueVector's DataMode, since we might want to put
// a Parquet REQUIRED column into a Drill OPTIONAL ValueVector;
// see ParquetSchema#tableSchema for details
if (v.getField().getDataMode() != TypeProtos.DataMode.OPTIONAL) {
paul-rogers marked this conversation as resolved.
return getColumnReader(recordReader, fixedLength, descriptor, columnChunkMetaData, v, schemaElement, convertedType);
} else { // if the column is nullable
return getNullableColumnReader(recordReader, descriptor,
@@ -86,8 +90,11 @@ static VarLengthValuesColumn<?> getReader(ParquetRecordReader parentReader, Colu
SchemaElement schemaElement
) throws ExecutionSetupException {
ConvertedType convertedType = schemaElement.getConverted_type();
switch (descriptor.getMaxDefinitionLevel()) {
case 0:
// Choose a reader based on the ValueVector's DataMode, since we might want to put
// a Parquet REQUIRED column into a Drill OPTIONAL ValueVector;
// see ParquetSchema#tableSchema for details
switch (v.getField().getDataMode()) {
case REQUIRED:
if (convertedType == null) {
return new VarLengthColumnReaders.VarBinaryColumn(parentReader, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement);
}
@@ -488,6 +488,11 @@ protected int decodeLevels() throws IOException {
ValuesReader dlReader = dlEncoding.getValuesReader(columnDescriptor, ValuesType.DEFINITION_LEVEL);
dlReader.initFromPage(pageValueCount, dataStream);
this.definitionLevels = new ValuesReaderIntIterator(dlReader);
} else {
// Even if all values in a page are REQUIRED, still initialize definitionLevels this way
// so that such a column can be read with a NullableColumnReader, treating every value's
// definition level as 1
this.definitionLevels = () -> 1;
}

dataOffset = (int) dataStream.position();
@@ -511,6 +516,11 @@ protected int decodeLevels() throws IOException {
maxDefLevel,
BytesInput.from(pageData.nioBuffer(repLevelLen, defLevelLen))
);
} else {
// Even if all values in a page are REQUIRED, still initialize definitionLevels this way
// so that such a column can be read with a NullableColumnReader, treating every value's
// definition level as 1
this.definitionLevels = () -> 1;
}

dataOffset = repLevelLen + defLevelLen;
@@ -56,10 +56,10 @@ public ParquetColumnMetadata(ColumnDescriptor column) {
this.column = column;
}

public void resolveDrillType(Map<String, SchemaElement> schemaElements, OptionManager options) {
public void resolveDrillType(Map<String, SchemaElement> schemaElements, OptionManager options, boolean isEnforcedOptional) {
Contributor commented:

Again, if we are enforcing a planner-provided schema, the job is to map whatever the Parquet type is into the given, fixed Drill type. There is only one right answer when the schema is provided. Again, see EVF for how this works.

Author commented:

Let me try it...
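
A hedged sketch (not part of this PR) of the reviewer's point above: when the planner provides a schema, that schema fixes the output type and the reader only maps the Parquet data into it. The class and method names here are hypothetical:

import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.record.MaterializedField;

public class ProvidedSchemaTypeSketch {
  // Hypothetical: prefer the planner-provided column type; fall back to the type derived
  // from the parquet footer only when no schema was provided for this column.
  public static MajorType chooseOutputType(MaterializedField providedField, MajorType parquetDerivedType) {
    return providedField != null ? providedField.getType() : parquetDerivedType;
  }
}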

se = schemaElements.get(ParquetReaderUtility.getFullColumnPath(column));
type = ParquetToDrillTypeConverter.toMajorType(column.getType(), column.getTypeLength(),
getDataMode(column), se, options);
isEnforcedOptional ? DataMode.OPTIONAL : getDataMode(column), se, options);
field = MaterializedField.create(toFieldName(column.getPath()).getLastSegment().getNameSegment().getPath(), type);
length = getDataTypeLength();
}
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.parquet.columnreaders;

import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.CommonParquetRecordReader;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
@@ -72,6 +73,11 @@ public class ParquetRecordReader extends CommonParquetRecordReader {

private final boolean useBulkReader;

/**
* See {@link ParquetSchema#tableSchema}
*/
private final TupleMetadata tableSchema;
paul-rogers marked this conversation as resolved.

public ParquetRecordReader(FragmentContext fragmentContext,
Path path,
int rowGroupIndex,
@@ -80,8 +86,8 @@ public ParquetRecordReader(FragmentContext fragmentContext,
CompressionCodecFactory codecFactory,
ParquetMetadata footer,
List<SchemaPath> columns,
ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) {
this(fragmentContext, numRecordsToRead, path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus);
ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus, TupleMetadata tableSchema) {
this(fragmentContext, numRecordsToRead, path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus, tableSchema);
}

public ParquetRecordReader(FragmentContext fragmentContext,
@@ -91,9 +97,9 @@ public ParquetRecordReader(FragmentContext fragmentContext,
CompressionCodecFactory codecFactory,
ParquetMetadata footer,
List<SchemaPath> columns,
ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) {
ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus, TupleMetadata tableSchema) {
this(fragmentContext, footer.getBlocks().get(rowGroupIndex).getRowCount(), path, rowGroupIndex, fs, codecFactory,
footer, columns, dateCorruptionStatus);
footer, columns, dateCorruptionStatus, tableSchema);
}

public ParquetRecordReader(
@@ -105,13 +111,14 @@ public ParquetRecordReader(
CompressionCodecFactory codecFactory,
ParquetMetadata footer,
List<SchemaPath> columns,
ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) {
ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus, TupleMetadata tableSchema) {
super(footer, fragmentContext);
this.hadoopPath = path;
this.fileSystem = fs;
this.codecFactory = codecFactory;
this.rowGroupIndex = rowGroupIndex;
this.dateCorruptionStatus = dateCorruptionStatus;
this.tableSchema = tableSchema;
this.numRecordsToRead = initNumRecordsToRead(numRecordsToRead, rowGroupIndex, footer);
this.useAsyncColReader = fragmentContext.getOptions().getOption(ExecConstants.PARQUET_COLUMNREADER_ASYNC).bool_val;
this.useAsyncPageReader = fragmentContext.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val;
@@ -185,7 +192,7 @@ public ReadState getReadState() {
@Override
public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException {
this.operatorContext = operatorContext;
ParquetSchema schema = new ParquetSchema(fragmentContext.getOptions(), rowGroupIndex, footer, isStarQuery() ? null : getColumns());
ParquetSchema schema = new ParquetSchema(fragmentContext.getOptions(), rowGroupIndex, footer, isStarQuery() ? null : getColumns(), tableSchema);
batchSizerMgr = new RecordBatchSizerManager(fragmentContext.getOptions(), schema, numRecordsToRead, new RecordBatchStatsContext(fragmentContext, operatorContext));

logger.debug("Reading {} records from row group({}) in file {}.", numRecordsToRead, rowGroupIndex,
@@ -32,9 +32,10 @@
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
import org.apache.drill.exec.vector.NullableIntVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.format.SchemaElement;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -64,6 +65,19 @@ public final class ParquetSchema {
private final int rowGroupIndex;
private final ParquetMetadata footer;

/**
* Schema for the whole table constructed by a GroupScan from all the parquet files to read.
* If a selected column is not found in the parquet file being read, the type for the
* null-filled vector to create is looked up in this schema. That is, if some other parquet
* file contains the column, we take its type; otherwise we default to Nullable Int.
* Also, if at least one file does not contain the selected column, the overall table schema
* should carry that field with the OPTIONAL data mode. GroupScan catches this case and sets the
* appropriate data mode in this schema. Our job here is to enforce that OPTIONAL mode in our
* output schema, even if the particular parquet file we are reading has the field as REQUIRED,
* so that the schema stays consistent across all scan batches.
*/
Contributor commented:

Great start! See the other cases described above.

Also, I seem to remember creating code to handle evolving column types as part of EVF. Perhaps you can find that code. The code likely has a large number of unit tests (I'm a test-driven kinda guy) which you can reuse to test your parallel implementation.

private final TupleMetadata tableSchema;

/**
* List of metadata for selected columns. This list does two things.
* First, it identifies the Parquet columns we wish to select. Second, it
@@ -91,11 +105,12 @@ public final class ParquetSchema {
* this is a SELECT * query
*/

public ParquetSchema(OptionManager options, int rowGroupIndex, ParquetMetadata footer, Collection<SchemaPath> selectedCols) {
public ParquetSchema(OptionManager options, int rowGroupIndex, ParquetMetadata footer, Collection<SchemaPath> selectedCols, TupleMetadata tableSchema) {
this.options = options;
this.rowGroupIndex = rowGroupIndex;
this.selectedCols = selectedCols;
this.footer = footer;
this.tableSchema = tableSchema;
if (selectedCols == null) {
columnsFound = null;
} else {
@@ -127,14 +142,23 @@ private void loadParquetSchema() {
// loop to add up the length of the fixed width columns and build the schema
for (ColumnDescriptor column : footer.getFileMetaData().getSchema().getColumns()) {
ParquetColumnMetadata columnMetadata = new ParquetColumnMetadata(column);
columnMetadata.resolveDrillType(schemaElements, options);
columnMetadata.resolveDrillType(schemaElements, options, shouldEnforceOptional(column));
if (!columnSelected(column)) {
continue;
}
selectedColumnMetadata.add(columnMetadata);
}
}

private boolean shouldEnforceOptional(ColumnDescriptor column) {
String columnName = SchemaPath.getCompoundPath(column.getPath()).getAsUnescapedPath();
MaterializedField tableField;
if (tableSchema == null || (tableField = tableSchema.column(columnName)) == null) {
return false;
}
return tableField.getDataMode() == DataMode.OPTIONAL;
}

/**
* Fixed-width fields are the easiest to plan. We know the size of each column,
* making it easy to determine the total length of each vector, once we know
@@ -206,7 +230,7 @@ private boolean columnSelected(ColumnDescriptor column) {
* @throws SchemaChangeException should not occur
*/

public void createNonExistentColumns(OutputMutator output, List<NullableIntVector> nullFilledVectors) throws SchemaChangeException {
public void createNonExistentColumns(OutputMutator output, List<ValueVector> nullFilledVectors) throws SchemaChangeException {
List<SchemaPath> projectedColumns = Lists.newArrayList(selectedCols);
for (int i = 0; i < columnsFound.length; i++) {
SchemaPath col = projectedColumns.get(i);
@@ -227,12 +251,14 @@ public void createNonExistentColumns(OutputMutator output, List<NullableIntVecto
* @throws SchemaChangeException should not occur
*/

private NullableIntVector createMissingColumn(SchemaPath col, OutputMutator output) throws SchemaChangeException {
// col.toExpr() is used here as field name since we don't want to see these fields in the existing maps
MaterializedField field = MaterializedField.create(col.toExpr(),
Types.optional(TypeProtos.MinorType.INT));
return (NullableIntVector) output.addField(field,
TypeHelper.getValueVectorClass(TypeProtos.MinorType.INT, DataMode.OPTIONAL));
private ValueVector createMissingColumn(SchemaPath col, OutputMutator output) throws SchemaChangeException {
String colName = col.getAsUnescapedPath();
MaterializedField tableField = tableSchema.column(colName);
TypeProtos.MinorType type = tableField == null ? TypeProtos.MinorType.INT : tableField.getType().getMinorType();
MaterializedField field = MaterializedField.create(colName,
Contributor commented:

This is a good change: we are propagating the old column type. This is consistent with EVF.

However, this stuff is horribly complex. If we reuse the column, we must reuse the actual value vector. Otherwise, you'll get crashes in the downstream operators that are bound to that vector. The binding is redone only on a schema change. But, your fix avoids the schema change, and hence prevents the rebinding.

Also, note that this fix works ONLY in one direction (column appears, then disappears), and ONLY within a single thread: it can't solve the same problem if the two files are read in different threads and sent to the SORT to reconcile.

Further, we are changing the mode to OPTIONAL as required so we can fill the vector with NULL values. However, change of mode (i.e. nullability) is a schema change and will cause the SORT to fail. We have to have known, on the previous file, that the column will be missing in this file, so that we can create the original column as OPTIONAL.

Author commented:

Please see this comment.

Types.optional(type));
return output.addField(field,
TypeHelper.getValueVectorClass(type, DataMode.OPTIONAL));
}

Map<String, Integer> buildChunkMap(BlockMetaData rowGroupMetadata) {
@@ -25,7 +25,6 @@
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.store.parquet.ParquetReaderStats;
import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager;
import org.apache.drill.exec.vector.NullableIntVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -51,7 +50,7 @@ public class ReadState {
* at that position in the schema. Currently this requires a vector be present. Here is a list of all of these vectors
* that need only have their value count set at the end of each call to next(), as the values default to null.
*/
private List<NullableIntVector> nullFilledVectors;
private List<ValueVector> nullFilledVectors;
private List<ColumnReader<?>> fixedLenColumnReaders = new ArrayList<>();
private final long totalNumRecordsToRead; // number of records to read

@@ -229,4 +228,4 @@ public void close() {
varLengthReader = null;
}
}
}
}