
Dynamically support Spark native engine in Iceberg #9721

Closed
wants to merge 2 commits

Conversation

@huaxingao:

This PR introduces a dynamic plugin mechanism to support Spark native execution engines, e.g., Apache Comet.

Currently, when vectorization is enabled, Iceberg uses the VectorizedReaderBuilder to generate VectorizedArrowReader and ColumnarBatchReader, which are then used for batch reading. I propose introducing a customized VectorizedReaderBuilder and a customized ColumnarBatchReader. At runtime, if the customized VectorizedReaderBuilder and ColumnarBatchReader are accessible, the system leverages the native vectorized execution engine. If these customized components are not available, Iceberg's standard VectorizedReaderBuilder and ColumnarBatchReader are used for batch reading.
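For illustration only, a minimal sketch of that runtime check, with hypothetical class and method names (the PR routes this through the VectorizedUtil class described below):

 public class NativeEngineCheck {
   // "implClassName" is the configured plugin class, e.g. "org.apache.comet.Comet".
   static boolean customizedReaderAvailable(String implClassName) {
     if (implClassName == null) {
       return false; // nothing configured: use Iceberg's built-in readers
     }
     try {
       // Only verifies the plugin is on the classpath; instantiation happens later.
       Class.forName(implClassName);
       return true;
     } catch (ClassNotFoundException e) {
       return false; // plugin missing: fall back to the built-in readers
     }
   }
 }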

A new SparkSQLProperties.CUSTOMIZED_VECTORIZATION_IMPL is added to specify the customized vectorization implementation. If CUSTOMIZED_VECTORIZATION_IMPL is not set, the default Iceberg SparkVectorizedReaderBuilder and ColumnarBatchReader are used for batch reading. If it is set, the customized SparkVectorizedReaderBuilder and ColumnarBatchReader are used instead. In addition, a new SparkSQLProperties.CUSTOMIZED_VECTORIZATION_PROPERTY_PREFIX is added to specify the prefix of the customized vectorization property keys. Using Apache Comet as an example:

 SparkSession spark =
   SparkSession.builder()
       .master("local[2]")
       .config(
           SparkSQLProperties.CUSTOMIZED_VECTORIZATION_IMPL,
           "org.apache.comet.Comet")
       // CometConf keys start with "spark.comet". For example,
       // CometConf.COMET_USE_DECIMAL_128.key is "spark.comet.use.decimal128" and
       // CometConf.COMET_USE_LAZY_MATERIALIZATION.key is
       // "spark.comet.use.lazyMaterialization",
       // so we set SparkSQLProperties.CUSTOMIZED_VECTORIZATION_PROPERTY_PREFIX to
       // "spark.comet".
       .config(SparkSQLProperties.CUSTOMIZED_VECTORIZATION_PROPERTY_PREFIX, "spark.comet")
       .config(CometConf.COMET_USE_DECIMAL_128().key(), "true")
       .config(CometConf.COMET_USE_LAZY_MATERIALIZATION().key(), "true")
       .enableHiveSupport()
       .getOrCreate();
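For illustration, a hypothetical sketch of how the prefix could be used: collect every session conf whose key starts with CUSTOMIZED_VECTORIZATION_PROPERTY_PREFIX and hand that subset to the plugin. The helper name is an assumption, not the PR's actual code:

 import java.util.HashMap;
 import java.util.Map;

 public class VectorizationProperties {
   // Returns the subset of sessionConf whose keys start with the given prefix,
   // e.g. "spark.comet.use.decimal128" for the prefix "spark.comet".
   static Map<String, String> propertiesWithPrefix(Map<String, String> sessionConf, String prefix) {
     Map<String, String> result = new HashMap<>();
     for (Map.Entry<String, String> entry : sessionConf.entrySet()) {
       if (entry.getKey().startsWith(prefix)) {
         result.put(entry.getKey(), entry.getValue());
       }
     }
     return result;
   }
 }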

A VectorizedUtil class is added to dynamically load SparkVectorizedReaderBuilder and BaseColumnarBatchReader.
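VectorizedUtil itself is not shown in this description; as a rough sketch, under the assumption that the plugin exposes a public no-arg constructor, the loader could look like this:

 import java.lang.reflect.Constructor;

 public class VectorizedUtilSketch {
   // Tries to instantiate the configured implementation reflectively; falls back
   // to the built-in default when the class is absent or incompatible.
   static <T> T loadOrDefault(String implClassName, Class<T> expectedType, T defaultImpl) {
     if (implClassName == null) {
       return defaultImpl; // CUSTOMIZED_VECTORIZATION_IMPL not set
     }
     try {
       Constructor<?> ctor = Class.forName(implClassName).getDeclaredConstructor();
       return expectedType.cast(ctor.newInstance());
     } catch (ReflectiveOperationException | ClassCastException e) {
       return defaultImpl; // plugin unavailable: use Iceberg's default implementation
     }
   }
 }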

The customized VectorizedReaderBuilder and ColumnarBatchReader need to be implemented in the native engine (e.g., Comet).

@huaxingao:

also cc @rdblue

@aokolnychyi commented Feb 21, 2024:

I am not sure I agree with the current proposal, given that it exposes lots of internal and evolving classes. Also, it may be too late to plug in another reader, as Iceberg makes some assumptions about when vectorized reads can happen much earlier. This means that even if the external library supports vectorized reads for nested data, we can't benefit from it because of the existing logic in SparkBatch.

Have we considered allowing injection of a custom PartitionReaderFactory? We would pass Table and a delegate partition reader factory to it. That way, external libraries would have more control over the logic but could delegate to the built-in reader factory as needed.

I am not suggesting we switch right away, but rather that we think about this option. Will it even work? Can external libraries ship a custom partition reader factory, assuming they have access to Table and the built-in PartitionReaderFactory to delegate unsupported operations?
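For illustration, a rough sketch of such a delegating factory against Spark's DSv2 PartitionReaderFactory interface; the class name and the supportsNatively placeholder are assumptions, not an actual Iceberg or Comet API:

 import org.apache.iceberg.Table;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.read.InputPartition;
 import org.apache.spark.sql.connector.read.PartitionReader;
 import org.apache.spark.sql.connector.read.PartitionReaderFactory;
 import org.apache.spark.sql.vectorized.ColumnarBatch;

 public class DelegatingReaderFactory implements PartitionReaderFactory {
   private final Table table;                     // Iceberg table metadata
   private final PartitionReaderFactory delegate; // Iceberg's built-in factory

   public DelegatingReaderFactory(Table table, PartitionReaderFactory delegate) {
     this.table = table;
     this.delegate = delegate;
   }

   @Override
   public PartitionReader<InternalRow> createReader(InputPartition partition) {
     return delegate.createReader(partition); // row-based reads stay with Iceberg
   }

   @Override
   public PartitionReader<ColumnarBatch> createColumnarReader(InputPartition partition) {
     if (supportsNatively(partition)) {
       return createNativeColumnarReader(partition); // native engine's path
     }
     return delegate.createColumnarReader(partition); // delegate unsupported cases
   }

   @Override
   public boolean supportColumnarReads(InputPartition partition) {
     return supportsNatively(partition) || delegate.supportColumnarReads(partition);
   }

   // Placeholder: e.g. reject partitions with nested types the engine can't read.
   private boolean supportsNatively(InputPartition partition) {
     return false;
   }

   private PartitionReader<ColumnarBatch> createNativeColumnarReader(InputPartition partition) {
     throw new UnsupportedOperationException("native reader not implemented in this sketch");
   }
 }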

@huaxingao:

@aokolnychyi Thanks a lot for your feedback! I agree that the current approach exposes a lot of the internal classes, and injecting a custom PartitionReaderFactory would indeed offer a cleaner solution. However, it might require me to duplicate additional code on the Comet side. Let me explore this strategy a bit further to see how it goes.

@aokolnychyi commented Feb 22, 2024:

@huaxingao, we can open up some utilities on the Iceberg side, if needed. Unfortunately, the logic will be fairly coupled anyway. I kind of hope we can offset some of the duplication by having access to the delegate PartitionReaderFactory in Comet.

It is not an ideal approach either; let me know if any other ideas come up. Does Comet support vectorized reads with nested data?

@huaxingao commented Feb 28, 2024:

@aokolnychyi

I tried the customized PartitionReaderFactory approach. I need to add CustomizedSparkColumnarReaderFactory and CustomizedBatchDataReader on the Comet side, and also make a few of the Iceberg classes public. Here is the PR. Could you please take a look when you have a minute? Thanks a lot!

Comet doesn't support nested types yet.

@huaxingao commented Mar 5, 2024:

cc @aokolnychyi @rdblue @RussellSpitzer @flyrain

Per our offline discussion with @aokolnychyi, we will not take the CustomizedSparkColumnarReaderFactory approach. Instead, I have a POC PR to show how we will change the Iceberg code for the Comet integration. The PR doesn't compile because we haven't released Comet yet, but it shows how we will make the changes. Please take a look when you have time. Thanks a lot!

github-actions (bot):

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

github-actions bot added the stale label Oct 18, 2024
github-actions (bot):

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

github-actions bot closed this Oct 26, 2024