Iceberg/Comet integration POC #9841

Open · wants to merge 20 commits into base: main
3 changes: 3 additions & 0 deletions .baseline/checkstyle/checkstyle-suppressions.xml
@@ -48,4 +48,7 @@

<!-- Referencing guava classes should be allowed in classes within bundled-guava module -->
<suppress files="org.apache.iceberg.GuavaClasses" id="BanUnrelocatedGuavaClasses"/>

<!-- Suppress checks for CometColumnReader -->
<suppress files="org.apache.iceberg.spark.data.vectorized.CometColumnReader" checks="IllegalImport"/>
</suppressions>
5 changes: 5 additions & 0 deletions spark/v3.4/build.gradle
@@ -52,6 +52,8 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
dependencies {
implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
api project(':iceberg-api')
annotationProcessor libs.immutables.value
compileOnly libs.immutables.value
implementation project(':iceberg-common')
implementation project(':iceberg-core')
implementation project(':iceberg-data')
@@ -77,6 +79,8 @@
exclude group: 'org.roaringbitmap'
}

compileOnly "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.3.0"

implementation libs.parquet.column
implementation libs.parquet.hadoop

@@ -178,6 +182,7 @@
testImplementation libs.avro.avro
testImplementation libs.parquet.hadoop
testImplementation libs.junit.vintage.engine
testImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.3.0"

// Required because we remove antlr plugin dependencies from the compile configuration, see note above
runtimeOnly libs.antlr.runtime
@@ -28,6 +28,7 @@
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

public class SmokeTest extends SparkExtensionsTestBase {
@@ -44,7 +45,7 @@ public void dropTable() {
// Run through our Doc's Getting Started Example
// TODO Update doc example so that it can actually be run, modifications were required for this
// test suite to run
@Test
@Ignore
public void testGettingStarted() throws IOException {
// Creating a table
sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark;

import java.io.Serializable;
import org.immutables.value.Value;

@Value.Immutable
public interface OrcBatchReadConf extends Serializable {
int batchSize();
}
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark;

import java.io.Serializable;
import org.immutables.value.Value;

@Value.Immutable
public interface ParquetBatchReadConf extends Serializable {
int batchSize();

ParquetReaderType readerType();
}
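Note (not part of the diff): with the Immutables annotation processor wired in by the build.gradle change above, this interface gets a generated companion builder, by default named ImmutableParquetBatchReadConf. A minimal usage sketch with an illustrative batch size:

// Illustrative only; the generated builder exposes one setter per abstract accessor.
ParquetBatchReadConf conf =
    ImmutableParquetBatchReadConf.builder()
        .batchSize(4096) // example value, not a project default
        .readerType(ParquetReaderType.COMET)
        .build();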
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark;

public enum ParquetReaderType {
// Iceberg's built-in vectorized Parquet reader
ICEBERG,
// Apache DataFusion Comet's vectorized Parquet reader
COMET
}
@@ -359,4 +359,12 @@ public boolean reportColumnStats() {
.defaultValue(SparkSQLProperties.REPORT_COLUMN_STATS_DEFAULT)
.parse();
}

public ParquetReaderType parquetReaderType() {
return confParser
.enumConf(ParquetReaderType::valueOf)
.sessionConf(SparkSQLProperties.PARQUET_READER_TYPE)
.defaultValue(SparkSQLProperties.PARQUET_READER_TYPE_DEFAULT)
.parse();
}
}
@@ -27,6 +27,10 @@ private SparkSQLProperties() {}
// Controls whether vectorized reads are enabled
public static final String VECTORIZATION_ENABLED = "spark.sql.iceberg.vectorization.enabled";

// Controls which Parquet reader to use for vectorization
public static final String PARQUET_READER_TYPE = "spark.sql.iceberg.parquet.reader-type";
public static final ParquetReaderType PARQUET_READER_TYPE_DEFAULT = ParquetReaderType.COMET;

// Controls whether reading/writing timestamps without timezones is allowed
@Deprecated
public static final String HANDLE_TIMESTAMP_WITHOUT_TIMEZONE =
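For reference (not part of the diff), the new property can be overridden at the session level; a minimal sketch assuming an existing SparkSession named spark:

// Fall back to Iceberg's built-in vectorized Parquet reader for this session.
spark.conf().set("spark.sql.iceberg.parquet.reader-type", "ICEBERG");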
@@ -0,0 +1,199 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.data.vectorized;

import java.util.Iterator;
import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.Pair;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

@SuppressWarnings("checkstyle:VisibilityModifier")
Contributor: These changes would require a bit more time to review. I'll do that tomorrow. I think we would want to restructure the original implementation a bit. Not a concern for now.

Contributor: We would want to structure this a bit differently. Let me think more.

public abstract class BaseColumnBatchLoader {
protected final int numRowsToRead;
// the rowId mapping to skip deleted rows for all column vectors inside a batch, it is null when
// there is no deletes
protected int[] rowIdMapping;
// the array indicating whether a row is deleted; it is null when there is no "_deleted"
// metadata column
protected boolean[] isDeleted;
private final boolean hasIsDeletedColumn;
private final DeleteFilter<InternalRow> deletes;
private final long rowStartPosInBatch;

protected BaseColumnBatchLoader(
int numRowsToRead,
boolean hasIsDeletedColumn,
DeleteFilter<InternalRow> deletes,
long rowStartPosInBatch) {
Preconditions.checkArgument(
numRowsToRead > 0, "Invalid number of rows to read: %s", numRowsToRead);
this.numRowsToRead = numRowsToRead;
this.hasIsDeletedColumn = hasIsDeletedColumn;
this.deletes = deletes;
this.rowStartPosInBatch = rowStartPosInBatch;
if (hasIsDeletedColumn) {
isDeleted = new boolean[numRowsToRead];
}
}

protected ColumnarBatch initializeColumnBatchWithDeletions(
ColumnVector[] arrowColumnVectors, int numRowsUndeleted) {
ColumnarBatch newColumnarBatch = new ColumnarBatch(arrowColumnVectors);
newColumnarBatch.setNumRows(numRowsUndeleted);

if (hasEqDeletes()) {
applyEqDelete(newColumnarBatch);
}

if (hasIsDeletedColumn && rowIdMapping != null) {
// reset the row id mapping array, so that it doesn't filter out the deleted rows
for (int i = 0; i < numRowsToRead; i++) {
rowIdMapping[i] = i;
}
newColumnarBatch.setNumRows(numRowsToRead);
}
return newColumnarBatch;
}

/**
* This method iterates over each column reader and reads the current batch of data into the
* {@link ColumnVector}.
*/
protected abstract ColumnVector[] readDataToColumnVectors();

/**
* Reads the current batch of data into {@link ColumnVector}s, applies deletion logic, and loads
* the data into a {@link ColumnarBatch}.
*/
public abstract ColumnarBatch loadDataToColumnBatch();

boolean hasEqDeletes() {
return deletes != null && deletes.hasEqDeletes();
}

protected int initRowIdMapping() {
Pair<int[], Integer> posDeleteRowIdMapping = posDelRowIdMapping();
if (posDeleteRowIdMapping != null) {
rowIdMapping = posDeleteRowIdMapping.first();
return posDeleteRowIdMapping.second();
} else {
rowIdMapping = initEqDeleteRowIdMapping();
return numRowsToRead;
}
}

Pair<int[], Integer> posDelRowIdMapping() {
if (deletes != null && deletes.hasPosDeletes()) {
return buildPosDelRowIdMapping(deletes.deletedRowPositions());
} else {
return null;
}
}

/**
* Build a row id mapping inside a batch, which skips deleted rows. Here is an example of how we
* delete 2 rows in a batch with 8 rows in total:
*
* <pre>
*   [0,1,2,3,4,5,6,7] -- Original status of the row id mapping array
*   [F,F,F,F,F,F,F,F] -- Original status of the isDeleted array
*   Position delete 2, 6
*   [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num records to 6]
*   [F,F,T,F,F,F,T,F] -- After applying position deletes
* </pre>
*
* @param deletedRowPositions a set of deleted row positions
* @return the mapping array and the new num of rows in a batch, null if no row is deleted
*/
Pair<int[], Integer> buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPositions) {
if (deletedRowPositions == null) {
return null;
}

int[] posDelRowIdMapping = new int[numRowsToRead];
int originalRowId = 0;
int currentRowId = 0;
while (originalRowId < numRowsToRead) {
if (!deletedRowPositions.isDeleted(originalRowId + rowStartPosInBatch)) {
posDelRowIdMapping[currentRowId] = originalRowId;
currentRowId++;
} else {
if (hasIsDeletedColumn) {
isDeleted[originalRowId] = true;
}

deletes.incrementDeleteCount();
}
originalRowId++;
}

if (currentRowId == numRowsToRead) {
// there is no delete in this batch
return null;
} else {
return Pair.of(posDelRowIdMapping, currentRowId);
}
}

int[] initEqDeleteRowIdMapping() {
int[] eqDeleteRowIdMapping = null;
if (hasEqDeletes()) {
eqDeleteRowIdMapping = new int[numRowsToRead];
for (int i = 0; i < numRowsToRead; i++) {
eqDeleteRowIdMapping[i] = i;
}
}

return eqDeleteRowIdMapping;
}

/**
* Filter out the equality deleted rows. Here is an example:
*
* <pre>
*   [0,1,2,3,4,5,6,7] -- Original status of the row id mapping array
*   [F,F,F,F,F,F,F,F] -- Original status of the isDeleted array
*   Position delete 2, 6
*   [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num records to 6]
*   [F,F,T,F,F,F,T,F] -- After applying position deletes
*   Equality delete 1 <= x <= 3
*   [0,4,5,7,-,-,-,-] -- After applying equality deletes [Set Num records to 4]
*   [F,T,T,T,F,F,T,F] -- After applying equality deletes
* </pre>
*
* @param columnarBatch the {@link ColumnarBatch} to apply the equality delete
*/
void applyEqDelete(ColumnarBatch columnarBatch) {
Iterator<InternalRow> it = columnarBatch.rowIterator();
int rowId = 0;
int currentRowId = 0;
while (it.hasNext()) {
InternalRow row = it.next();
if (deletes.eqDeletedRowFilter().test(row)) {
// the row is NOT deleted
// skip deleted rows by pointing to the next undeleted row Id
rowIdMapping[currentRowId] = rowIdMapping[rowId];
currentRowId++;
} else {
if (hasIsDeletedColumn) {
isDeleted[rowIdMapping[rowId]] = true;
}

deletes.incrementDeleteCount();
}

rowId++;
}

columnarBatch.setNumRows(currentRowId);
}
}
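To illustrate how the two abstract hooks above are meant to compose, here is a hypothetical subclass sketch (not code from this PR; ExampleColumnBatchLoader and its preloaded vectors are invented for illustration):

import org.apache.iceberg.data.DeleteFilter;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

// Hypothetical subclass: hands back vectors materialized elsewhere and reuses the
// delete handling provided by BaseColumnBatchLoader.
class ExampleColumnBatchLoader extends BaseColumnBatchLoader {
  private final ColumnVector[] vectors;

  ExampleColumnBatchLoader(
      int numRowsToRead,
      boolean hasIsDeletedColumn,
      DeleteFilter<InternalRow> deletes,
      long rowStartPosInBatch,
      ColumnVector[] vectors) {
    super(numRowsToRead, hasIsDeletedColumn, deletes, rowStartPosInBatch);
    this.vectors = vectors;
  }

  @Override
  protected ColumnVector[] readDataToColumnVectors() {
    // A real implementation would pull the next batch from its column readers.
    return vectors;
  }

  @Override
  public ColumnarBatch loadDataToColumnBatch() {
    // Build the row id mapping first so position/equality deletes are skipped,
    // then wrap the vectors in a batch with the surviving row count.
    int numLiveRows = initRowIdMapping();
    return initializeColumnBatchWithDeletions(readDataToColumnVectors(), numLiveRows);
  }
}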