[delta-standalone] Add check that schema contains partition columns (delta-io#353)

* add partition cols check to OptTxn; new tests pass; 68 existing standalone tests fail

* num failing tests reduced from 68 to 59

* num failing tests reduced from 59 to 48

* num failing tests reduced from 48 to 31

* num failing tests reduced from 31 to 25

* all delta standalone tests pass

* fix compatibility tests

* Fix Sink test after regenerating test/resources partitioned table (delta-io#46)

* KC_fix_schema_contains_partition_cols_check - Fix Sink test after regenerating test/resource partitioned table to one that includes partition columns in Delta Schema

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* KC_fix_schema_contains_partition_cols_check - code cleanup.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* Fix compilation issue

* Update missing partition cols algorithm and error message to show all missing cols

* respond to PR feedback

* fix typo

* update test

* Add comments to make test case more clear

Co-authored-by: kristoffSC <krzysiek.chmielewski@gmail.com>
scottsand-db and kristoffSC authored Jun 13, 2022
1 parent 9a5459d commit 42e0238
Showing 25 changed files with 256 additions and 96 deletions.
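
At its core, the commit makes OptimisticTransactionImpl validate any metadata it is asked to commit: every declared partition column must appear in the table schema, and the schema must contain at least one non-partition column. The sketch below illustrates the new behavior from a caller's perspective; it is a minimal approximation, not a reproduction of the new tests. The object name, table path, and field names are made up for illustration, and it assumes the check fires as soon as the metadata is registered on the transaction, which is where the diff places it.

```scala
import java.util.Arrays

import org.apache.hadoop.conf.Configuration

import io.delta.standalone.DeltaLog
import io.delta.standalone.actions.Metadata
import io.delta.standalone.exceptions.DeltaStandaloneException
import io.delta.standalone.types.{IntegerType, StringType, StructField, StructType}

object PartitionColumnCheckSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical local table path, used only for illustration.
    val log = DeltaLog.forTable(new Configuration(), "/tmp/partition-check-demo")

    // "col1" is declared as a partition column but is deliberately missing from the schema.
    val schema = new StructType(Array(
      new StructField("name", new StringType(), true),
      new StructField("age", new IntegerType(), true)))

    val metadata = Metadata.builder()
      .schema(schema)
      .partitionColumns(Arrays.asList("col1"))
      .build()

    val txn = log.startTransaction()
    try {
      txn.updateMetadata(metadata) // now rejected by the partition-column check
    } catch {
      case e: DeltaStandaloneException =>
        println(s"Rejected as expected: ${e.getMessage}")
    }
  }
}
```

The second half of the check works the other way around: a metadata whose schema consists only of its partition columns is rejected as well, via nonPartitionColumnAbsentException.
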
@@ -75,7 +75,7 @@ public void runDeltaSinkTest() throws Exception {
List<AddFile> initialDeltaFiles = deltaLog.snapshot().getAllFiles();
int initialTableRecordsCount = TestParquetReader.readAndValidateAllTableRecords(deltaLog);
long initialVersion = deltaLog.snapshot().getVersion();
assertEquals(initialDeltaFiles.size(), 2);
assertEquals(2, initialDeltaFiles.size());

JobGraph jobGraph = createJobGraph(deltaTablePath);

@@ -122,9 +122,10 @@ public void runDeltaSinkTest() throws Exception {
// GIVEN
DeltaLog deltaLog = DeltaLog.forTable(DeltaSinkTestUtils.getHadoopConf(), deltaTablePath);
List<AddFile> initialDeltaFiles = deltaLog.snapshot().getAllFiles();

long initialVersion = deltaLog.snapshot().getVersion();
int initialTableRecordsCount = TestParquetReader.readAndValidateAllTableRecords(deltaLog);
assertEquals(initialDeltaFiles.size(), 2);
assertEquals(2, initialTableRecordsCount);

JobGraph jobGraph = createJobGraph(deltaTablePath);

@@ -106,7 +106,7 @@ public void testCommitTwice() throws Exception {
DeltaSinkTestUtils.getListOfDeltaGlobalCommittables(
numAddedFiles, DeltaSinkTestUtils.getTestPartitionSpec());
DeltaGlobalCommitter globalCommitter =
getTestGlobalCommitter(DeltaSinkTestUtils.TEST_ROW_TYPE);
getTestGlobalCommitter(DeltaSinkTestUtils.TEST_PARTITIONED_ROW_TYPE);

// WHEN
globalCommitter.commit(globalCommittables);
@@ -135,7 +135,7 @@ public void testMergeSchemaSetToTrue() throws IOException {

// add new field to the schema
RowType updatedSchema =
DeltaSinkTestUtils.addNewColumnToSchema(DeltaSinkTestUtils.TEST_ROW_TYPE);
DeltaSinkTestUtils.addNewColumnToSchema(DeltaSinkTestUtils.TEST_PARTITIONED_ROW_TYPE);

DeltaGlobalCommitter globalCommitter = new DeltaGlobalCommitter(
DeltaSinkTestUtils.getHadoopConf(),
@@ -151,7 +151,7 @@
// schema before deltaLog.update() is in old format, but after update it equals to the new
// format
assertEquals(deltaLog.snapshot().getMetadata().getSchema().toJson(),
SchemaConverter.toDeltaDataType(DeltaSinkTestUtils.TEST_ROW_TYPE).toJson());
SchemaConverter.toDeltaDataType(DeltaSinkTestUtils.TEST_PARTITIONED_ROW_TYPE).toJson());
deltaLog.update();
assertEquals(deltaLog.snapshot().getMetadata().getSchema().toJson(),
SchemaConverter.toDeltaDataType(updatedSchema).toJson());
@@ -328,7 +328,7 @@ public void testCommittablesFromDifferentCheckpointIntervalOneWithIncompatiblePa
);

DeltaGlobalCommitter globalCommitter =
getTestGlobalCommitter(DeltaSinkTestUtils.TEST_ROW_TYPE);
getTestGlobalCommitter(DeltaSinkTestUtils.TEST_PARTITIONED_ROW_TYPE);

// WHEN
try {
@@ -425,7 +425,7 @@ public void testUseFullPathForDeltaLog() throws Exception {
DeltaGlobalCommitter globalCommitter = new DeltaGlobalCommitter(
hadoopConfig,
tablePath,
DeltaSinkTestUtils.TEST_ROW_TYPE,
DeltaSinkTestUtils.TEST_PARTITIONED_ROW_TYPE,
false // mergeSchema
);

@@ -101,9 +101,11 @@ public void setup() throws IOException {
}
}
deltaLog = DeltaLog.forTable(DeltaSinkTestUtils.getHadoopConf(), tablePath.getPath());
RowType rowType = (partitionSpec.isEmpty()) ?
DeltaSinkTestUtils.TEST_ROW_TYPE : DeltaSinkTestUtils.TEST_PARTITIONED_ROW_TYPE;

rowTypeToCommit = mergeSchema ?
DeltaSinkTestUtils.addNewColumnToSchema(DeltaSinkTestUtils.TEST_ROW_TYPE) :
DeltaSinkTestUtils.TEST_ROW_TYPE;
DeltaSinkTestUtils.addNewColumnToSchema(rowType) : rowType;
}

@Test
@@ -56,7 +56,7 @@ public class DeltaWriterBucketTest {
private static final String BUCKET_ID = "testing-bucket";
private static final String APP_ID = "1";

private Map<String, Counter> testCounters = new HashMap<>();
private final Map<String, Counter> testCounters = new HashMap<>();

@Test
public void testOnCheckpointNoPendingRecoverable() throws IOException {
@@ -72,11 +72,24 @@ public class DeltaSinkTestUtils {
new RowType.RowField("age", new IntType())
));

public static final RowType TEST_PARTITIONED_ROW_TYPE = new RowType(Arrays.asList(
new RowType.RowField("name", new VarCharType(VarCharType.MAX_LENGTH)),
new RowType.RowField("surname", new VarCharType(VarCharType.MAX_LENGTH)),
new RowType.RowField("age", new IntType()),
new RowType.RowField("col1", new VarCharType(VarCharType.MAX_LENGTH)),
new RowType.RowField("col2", new VarCharType(VarCharType.MAX_LENGTH))
));

public static final DataFormatConverters.DataFormatConverter<RowData, Row> CONVERTER =
DataFormatConverters.getConverterForDataType(
TypeConversions.fromLogicalToDataType(TEST_ROW_TYPE)
);

public static final DataFormatConverters.DataFormatConverter<RowData, Row>
PARTITIONED_CONVERTER = DataFormatConverters.getConverterForDataType(
TypeConversions.fromLogicalToDataType(TEST_PARTITIONED_ROW_TYPE)
);

public static List<RowData> getTestRowData(int num_records) {
List<RowData> rows = new ArrayList<>(num_records);
for (int i = 0; i < num_records; i++) {
@@ -384,7 +397,7 @@ public static DeltaSinkInternal<RowData> createDeltaSink(String deltaTablePath,
),
new BasePathBucketAssigner<>(),
OnCheckpointRollingPolicy.build(),
DeltaSinkTestUtils.TEST_ROW_TYPE,
DeltaSinkTestUtils.TEST_PARTITIONED_ROW_TYPE,
false // mergeSchema
);
return builder
@@ -26,9 +26,12 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.vector.ParquetColumnarRowSplitReader;
import org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;

import io.delta.standalone.DeltaLog;
import io.delta.standalone.actions.AddFile;
@@ -72,15 +75,28 @@ public static int readAndValidateAllTableRecords(DeltaLog deltaLog) throws IOExc
* @throws IOException Thrown if an error occurs while reading the file
*/
public static int parseAndCountRecords(Path parquetFilepath,
RowType rowType) throws IOException {
RowType rowType) throws IOException {

ParquetColumnarRowSplitReader reader = getTestParquetReader(
parquetFilepath,
rowType
);

DataFormatConverters.DataFormatConverter<RowData, Row> converter;
if (DeltaSinkTestUtils.TEST_ROW_TYPE.equals(rowType)) {
converter = DeltaSinkTestUtils.CONVERTER;
} else if (DeltaSinkTestUtils.TEST_PARTITIONED_ROW_TYPE.equals(rowType)) {
converter = DeltaSinkTestUtils.PARTITIONED_CONVERTER;
} else {
throw new RuntimeException(
"Unable to find DataFormatConverters for used RowType. Probably new "
+ "implementation is needed"
);
}

int recordsRead = 0;
while (!reader.reachedEnd()) {
DeltaSinkTestUtils.CONVERTER.toExternal(reader.nextRecord());
converter.toExternal(reader.nextRecord());
recordsRead++;
}
return recordsRead;
@@ -1,5 +1,4 @@
{"commitInfo":{"timestamp":1635285930717,"operation":"STREAMING UPDATE","isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numOutputRows":"2","numAddedFiles":"2","numOutputBytes":"1726","numRemovedFiles":"0"},"engineInfo":"flink.1.12 Delta Standalone/0.2.1-SNAPSHOT"}}
{"commitInfo":{"timestamp":1653946642277,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[\"col1\",\"col2\"]"},"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"873","numOutputRows":"2"}}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"14651129-76c2-48eb-86a0-f52f88b5aab2","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"surname\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["col1","col2"],"configuration":{},"createdTime":1632772157770}}
{"add":{"path":"col1=val1/col2=val2/part-00000-0561acbc-6d81-43b5-8683-6d442547151e-c000.snappy.parquet","partitionValues":{"col1":"val1","col2":"val2"},"size":882,"modificationTime":1632772158982,"dataChange":true}}
{"add":{"path":"col1=val1/col2=val2/part-00001-e6758bba-2d53-49f7-8bc2-b8e24d0e30e5-c000.snappy.parquet","partitionValues":{"col1":"val1","col2":"val2"},"size":844,"modificationTime":1632772158982,"dataChange":true}}
{"metaData":{"id":"e5ec0a0b-186b-4af8-8b64-f5eb808981e1","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"surname\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col1\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["col1","col2"],"configuration":{},"createdTime":1653946639728}}
{"add":{"path":"col1=val1/col2=val2/part-00000-bda8015b-74e7-4a29-afe7-9d6c77f3a7ae.c000.snappy.parquet","partitionValues":{"col1":"val1","col2":"val2"},"size":873,"modificationTime":1653946642197,"dataChange":true}}
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -429,7 +429,7 @@ class OSSCompatibilitySuite extends OssCompatibilitySuiteBase with ComparisonUti
checkStandalone(
"taint whole table + concurrent remove",
conflicts = true,
setup = Seq(ss.conflict.metadata_colX, ss.conflict.addA),
setup = Seq(ss.conflict.metadata_colXY, ss.conflict.addA),
reads = Seq(
// `readWholeTable` should disallow any concurrent `RemoveFile`s.
t => t.readWholeTable()
@@ -65,7 +65,7 @@ trait OssCompatibilitySuiteBase extends QueryTest with SharedSparkSession {
testName: String,
conflicts: Boolean,
setup: Seq[StandaloneActions.Action] =
Seq(StandaloneActions.Metadata.builder().build(), new StandaloneActions.Protocol(1, 2)),
Seq(ss.conflict.metadata_colXY, new StandaloneActions.Protocol(1, 2)),
reads: Seq[StandaloneOptTxn => Unit],
concurrentOSSWrites: Seq[OSSActions.Action],
actions: Seq[StandaloneActions.Action],
@@ -26,7 +26,8 @@ class OSSUtil(now: Long) {

val schema: StructType = StructType(Array(
StructField("col1_part", IntegerType, nullable = true),
StructField("col2_part", StringType, nullable = true)
StructField("col2_part", StringType, nullable = true),
StructField("foo", StringType, nullable = true)
))

private val partitionColumns = schema.fieldNames.filter(_.contains("part")).toSeq
@@ -33,7 +33,8 @@ class StandaloneUtil(now: Long) {

val schema = new StructType(Array(
new StructField("col1_part", new IntegerType(), true),
new StructField("col2_part", new StringType(), true)
new StructField("col2_part", new StringType(), true),
new StructField("foo", new StringType(), true)
))

val partitionColumns: Seq[String] =
@@ -174,7 +174,8 @@ private[internal] class OptimisticTransactionImpl(

/**
* All [[Metadata]] actions must go through this function, and be added to the committed actions
* via `newMetadata` (they shouldn't ever be passed into `prepareCommit`.)
* via `newMetadata`. That is, they should never be passed into `prepareCommit`.
*
* This function enforces:
* - At most one unique [[Metadata]] is committed in a single transaction.
* - If this is the first commit, the committed metadata configuration includes global Delta
@@ -206,6 +207,7 @@
}

verifyNewMetadata(latestMetadata)
checkPartitionColumns(latestMetadata.partitionColumns, latestMetadata.schema)

logInfo(s"Updated metadata from ${newMetadata.getOrElse("-")} to $latestMetadata")

@@ -228,6 +230,10 @@

/**
* Prepare for a commit by doing all necessary pre-commit checks and modifications to the actions.
*
* Requires that no Metadata action exists inside of `actions`. Instead, Metadata actions should
* be added via the `newMetadata` field.
*
* @return The finalized set of actions.
*/
private def prepareCommit(actions: Seq[Action]): Seq[Action] = {
@@ -461,6 +467,25 @@
Protocol.checkMetadataProtocolProperties(metadata, protocol)
}

/**
* Check that the schema contains all partition columns and at least one non-partition column
*/
private def checkPartitionColumns(partitionCols: Seq[String], schema: StructType): Unit = {
// schema contains all partition columns
val schemaCols = schema.getFieldNames.toSet

val partitionsColsNotInSchema = partitionCols.toSet.diff(schemaCols).toSeq

if (partitionsColsNotInSchema.nonEmpty) {
throw DeltaErrors.partitionColumnsNotFoundException(partitionsColsNotInSchema, schema)
}

// schema contains at least one non-partition column
if (partitionCols.length == schemaCols.size) {
throw DeltaErrors.nonPartitionColumnAbsentException()
}
}

/**
* We want to check that the [[newSchema]] is compatible with the [[existingSchema]].
*
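
Taken by itself, the new checkPartitionColumns method is a set difference followed by a size comparison. Below is a self-contained sketch of the same logic: a hypothetical helper with plain exceptions standing in for DeltaErrors.partitionColumnsNotFoundException and DeltaErrors.nonPartitionColumnAbsentException, followed by one passing and one failing call.

```scala
// Standalone sketch of the checkPartitionColumns logic added above. The real code
// throws DeltaErrors.partitionColumnsNotFoundException and
// DeltaErrors.nonPartitionColumnAbsentException; plain exceptions stand in here.
def checkPartitionColumns(partitionCols: Seq[String], schemaFieldNames: Seq[String]): Unit = {
  val schemaCols = schemaFieldNames.toSet

  // Rule 1: every declared partition column must exist in the schema.
  val partitionColsNotInSchema = partitionCols.toSet.diff(schemaCols)
  if (partitionColsNotInSchema.nonEmpty) {
    throw new IllegalArgumentException(
      s"Partition column(s) ${partitionColsNotInSchema.mkString(",")} not found in schema")
  }

  // Rule 2: the schema must contain at least one non-partition column.
  if (partitionCols.length == schemaCols.size) {
    throw new IllegalArgumentException(
      "Data written into Delta needs to contain at least one non-partitioned column")
  }
}

// Passes: both partition columns exist, and "name"/"age" are non-partition columns.
checkPartitionColumns(Seq("col1", "col2"), Seq("name", "age", "col1", "col2"))

// Throws: "col2" is declared as a partition column but is absent from the schema.
checkPartitionColumns(Seq("col1", "col2"), Seq("name", "age", "col1"))
```

Comparing the partition-column count against the size of the schema's field-name set is enough for the second rule because the first rule has already guaranteed that every partition column is one of those fields, assuming the partition column list has no duplicates.
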
@@ -332,6 +332,16 @@ private[internal] object DeltaErrors {
" cannot be set at the same time. Please set only one group of them.")
}

def partitionColumnsNotFoundException(partCols: Seq[String], schema: StructType): Throwable = {
new DeltaStandaloneException(s"Partition column(s) ${partCols.mkString(",")} not found in " +
s"schema:\n${schema.getTreeString}")
}

def nonPartitionColumnAbsentException(): Throwable = {
new DeltaStandaloneException("Data written into Delta needs to contain at least one " +
"non-partitioned column")
}

///////////////////////////////////////////////////////////////////////////
// Helper Methods
///////////////////////////////////////////////////////////////////////////