[BUG] count() in avro failed when reader_types is coalescing #6131

Closed
thirtiseven opened this issue Jul 28, 2022 · 5 comments · Fixed by #6225
@thirtiseven
Collaborator

Describe the bug
spark.read.format("avro").load(data_path).count() fails with QueryExecutionException: Expected 0 columns but read 8 from ArrayBuffer when the reader type is COALESCING.

Steps/Code to reproduce bug
My test code, in avro_test.py, is:

rapids_reader_types = ['PERFILE', 'COALESCING', 'MULTITHREADED']

@pytest.mark.parametrize('v1_enabled_list', ["avro", ""], ids=["v1", "v2"])
@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_avro_read_count(spark_tmp_path, v1_enabled_list, reader_type):
    gen_list = [('_c' + str(i), gen) for i, gen in enumerate(support_gens)]
    data_path = spark_tmp_path + '/AVRO_DATA'
    gen_avro_files(gen_list, data_path)

    all_confs = copy_and_update(_enable_all_types_conf, {
        'spark.rapids.sql.format.avro.reader.type': reader_type,
        'spark.sql.sources.useV1SourceList': v1_enabled_list})

    count_cpu = with_cpu_session(lambda spark: spark.read.format("avro").load(data_path).count(), conf=all_confs)
    count_gpu = with_gpu_session(lambda spark: spark.read.format("avro").load(data_path).count(), conf=all_confs)

    print("count cpu" + str(count_cpu))
    print("count gpu" + str(count_gpu))

    assert(count_cpu == count_gpu)


And it produces:

_____________________ test_avro_read_count[COALESCING-v1] ______________________

spark_tmp_path = '/tmp/pyspark_tests//dev-machine-master-95996-651248720/'
v1_enabled_list = 'avro', reader_type = 'COALESCING'

    @pytest.mark.parametrize('v1_enabled_list', ["avro", ""], ids=["v1", "v2"])
    @pytest.mark.parametrize('reader_type', rapids_reader_types)
    def test_avro_read_count(spark_tmp_path, v1_enabled_list, reader_type):
        gen_list = [('_c' + str(i), gen) for i, gen in enumerate(support_gens)]
        data_path = spark_tmp_path + '/AVRO_DATA'
        gen_avro_files(gen_list, data_path)

        all_confs = copy_and_update(_enable_all_types_conf, {
            'spark.rapids.sql.format.avro.reader.type': reader_type,
            'spark.sql.sources.useV1SourceList': v1_enabled_list})

        count_cpu = with_cpu_session(lambda spark: spark.read.format("avro").load(data_path).count(), conf=all_confs)
>       count_gpu = with_gpu_session(lambda spark: spark.read.format("avro").load(data_path).count(), conf=all_confs)

../../src/main/python/avro_test.py:153:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../src/main/python/spark_session.py:132: in with_gpu_session
    return with_spark_session(func, conf=copy)
../../src/main/python/spark_session.py:99: in with_spark_session
    ret = func(_spark)
../../src/main/python/avro_test.py:153: in <lambda>
    count_gpu = with_gpu_session(lambda spark: spark.read.format("avro").load(data_path).count(), conf=all_confs)
/home/haoyangl/spark-3.3.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/dataframe.py:804: in count
    return int(self._jdf.count())
/home/haoyangl/spark-3.3.0-bin-hadoop3.2/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py:1321: in __call__
    return_value = get_return_value(
/home/haoyangl/spark-3.3.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/utils.py:190: in deco
    return f(*a, **kw)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

answer = 'xro923'
gateway_client = <py4j.clientserver.JavaClient object at 0x7f608f255f70>
target_id = 'o922', name = 'count'

    def get_return_value(answer, gateway_client, target_id=None, name=None):
        """Converts an answer received from the Java gateway into a Python object.

        For example, string representation of integers are converted to Python
        integer, string representation of objects are converted to JavaObject
        instances, etc.

        :param answer: the string returned by the Java gateway
        :param gateway_client: the gateway client used to communicate with the Java
            Gateway. Only necessary if the answer is a reference (e.g., object,
            list, map)
        :param target_id: the name of the object from which the answer comes from
            (e.g., *object1* in `object1.hello()`). Optional.
        :param name: the name of the member from which the answer comes from
            (e.g., *hello* in `object1.hello()`). Optional.
        """
        if is_error(answer)[0]:
            if len(answer) > 1:
                type = answer[1]
                value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
                if answer[1] == REFERENCE_TYPE:
>                   raise Py4JJavaError(
                        "An error occurred while calling {0}{1}{2}.\n".
                        format(target_id, ".", name), value)
E                   py4j.protocol.Py4JJavaError: An error occurred while calling o922.count.
E                   : org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 16.0 failed 1 times, most recent failure: Lost task 2.0 in stage 16.0 (TID 189) (dev-machine executor driver): org.apache.spark.sql.execution.QueryExecutionException: Expected 0 columns but read 8 from ArrayBuffer((file:/tmp/pyspark_tests/dev-machine-master-95996-651248720/AVRO_DATA/part-00002-421fbae4-36ea-4f2c-a721-91ecfddb4929-c000.avro,AvroDataBlock(BlockInfo(446,2057,2038,40))), (file:/tmp/pyspark_tests/dev-machine-master-95996-651248720/AVRO_DATA/part-00021-421fbae4-36ea-4f2c-a721-91ecfddb4929-c000.avro,AvroDataBlock(BlockInfo(446,2056,2037,41))), (file:/tmp/pyspark_tests/dev-machine-master-95996-651248720/AVRO_DATA/part-00000-421fbae4-36ea-4f2c-a721-91ecfddb4929-c000.avro,AvroDataBlock(BlockInfo(446,2051,2032,40))), (file:/tmp/pyspark_tests/dev-machine-master-95996-651248720/AVRO_DATA/part-00004-421fbae4-36ea-4f2c-a721-91ecfddb4929-c000.avro,AvroDataBlock(BlockInfo(446,2048,2029,40))), (file:/tmp/pyspark_tests/dev-machine-master-95996-651248720/AVRO_DATA/part-00033-421fbae4-36ea-4f2c-a721-91ecfddb4929-c000.avro,AvroDataBlock(BlockInfo(446,2046,2027,40))), (file:/tmp/pyspark_tests/dev-machine-master-95996-651248720/AVRO_DATA/part-00007-421fbae4-36ea-4f2c-a721-91ecfddb4929-c000.avro,AvroDataBlock(BlockInfo(446,2042,2023,40))), (file:/tmp/pyspark_tests/dev-machine-master-95996-651248720/AVRO_DATA/part-00029-421fbae4-36ea-4f2c-a721-91ecfddb4929-c000.avro,AvroDataBlock(BlockInfo(446,2041,2022,40))), (file:/tmp/pyspark_tests/dev-machine-master-95996-651248720/AVRO_DATA/part-00035-421fbae4-36ea-4f2c-a721-91ecfddb4929-c000.avro,AvroDataBlock(BlockInfo(446,2041,2022,41))), (file:/tmp/pyspark_tests/dev-machine-master-95996-651248720/AVRO_DATA/part-00003-421fbae4-36ea-4f2c-a721-91ecfddb4929-c000.avro,AvroDataBlock(BlockInfo(446,2037,2018,40))), (file:/tmp/pyspark_tests/dev-machine-master-95996-651248720/AVRO_DATA/part-00026-421fbae4-36ea-4f2c-a721-91ecfddb4929-c000.avro,AvroDataBlock(BlockInfo(446,2037,2018,40))), (file:/tmp/pyspark_tests/dev-machine-master-95996-651248720/AVRO_DATA/part-00009-421fbae4-36ea-4f2c-a721-91ecfddb4929-c000.avro,AvroDataBlock(BlockInfo(446,2033,2014,41))), (file:/tmp/pyspark_tests/dev-machine-master-95996-651248720/AVRO_DATA/part-00049-421fbae4-36ea-4f2c-a721-91ecfddb4929-c000.avro,AvroDataBlock(BlockInfo(446,2029,2010,40))), (file:/tmp/pyspark_tests/dev-machine-master-95996-651248720/AVRO_DATA/part-00032-421fbae4-36ea-4f2c-a721-91ecfddb4929-c000.avro,AvroDataBlock(BlockInfo(446,2029,2010,40))))
E                       at com.nvidia.spark.rapids.MultiFileCoalescingPartitionReaderBase.$anonfun$readToTable$2(GpuMultiFileReader.scala:818)
E                       at com.nvidia.spark.rapids.MultiFileCoalescingPartitionReaderBase.$anonfun$readToTable$2$adapted(GpuMultiFileReader.scala:815)
E                       at com.nvidia.spark.rapids.Arm.closeOnExcept(Arm.scala:87)
E                       at com.nvidia.spark.rapids.Arm.closeOnExcept$(Arm.scala:85)
E                       at com.nvidia.spark.rapids.FilePartitionReaderBase.closeOnExcept(GpuMultiFileReader.scala:254)
E                       at com.nvidia.spark.rapids.MultiFileCoalescingPartitionReaderBase.$anonfun$readToTable$1(GpuMultiFileReader.scala:815)
E                       at com.nvidia.spark.rapids.Arm.withResource(Arm.scala:28)
E                       at com.nvidia.spark.rapids.Arm.withResource$(Arm.scala:26)
E                       at com.nvidia.spark.rapids.FilePartitionReaderBase.withResource(GpuMultiFileReader.scala:254)
E                       at com.nvidia.spark.rapids.MultiFileCoalescingPartitionReaderBase.readToTable(GpuMultiFileReader.scala:810)
E                       at com.nvidia.spark.rapids.MultiFileCoalescingPartitionReaderBase.$anonfun$readBatch$1(GpuMultiFileReader.scala:784)
E                       at com.nvidia.spark.rapids.Arm.withResource(Arm.scala:28)
E                       at com.nvidia.spark.rapids.Arm.withResource$(Arm.scala:26)
E                       at com.nvidia.spark.rapids.FilePartitionReaderBase.withResource(GpuMultiFileReader.scala:254)
E                       at com.nvidia.spark.rapids.MultiFileCoalescingPartitionReaderBase.readBatch(GpuMultiFileReader.scala:766)
E                       at com.nvidia.spark.rapids.MultiFileCoalescingPartitionReaderBase.next(GpuMultiFileReader.scala:752)
E                       at com.nvidia.spark.rapids.PartitionIterator.hasNext(dataSourceUtil.scala:29)
E                       at com.nvidia.spark.rapids.MetricsBatchIterator.hasNext(dataSourceUtil.scala:46)
E                       at com.nvidia.spark.rapids.shims.GpuDataSourceRDD$$anon$1.$anonfun$hasNext$1(GpuDataSourceRDD.scala:53)
E                       at com.nvidia.spark.rapids.shims.GpuDataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(GpuDataSourceRDD.scala:53)
E                       at scala.Option.exists(Option.scala:376)
E                       at com.nvidia.spark.rapids.shims.GpuDataSourceRDD$$anon$1.hasNext(GpuDataSourceRDD.scala:53)
E                       at com.nvidia.spark.rapids.shims.GpuDataSourceRDD$$anon$1.advanceToNextIter(GpuDataSourceRDD.scala:78)
E                       at com.nvidia.spark.rapids.shims.GpuDataSourceRDD$$anon$1.hasNext(GpuDataSourceRDD.scala:53)
E                       at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
E                       at org.apache.spark.sql.rapids.GpuFileSourceScanExec$$anon$1.hasNext(GpuFileSourceScanExec.scala:420)
E                       at com.nvidia.spark.rapids.GpuHashAggregateIterator.$anonfun$next$2(aggregate.scala:237)
E                       at scala.Option.getOrElse(Option.scala:189)
E                       at com.nvidia.spark.rapids.GpuHashAggregateIterator.next(aggregate.scala:235)
E                       at com.nvidia.spark.rapids.GpuHashAggregateIterator.next(aggregate.scala:181)
E                       at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase$$anon$1.partNextBatch(GpuShuffleExchangeExecBase.scala:288)
E                       at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase$$anon$1.hasNext(GpuShuffleExchangeExecBase.scala:304)
E                       at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
E                       at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
E                       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
E                       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
E                       at org.apache.spark.scheduler.Task.run(Task.scala:136)
E                       at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
E                       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
E                       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
E                       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
E                       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
E                       at java.lang.Thread.run(Thread.java:748)
E
E                   Driver stacktrace:
E                       at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
E                       at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
E                       at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
E                       at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
E                       at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
E                       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
E                       at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
E                       at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
E                       at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
E                       at scala.Option.foreach(Option.scala:407)
E                       at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
E                       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
E                       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
E                       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
E                       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
E                       at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
E                       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
E                       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
E                       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
E                       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
E                       at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
E                       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
E                       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
E                       at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
E                       at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
E                       at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:424)
E                       at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3161)
E                       at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3160)
E                       at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
E                       at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
E                       at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
E                       at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
E                       at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
E                       at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
E                       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
E                       at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
E                       at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
E                       at org.apache.spark.sql.Dataset.count(Dataset.scala:3160)
E                       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
E                       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
E                       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E                       at java.lang.reflect.Method.invoke(Method.java:498)
E                       at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
E                       at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
E                       at py4j.Gateway.invoke(Gateway.java:282)
E                       at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
E                       at py4j.commands.CallCommand.execute(CallCommand.java:79)
E                       at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
E                       at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
E                       at java.lang.Thread.run(Thread.java:748)
E                   Caused by: org.apache.spark.sql.execution.QueryExecutionException: Expected 0 columns but read 8 from ArrayBuffer((file:/tmp/pyspark_tests/dev-machine-master-95996-651248720/AVRO_DATA/part-00002-421fbae4-36ea-4f2c-a721-91ecfddb4929-c000.avro,AvroDataBlock(BlockInfo(446,2057,2038,40))), (file:/tmp/pyspark_tests/dev-machine-master-95996-651248720/AVRO_DATA/part-00021-421fbae4-36ea-4f2c-a721-91ecfddb4929-c000.avro,AvroDataBlock(BlockInfo(446,2056,2037,41))), (file:/tmp/pyspark_tests/dev-machine-master-95996-651248720/AVRO_DATA/part-00000-421fbae4-36ea-4f2c-a721-91ecfddb4929-c000.avro,AvroDataBlock(BlockInfo(446,2051,2032,40))), (file:/tmp/pyspark_tests/dev-machine-master-95996-651248720/AVRO_DATA/part-00004-421fbae4-36ea-4f2c-a721-91ecfddb4929-c000.avro,AvroDataBlock(BlockInfo(446,2048,2029,40))), (file:/tmp/pyspark_tests/dev-machine-master-95996-651248720/AVRO_DATA/part-00033-421fbae4-36ea-4f2c-a721-91ecfddb4929-c000.avro,AvroDataBlock(BlockInfo(446,2046,2027,40))), (file:/tmp/pyspark_tests/dev-machine-master-95996-651248720/AVRO_DATA/part-00007-421fbae4-36ea-4f2c-a721-91ecfddb4929-c000.avro,AvroDataBlock(BlockInfo(446,2042,2023,40))), (file:/tmp/pyspark_tests/dev-machine-master-95996-651248720/AVRO_DATA/part-00029-421fbae4-36ea-4f2c-a721-91ecfddb4929-c000.avro,AvroDataBlock(BlockInfo(446,2041,2022,40))), (file:/tmp/pyspark_tests/dev-machine-master-95996-651248720/AVRO_DATA/part-00035-421fbae4-36ea-4f2c-a721-91ecfddb4929-c000.avro,AvroDataBlock(BlockInfo(446,2041,2022,41))), (file:/tmp/pyspark_tests/dev-machine-master-95996-651248720/AVRO_DATA/part-00003-421fbae4-36ea-4f2c-a721-91ecfddb4929-c000.avro,AvroDataBlock(BlockInfo(446,2037,2018,40))), (file:/tmp/pyspark_tests/dev-machine-master-95996-651248720/AVRO_DATA/part-00026-421fbae4-36ea-4f2c-a721-91ecfddb4929-c000.avro,AvroDataBlock(BlockInfo(446,2037,2018,40))), (file:/tmp/pyspark_tests/dev-machine-master-95996-651248720/AVRO_DATA/part-00009-421fbae4-36ea-4f2c-a721-91ecfddb4929-c000.avro,AvroDataBlock(BlockInfo(446,2033,2014,41))), (file:/tmp/pyspark_tests/dev-machine-master-95996-651248720/AVRO_DATA/part-00049-421fbae4-36ea-4f2c-a721-91ecfddb4929-c000.avro,AvroDataBlock(BlockInfo(446,2029,2010,40))), (file:/tmp/pyspark_tests/dev-machine-master-95996-651248720/AVRO_DATA/part-00032-421fbae4-36ea-4f2c-a721-91ecfddb4929-c000.avro,AvroDataBlock(BlockInfo(446,2029,2010,40))))
E                       at com.nvidia.spark.rapids.MultiFileCoalescingPartitionReaderBase.$anonfun$readToTable$2(GpuMultiFileReader.scala:818)
E                       at com.nvidia.spark.rapids.MultiFileCoalescingPartitionReaderBase.$anonfun$readToTable$2$adapted(GpuMultiFileReader.scala:815)
E                       at com.nvidia.spark.rapids.Arm.closeOnExcept(Arm.scala:87)
E                       at com.nvidia.spark.rapids.Arm.closeOnExcept$(Arm.scala:85)
E                       at com.nvidia.spark.rapids.FilePartitionReaderBase.closeOnExcept(GpuMultiFileReader.scala:254)
E                       at com.nvidia.spark.rapids.MultiFileCoalescingPartitionReaderBase.$anonfun$readToTable$1(GpuMultiFileReader.scala:815)
E                       at com.nvidia.spark.rapids.Arm.withResource(Arm.scala:28)
E                       at com.nvidia.spark.rapids.Arm.withResource$(Arm.scala:26)
E                       at com.nvidia.spark.rapids.FilePartitionReaderBase.withResource(GpuMultiFileReader.scala:254)
E                       at com.nvidia.spark.rapids.MultiFileCoalescingPartitionReaderBase.readToTable(GpuMultiFileReader.scala:810)
E                       at com.nvidia.spark.rapids.MultiFileCoalescingPartitionReaderBase.$anonfun$readBatch$1(GpuMultiFileReader.scala:784)
E                       at com.nvidia.spark.rapids.Arm.withResource(Arm.scala:28)
E                       at com.nvidia.spark.rapids.Arm.withResource$(Arm.scala:26)
E                       at com.nvidia.spark.rapids.FilePartitionReaderBase.withResource(GpuMultiFileReader.scala:254)
E                       at com.nvidia.spark.rapids.MultiFileCoalescingPartitionReaderBase.readBatch(GpuMultiFileReader.scala:766)
E                       at com.nvidia.spark.rapids.MultiFileCoalescingPartitionReaderBase.next(GpuMultiFileReader.scala:752)
E                       at com.nvidia.spark.rapids.PartitionIterator.hasNext(dataSourceUtil.scala:29)
E                       at com.nvidia.spark.rapids.MetricsBatchIterator.hasNext(dataSourceUtil.scala:46)
E                       at com.nvidia.spark.rapids.shims.GpuDataSourceRDD$$anon$1.$anonfun$hasNext$1(GpuDataSourceRDD.scala:53)
E                       at com.nvidia.spark.rapids.shims.GpuDataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(GpuDataSourceRDD.scala:53)
E                       at scala.Option.exists(Option.scala:376)
E                       at com.nvidia.spark.rapids.shims.GpuDataSourceRDD$$anon$1.hasNext(GpuDataSourceRDD.scala:53)
E                       at com.nvidia.spark.rapids.shims.GpuDataSourceRDD$$anon$1.advanceToNextIter(GpuDataSourceRDD.scala:78)
E                       at com.nvidia.spark.rapids.shims.GpuDataSourceRDD$$anon$1.hasNext(GpuDataSourceRDD.scala:53)
E                       at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
E                       at org.apache.spark.sql.rapids.GpuFileSourceScanExec$$anon$1.hasNext(GpuFileSourceScanExec.scala:420)
E                       at com.nvidia.spark.rapids.GpuHashAggregateIterator.$anonfun$next$2(aggregate.scala:237)
E                       at scala.Option.getOrElse(Option.scala:189)
E                       at com.nvidia.spark.rapids.GpuHashAggregateIterator.next(aggregate.scala:235)
E                       at com.nvidia.spark.rapids.GpuHashAggregateIterator.next(aggregate.scala:181)
E                       at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase$$anon$1.partNextBatch(GpuShuffleExchangeExecBase.scala:288)
E                       at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase$$anon$1.hasNext(GpuShuffleExchangeExecBase.scala:304)
E                       at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
E                       at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
E                       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
E                       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
E                       at org.apache.spark.scheduler.Task.run(Task.scala:136)
E                       at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
E                       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
E                       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
E                       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
E                       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
E                       ... 1 more

/home/haoyangl/spark-3.3.0-bin-hadoop3.2/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py:326: Py4JJavaError

Expected behavior
spark.read.format("avro").load(data_path).count() should return the number of rows in the Avro files, the same as the CPU result.
We should add a test for .count() to the Avro tests once this is fixed.

@thirtiseven added the bug and ? - Needs Triage labels Jul 28, 2022
@res-life
Collaborator

One more issue: a host buffer leak occurs when reader_type is MULTITHREADED and the data source version is v2:

../../src/main/python/avro_test.py::test_avro_read_count[MULTITHREADED-v2] 22/07/29 03:41:55 ERROR HostMemoryBuffer: A HOST BUFFER WAS LEAKED (ID: 27 7f58d80239c0)
22/07/29 03:41:55 ERROR HostMemoryBuffer: A HOST BUFFER WAS LEAKED (ID: 32 7f58e80101b0)
22/07/29 03:41:55 ERROR HostMemoryBuffer: A HOST BUFFER WAS LEAKED (ID: 49 7f590c013820)
22/07/29 03:41:55 ERROR HostMemoryBuffer: A HOST BUFFER WAS LEAKED (ID: 40 7f58d8024ed0)
22/07/29 03:41:55 ERROR HostMemoryBuffer: A HOST BUFFER WAS LEAKED (ID: 45 7f58d4015670)
......
PASSED

@firestarman
Collaborator

firestarman commented Aug 1, 2022

This is probably caused by the behavior of cudf's Table.readAvro.
When no column names are specified to readAvro, it reads all the columns in the file, hence the log complaining "Expected 0 columns but read 8".

One fix is to add a check for an empty read schema: if it is empty, return an empty batch with the correct row count instead of calling into readAvro. It would be something like https://github.com/NVIDIA/spark-rapids/blob/branch-22.08/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala#L773
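
A minimal sketch of that guard, not the actual plugin change; the helper name is hypothetical and numRows stands in for the row count the coalescing reader accumulates from the block metadata:

```scala
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

// Sketch only: short-circuit before cudf's readAvro when no columns are
// requested (e.g. a plain count()), returning a batch that carries just
// the row count and no column data.
def batchForEmptyReadSchema(readDataSchema: StructType, numRows: Long): Option[ColumnarBatch] = {
  if (readDataSchema.isEmpty) {
    Some(new ColumnarBatch(Array.empty[ColumnVector], numRows.toInt))
  } else {
    None // non-empty schema: fall through to the normal readAvro path
  }
}
```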

It would be better to check whether Parquet and ORC have the same issue.

@firestarman
Collaborator

firestarman commented Aug 1, 2022

Avro always specifies the schema from a file here, which is not correct.
We should build a schema from readDataSchema and fileSchema, as ORC and Parquet do.
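
A rough sketch of that schema building, using the Avro Java API directly; the helper name is hypothetical and it assumes a flat record schema:

```scala
import org.apache.avro.Schema
import scala.collection.JavaConverters._

// Sketch only: instead of always taking the full schema from the file,
// keep just the fields named in the read schema (readDataSchema's column
// names), mirroring what the ORC and Parquet readers do.
def buildReaderSchema(fileSchema: Schema, readColumns: Set[String]): Schema = {
  val prunedFields = fileSchema.getFields.asScala
    .filter(f => readColumns.contains(f.name))
    // Field objects cannot be reused across schemas, so copy them.
    .map(f => new Schema.Field(f.name, f.schema, f.doc, f.defaultVal))
  Schema.createRecord(fileSchema.getName, fileSchema.getDoc,
    fileSchema.getNamespace, fileSchema.isError, prunedFields.asJava)
}
```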

@sameerz added the P0 label and removed the ? - Needs Triage label Aug 2, 2022
@sameerz
Collaborator

sameerz commented Aug 2, 2022

Related issue to add tests: #717

@thirtiseven self-assigned this Aug 3, 2022
@firestarman
Collaborator

firestarman commented Aug 4, 2022

> One more issue: a host buffer leak occurs when reader_type is MULTITHREADED and the data source version is v2:
>
> ../../src/main/python/avro_test.py::test_avro_read_count[MULTITHREADED-v2] 22/07/29 03:41:55 ERROR HostMemoryBuffer: A HOST BUFFER WAS LEAKED (ID: 27 7f58d80239c0)
> 22/07/29 03:41:55 ERROR HostMemoryBuffer: A HOST BUFFER WAS LEAKED (ID: 32 7f58e80101b0)
> 22/07/29 03:41:55 ERROR HostMemoryBuffer: A HOST BUFFER WAS LEAKED (ID: 49 7f590c013820)
> 22/07/29 03:41:55 ERROR HostMemoryBuffer: A HOST BUFFER WAS LEAKED (ID: 40 7f58d8024ed0)
> 22/07/29 03:41:55 ERROR HostMemoryBuffer: A HOST BUFFER WAS LEAKED (ID: 45 7f58d4015670)
> ......
> PASSED

@res-life Could you file an issue for this? Here is the issue: #6220
This is really a bug; #6219 could be one fix.
