Skip to content

Commit

Permalink
[SPARK-44396][CONNECT] Direct Arrow Deserialization
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
This PR adds direct Arrow-to-user-object deserialization to the Spark Connect Scala Client.

### Why are the changes needed?
We want to decouple the Scala client from Catalyst. We need a way to encode user objects to and from Arrow.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added tests to `ArrowEncoderSuite`.

Closes #42011 from hvanhovell/SPARK-44396.

Authored-by: Herman van Hovell <herman@databricks.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
(cherry picked from commit 5939b75)
Signed-off-by: Herman van Hovell <herman@databricks.com>
  • Loading branch information
hvanhovell committed Jul 19, 2023
1 parent 3107c1e commit 244d02b
Show file tree
Hide file tree
Showing 11 changed files with 1,085 additions and 178 deletions.
19 changes: 19 additions & 0 deletions connector/connect/client/jvm/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<!-- Shade all Guava / Protobuf / Netty dependencies of this build -->
Expand Down Expand Up @@ -224,6 +225,24 @@
</execution>
</executions>
</plugin>
<!--
Registers an extra, Scala-binary-version-specific source root with the build:
during the generate-sources phase the add-source goal adds
src/main/scala-${scala.binary.version} to the compile source roots.
NOTE(review): presumably this is where per-Scala-version sources live; confirm against the module layout.
-->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>src/main/scala-${scala.binary.version}</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.connect.client.arrow

import scala.collection.generic.{GenericCompanion, GenMapFactory}
import scala.collection.mutable
import scala.reflect.ClassTag

import org.apache.spark.sql.connect.client.arrow.ArrowDeserializers.resolveCompanion

/**
 * Collection helpers whose signatures depend on the Scala collection library in use.
 *
 * This variant is written against the pre-2.13 collection API: companions are exposed as
 * [[scala.collection.generic.GenericCompanion]] and
 * [[scala.collection.generic.GenMapFactory]].
 */
private[arrow] object ScalaCollectionUtils {

  /**
   * Looks up the companion for an `Iterable` implementation.
   *
   * Delegates to `ArrowDeserializers.resolveCompanion`, which is defined outside this file.
   *
   * @param tag class tag identifying the collection class
   * @return the value produced by `resolveCompanion`, typed as a `GenericCompanion[Iterable]`
   */
  def getIterableCompanion(tag: ClassTag[_]): GenericCompanion[Iterable] =
    resolveCompanion[GenericCompanion[Iterable]](tag)

  /**
   * Looks up the companion for a `Map` implementation.
   *
   * Delegates to `ArrowDeserializers.resolveCompanion`, which is defined outside this file.
   *
   * @param tag class tag identifying the map class
   * @return the value produced by `resolveCompanion`, typed as a `GenMapFactory[Map]`
   */
  def getMapCompanion(tag: ClassTag[_]): GenMapFactory[Map] =
    resolveCompanion[GenMapFactory[Map]](tag)

  /**
   * Wraps an array in a [[scala.collection.mutable.WrappedArray]].
   *
   * @param array the array to wrap, passed untyped as `AnyRef`
   * @tparam T element type the caller wants the result to be viewed as
   * @return the result of `WrappedArray.make` for `array`
   */
  def wrap[T](array: AnyRef): mutable.WrappedArray[T] =
    mutable.WrappedArray.make[T](array)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.connect.client.arrow

import scala.collection.{mutable, IterableFactory, MapFactory}
import scala.reflect.ClassTag

import org.apache.spark.sql.connect.client.arrow.ArrowDeserializers.resolveCompanion

/**
 * Collection helpers whose signatures depend on the Scala collection library in use.
 *
 * This variant is written against the Scala 2.13 collection API: companions are exposed as
 * [[scala.collection.IterableFactory]] and [[scala.collection.MapFactory]].
 */
private[arrow] object ScalaCollectionUtils {

  /**
   * Looks up the factory for an `Iterable` implementation.
   *
   * Delegates to `ArrowDeserializers.resolveCompanion`, which is defined outside this file.
   *
   * @param tag class tag identifying the collection class
   * @return the value produced by `resolveCompanion`, typed as an `IterableFactory[Iterable]`
   */
  def getIterableCompanion(tag: ClassTag[_]): IterableFactory[Iterable] =
    resolveCompanion[IterableFactory[Iterable]](tag)

  /**
   * Looks up the factory for a `Map` implementation.
   *
   * Delegates to `ArrowDeserializers.resolveCompanion`, which is defined outside this file.
   *
   * @param tag class tag identifying the map class
   * @return the value produced by `resolveCompanion`, typed as a `MapFactory[Map]`
   */
  def getMapCompanion(tag: ClassTag[_]): MapFactory[Map] =
    resolveCompanion[MapFactory[Map]](tag)

  /**
   * Wraps an array in a [[scala.collection.mutable.ArraySeq]].
   *
   * `mutable.WrappedArray` is a deprecated alias of `mutable.ArraySeq` since Scala 2.13.0, so
   * the non-deprecated name is used here. The declared return type is the same type that
   * callers saw before.
   *
   * @param array the array to wrap, passed untyped as `AnyRef`
   * @tparam T element type the caller wants the result to be viewed as
   * @return the result of `ArraySeq.make` for `array`
   */
  def wrap[T](array: AnyRef): mutable.ArraySeq[T] =
    // The cast is unchecked because T is erased; it only satisfies the signature of `make`.
    mutable.ArraySeq.make(array.asInstanceOf[Array[T]])
}
Loading

0 comments on commit 244d02b

Please sign in to comment.