Porting GpuRowToColumnar converters to InternalColumnarRDDConverter #4206
Conversation
- allow converting array, map, struct and decimal
Signed-off-by: Allen Xu <allxu@nvidia.com>
It would be nice to add tests for this. To what extent has it been tested already?
You only need to enable nested-type support for your case here, instead of updating the API definition for all cases.
  def convert(df: DataFrame): RDD[Table] = {
    val schema = df.schema
-   if (!GpuOverrides.areAllSupportedTypes(schema.map(_.dataType) :_*)) {
-     val unsupported = schema.map(_.dataType).filter(!GpuOverrides.isSupportedType(_)).toSet
+   val unsupported = schema.map(_.dataType).filter( d =>
+     !GpuOverrides.isSupportedType(d, allowArray=true, allowNesting=true, ....)
+   ).toSet
+   if (unsupported.nonEmpty) {
      throw new IllegalArgumentException(s"Cannot convert $df to GPU columnar $unsupported are " +
        s"not currently supported data types for columnar.")
    }
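For reference, a minimal sketch of what the guard could look like once that suggestion is applied. The allowArray/allowNesting flag names come from the suggestion above and are assumptions about the GpuOverrides.isSupportedType signature, not a verified API:

// Sketch only, inside InternalColumnarRddConverter; the flag names are assumed.
def convert(df: DataFrame): RDD[Table] = {
  val schema = df.schema
  val unsupported = schema.map(_.dataType).filter { d =>
    !GpuOverrides.isSupportedType(d, allowArray = true, allowNesting = true)
  }.toSet
  if (unsupported.nonEmpty) {
    throw new IllegalArgumentException(s"Cannot convert $df to GPU columnar, $unsupported are " +
      s"not currently supported data types for columnar.")
  }
  // ... rest of the conversion unchanged ...
}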
Signed-off-by: Allen Xu <allxu@nvidia.com>
Signed-off-by: Allen Xu <allxu@nvidia.com>
The problem is caused by a bug in cudf where …
Signed-off-by: Allen Xu <allxu@nvidia.com>
build
…n/InternalColumnarRddConverter.scala
Co-authored-by: Jason Lowe <jlowe@nvidia.com>
build
Signed-off-by: Allen Xu <allxu@nvidia.com>
build
build
Hi @jlowe, not in the unit tests, but in the performance tests for the PCA training I saw several ERROR ColumnVector: A DEVICE COLUMN VECTOR WAS LEAKED messages. After enabling ai.rapids.refcount.debug, I can see the detailed logs:
21/12/06 16:54:40 ERROR ColumnVector: A DEVICE COLUMN VECTOR WAS LEAKED (ID: 503900 7f3d5c8a8730)
21/12/06 16:54:40 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/12/06 16:54:40 ERROR MemoryCleaner: Leaked vector (ID: 503900): 2021-12-06 16:54:08.0841 CST: INC
java.lang.Thread.getStackTrace(Thread.java:1556)
ai.rapids.cudf.MemoryCleaner$RefCountDebugItem.<init>(MemoryCleaner.java:301)
ai.rapids.cudf.MemoryCleaner$Cleaner.addRef(MemoryCleaner.java:82)
ai.rapids.cudf.ColumnVector.incRefCountInternal(ColumnVector.java:245)
ai.rapids.cudf.ColumnVector.<init>(ColumnVector.java:134)
ai.rapids.cudf.ColumnView$NestedColumnVector.createColumnVector(ColumnView.java:3835)
ai.rapids.cudf.HostColumnVector.copyToDevice(HostColumnVector.java:220)
ai.rapids.cudf.HostColumnVector$ColumnBuilder.buildAndPutOnDevice(HostColumnVector.java:1290)
com.nvidia.spark.rapids.GpuColumnVector$GpuColumnarBatchBuilder.buildAndPutOnDevice(GpuColumnVector.java:402)
com.nvidia.spark.rapids.GpuColumnVector$GpuColumnarBatchBuilderBase.build(GpuColumnVector.java:277)
org.apache.spark.sql.rapids.execution.ExternalRowToColumnarIterator.buildBatch(InternalColumnarRddConverter.scala:614)
org.apache.spark.sql.rapids.execution.ExternalRowToColumnarIterator.next(InternalColumnarRddConverter.scala:578)
org.apache.spark.sql.rapids.execution.ExternalRowToColumnarIterator.next(InternalColumnarRddConverter.scala:561)
scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
...
2021-12-06 16:54:08.0841 CST: INC
java.lang.Thread.getStackTrace(Thread.java:1556)
ai.rapids.cudf.MemoryCleaner$RefCountDebugItem.<init>(MemoryCleaner.java:301)
ai.rapids.cudf.MemoryCleaner$Cleaner.addRef(MemoryCleaner.java:82)
ai.rapids.cudf.ColumnVector.incRefCountInternal(ColumnVector.java:245)
ai.rapids.cudf.ColumnVector.incRefCount(ColumnVector.java:241)
ai.rapids.cudf.Table.<init>(Table.java:71)
com.nvidia.spark.rapids.GpuColumnVector.from(GpuColumnVector.java:610)
org.apache.spark.sql.rapids.execution.InternalColumnarRddConverter$.$anonfun$convert$10(InternalColumnarRddConverter.scala:724)
scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
...
2021-12-06 16:54:08.0841 CST: DEC
java.lang.Thread.getStackTrace(Thread.java:1556)
ai.rapids.cudf.MemoryCleaner$RefCountDebugItem.<init>(MemoryCleaner.java:301)
ai.rapids.cudf.MemoryCleaner$Cleaner.delRef(MemoryCleaner.java:90)
ai.rapids.cudf.ColumnVector.close(ColumnVector.java:213)
com.nvidia.spark.rapids.GpuColumnVector.close(GpuColumnVector.java:1045)
org.apache.spark.sql.vectorized.ColumnarBatch.close(ColumnarBatch.java:48)
org.apache.spark.sql.rapids.execution.InternalColumnarRddConverter$.$anonfun$convert$10(InternalColumnarRddConverter.scala:726)
scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
...
The pattern is "2 INC, 1 DEC". Checking the code accordingly, the INC that has no matching DEC is this line. But the comments a few lines below say the consumer should be responsible for closing it. I'm confused here: should it be closed manually somewhere?
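To illustrate, a rough sketch of the reference-count flow the traces above show; the calls are taken from the stack traces, while batchBuilder and rowCount are illustrative names, not the actual plugin code:

// Sketch of the flow suggested by the leak traces; surrounding names are illustrative.
val batch = batchBuilder.build(rowCount)   // INC: the ColumnarBatch references the device vectors
val table = GpuColumnVector.from(batch)    // INC: the Table takes its own reference to them
batch.close()                              // DEC: the batch's reference is released
// The Table still holds one reference, so whoever owns the resulting RDD[Table]
// has to call table.close(), otherwise the device column vectors leak.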
The …
According to the leak stacktraces, the leak is caused by whoever is responsible for the resulting …
Minor nit but otherwise lgtm.
  }
}

override def getNullSize: Double = OFFSET + VALIDITY
Nit: Can use VALIDITY_N_OFFSET here, also applies to the instance a few lines below.
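Applied, the line above would presumably become the following (assuming VALIDITY_N_OFFSET is an existing constant defined nearby as OFFSET + VALIDITY; not verified here):

// Assumes VALIDITY_N_OFFSET == OFFSET + VALIDITY is already defined in this file.
override def getNullSize: Double = VALIDITY_N_OFFSET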
build
Oh, yes, I create a Table on my PCA side and didn't close it. Thanks for the analysis.
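For reference, a minimal sketch of the consumer-side fix, assuming the PCA code receives the RDD[Table] produced by InternalColumnarRddConverter.convert; the real PCA call sites are not part of this thread, so the names here are illustrative:

import ai.rapids.cudf.Table
import org.apache.spark.rdd.RDD

// Hypothetical consumer: Table is AutoCloseable, so closing it releases the
// device column vectors it still references.
def consume(tables: RDD[Table]): Unit = {
  tables.foreach { table =>
    try {
      // ... use the table, e.g. feed it into the PCA training step ...
    } finally {
      table.close()
    }
  }
}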
private object BooleanConverter extends TypeConverter {
  override def append(row: Row,
      column: Int,
column: Int,
  column: Int,
Nit: 2 more spaces of indentation for the parameters would be better.
Signed-off-by: Allen Xu <allxu@nvidia.com>