This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-304] Upgrade to Arrow 4.0.0 #307

Merged · 5 commits · May 14, 2021
4 changes: 2 additions & 2 deletions .github/workflows/unittests.yml
@@ -45,7 +45,7 @@ jobs:
run: |
cd /tmp
git clone https://github.com/oap-project/arrow.git
cd arrow && git checkout arrow-3.0.0-oap && cd cpp
cd arrow && git checkout arrow-4.0.0-oap && cd cpp
mkdir build && cd build
cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON -DGTEST_ROOT=/usr/src/gtest && make -j2
sudo make install
@@ -89,7 +89,7 @@ jobs:
run: |
cd /tmp
git clone https://github.com/oap-project/arrow.git
cd arrow && git checkout arrow-3.0.0-oap && cd cpp
cd arrow && git checkout arrow-4.0.0-oap && cd cpp
mkdir build && cd build
cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON -DGTEST_ROOT=/usr/src/gtest && make -j2
sudo make install
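As a quick sanity check after the `sudo make install` step above, it can help to confirm that the installed Arrow and Gandiva shared libraries now carry the 4.0.0 soname that the updated JNI loader expects (`libarrow.so.400` / `libgandiva.so.400`). This is only a sketch; the `/usr/local` install prefix is an assumption about the workflow's default, not something stated in this PR.

```shell
# Hedged sanity check: the soname should have moved from .so.300 to .so.400
# (assumes the default /usr/local install prefix; adjust if CMAKE_INSTALL_PREFIX differs)
ls /usr/local/lib /usr/local/lib64 2>/dev/null | grep -E 'lib(arrow|gandiva)\.so\.400'
```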
2 changes: 1 addition & 1 deletion arrow-data-source/README.md
@@ -117,7 +117,7 @@ You have to use a customized Arrow to support our datasets Java API.

```
// build arrow-cpp
git clone -b arrow-3.0.0-oap-1.1 https://github.com/oap-project/arrow.git
git clone -b arrow-4.0.0-oap https://github.com/oap-project/arrow.git
cd arrow/cpp
mkdir build
cd build
@@ -17,7 +17,7 @@

package com.intel.oap.spark.sql.execution.datasources.v2.arrow;

import org.apache.arrow.memory.ReservationListener;
import org.apache.arrow.dataset.jni.ReservationListener;

/**
* Reserve Spark managed memory.
26 changes: 2 additions & 24 deletions arrow-data-source/pom.xml
@@ -49,30 +49,8 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
<artifactId>hadoop-client</artifactId>
Collaborator: not sure if this will break v2 @rui-mo

Collaborator Author (@zhztheplayer, May 13, 2021): And is there any other reason v2 is broken?

Collaborator: The issues are mostly due to the Spark pre-built binary with Hadoop 3.2, which packages specific versions of dependencies other than those in hadoop-common-3.2.0. I will do a quick test on my env to verify (see the dependency-tree sketch after this file's diff).
https://github.com/apache/spark/blob/v3.1.1/pom.xml#L172
https://github.com/apache/spark/blob/v3.1.1/pom.xml#L121

Collaborator: ok, I will check on this.

Collaborator: @zhouyuan it is ok on my env

Collaborator Author: thanks!

<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
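One way to follow up on the question in the thread above (whether swapping `hadoop-common` for a `provided` `hadoop-client` breaks the v2 data source) is to inspect which Hadoop artifacts the modules actually resolve after this change. A minimal sketch, run from the repository root; the Maven invocation and filter pattern are illustrative and not part of this PR:

```shell
# Hypothetical check: list the Hadoop artifacts each module resolves after the
# hadoop-common -> hadoop-client (provided) switch in this PR.
mvn dependency:tree -Dincludes='org.apache.hadoop:*'
```

With `provided` scope, the Hadoop jars are expected to come from the Spark pre-built distribution at runtime (for example the hadoop3.2 binary discussed in the comments above) rather than from this project's packaging.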
2 changes: 1 addition & 1 deletion arrow-data-source/script/build_arrow.sh
@@ -62,7 +62,7 @@ echo "ARROW_SOURCE_DIR=${ARROW_SOURCE_DIR}"
echo "ARROW_INSTALL_DIR=${ARROW_INSTALL_DIR}"
mkdir -p $ARROW_SOURCE_DIR
mkdir -p $ARROW_INSTALL_DIR
git clone https://github.com/oap-project/arrow.git --branch arrow-3.0.0-oap $ARROW_SOURCE_DIR
git clone https://github.com/oap-project/arrow.git --branch arrow-4.0.0-oap $ARROW_SOURCE_DIR
pushd $ARROW_SOURCE_DIR

cmake ./cpp \
@@ -107,7 +107,7 @@ class ArrowFileFormat extends FileFormat with DataSourceRegister with Serializab
.asScala
.toList
val itrList = taskList
.map(task => task.scan())
.map(task => task.execute())

Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => {
itrList.foreach(_.close())
@@ -120,7 +120,7 @@ val itr = itrList
val itr = itrList
.toIterator
.flatMap(itr => itr.asScala)
.map(vsr => ArrowUtils.loadVectors(vsr, file.partitionValues, partitionSchema,
.map(batch => ArrowUtils.loadBatch(batch, file.partitionValues, partitionSchema,
requiredSchema))
new UnsafeItr(itr).asInstanceOf[Iterator[InternalRow]]
}
@@ -76,12 +76,12 @@ case class ArrowPartitionReaderFactory(
.toList

val vsrItrList = taskList
.map(task => task.scan())
.map(task => task.execute())

val batchItr = vsrItrList
.toIterator
.flatMap(itr => itr.asScala)
.map(bundledVectors => ArrowUtils.loadVectors(bundledVectors, partitionedFile.partitionValues,
.map(batch => ArrowUtils.loadBatch(batch, partitionedFile.partitionValues,
readPartitionSchema, readDataSchema))

new PartitionReader[ColumnarBatch] {
@@ -18,29 +18,28 @@
package com.intel.oap.spark.sql.execution.datasources.v2.arrow

import java.net.URI
import java.util.TimeZone

import scala.collection.JavaConverters._

import com.intel.oap.vectorized.ArrowWritableColumnVector
import org.apache.arrow.dataset.file.SingleFileDatasetFactory
import org.apache.arrow.dataset.scanner.ScanTask
import org.apache.arrow.vector.FieldVector
import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID
import org.apache.arrow.dataset.file.FileSystemDatasetFactory
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch
import org.apache.arrow.vector.types.pojo.Schema
import org.apache.hadoop.fs.FileStatus

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.v2.arrow.{SparkMemoryUtils, SparkSchemaUtils}
import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.vectorized.ColumnVector
import org.apache.spark.sql.vectorized.ColumnarBatch

object ArrowUtils {

def readSchema(file: FileStatus, options: CaseInsensitiveStringMap): Option[StructType] = {
val factory: SingleFileDatasetFactory =
val factory: FileSystemDatasetFactory =
makeArrowDiscovery(file.getPath.toString, -1L, -1L,
new ArrowOptions(options.asScala.toMap))
val schema = factory.inspect()
@@ -65,11 +64,11 @@ }
}

def makeArrowDiscovery(file: String, startOffset: Long, length: Long,
options: ArrowOptions): SingleFileDatasetFactory = {
options: ArrowOptions): FileSystemDatasetFactory = {

val format = getFormat(options).getOrElse(throw new IllegalStateException)
val allocator = SparkMemoryUtils.contextAllocator()
val factory = new SingleFileDatasetFactory(allocator,
val factory = new FileSystemDatasetFactory(allocator,
SparkMemoryUtils.contextMemoryPool(),
format,
rewriteUri(file),
@@ -80,72 +79,32 @@

def toArrowSchema(t: StructType): Schema = {
// fixme this might be platform dependent
SparkSchemaUtils.toArrowSchema(t, TimeZone.getDefault.getID)
SparkSchemaUtils.toArrowSchema(t, SQLConf.get.sessionLocalTimeZone)
}

def loadVectors(bundledVectors: ScanTask.ArrowBundledVectors, partitionValues: InternalRow,
def loadBatch(input: ArrowRecordBatch, partitionValues: InternalRow,
partitionSchema: StructType, dataSchema: StructType): ColumnarBatch = {
val rowCount: Int = getRowCount(bundledVectors)
val dataVectors = getDataVectors(bundledVectors, dataSchema)
val dictionaryVectors = getDictionaryVectors(bundledVectors, dataSchema)
val rowCount: Int = input.getLength

val vectors = ArrowWritableColumnVector.loadColumns(rowCount, dataVectors.asJava,
dictionaryVectors.asJava)
val vectors = try {
ArrowWritableColumnVector.loadColumns(rowCount, toArrowSchema(dataSchema), input)
} finally {
input.close()
}
val partitionColumns = ArrowWritableColumnVector.allocateColumns(rowCount, partitionSchema)
(0 until partitionColumns.length).foreach(i => {
ColumnVectorUtils.populate(partitionColumns(i), partitionValues, i)
partitionColumns(i).setValueCount(rowCount)
partitionColumns(i).setIsConstant()
})

val batch = new ColumnarBatch(vectors ++ partitionColumns, rowCount)
val batch = new ColumnarBatch(
vectors.map(_.asInstanceOf[ColumnVector]) ++
partitionColumns.map(_.asInstanceOf[ColumnVector]),
rowCount)
batch
}

private def getRowCount(bundledVectors: ScanTask.ArrowBundledVectors) = {
val valueVectors = bundledVectors.valueVectors
val rowCount = valueVectors.getRowCount
rowCount
}

private def getDataVectors(bundledVectors: ScanTask.ArrowBundledVectors,
dataSchema: StructType): List[FieldVector] = {
// TODO Deprecate following (bad performance maybe brought).
// TODO Assert vsr strictly matches dataSchema instead.
val valueVectors = bundledVectors.valueVectors
dataSchema.map(f => {
val vector = valueVectors.getVector(f.name)
if (vector == null) {
throw new IllegalStateException("Error: no vector named " + f.name + " in record bach")
}
vector
}).toList
}

private def getDictionaryVectors(bundledVectors: ScanTask.ArrowBundledVectors,
dataSchema: StructType): List[FieldVector] = {
val valueVectors = bundledVectors.valueVectors
val dictionaryVectorMap = bundledVectors.dictionaryVectors

val fieldNameToDictionaryEncoding = valueVectors.getSchema.getFields.asScala.map(f => {
f.getName -> f.getDictionary
}).toMap

val dictionaryVectorsWithNulls = dataSchema.map(f => {
val de = fieldNameToDictionaryEncoding(f.name)

Option(de) match {
case None => null
case _ =>
if (de.getIndexType.getTypeID != ArrowTypeID.Int) {
throw new IllegalArgumentException("Wrong index type: " + de.getIndexType)
}
dictionaryVectorMap.get(de.getId).getVector
}
}).toList
dictionaryVectorsWithNulls
}

private def getFormat(
options: ArrowOptions): Option[org.apache.arrow.dataset.file.FileFormat] = {
Option(options.originalFormat match {
2 changes: 1 addition & 1 deletion docs/ApacheArrowInstallation.md
@@ -30,7 +30,7 @@ Please make sure your cmake version is qualified based on the prerequisite.
# Arrow
``` shell
git clone https://github.com/oap-project/arrow.git
cd arrow && git checkout arrow-3.0.0-oap-1.1
cd arrow && git checkout arrow-4.0.0-oap
mkdir -p arrow/cpp/release-build
cd arrow/cpp/release-build
cmake -DARROW_DEPENDENCY_SOURCE=BUNDLED -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_BOOST_USE_SHARED=ON -DARROW_JNI=ON -DARROW_DATASET=ON -DARROW_WITH_PROTOBUF=ON -DARROW_WITH_SNAPPY=ON -DARROW_WITH_LZ4=ON -DARROW_FILESYSTEM=ON -DARROW_JSON=ON ..
2 changes: 1 addition & 1 deletion docs/OAP-Developer-Guide.md
@@ -33,7 +33,7 @@ Then the dependencies below will be installed:
* [HPNL](https://github.com/Intel-bigdata/HPNL)
* [PMDK](https://github.com/pmem/pmdk)
* [OneAPI](https://software.intel.com/content/www/us/en/develop/tools/oneapi.html)
* [Arrow](https://github.com/oap-project/arrow/tree/arrow-3.0.0-oap-1.1)
* [Arrow](https://github.com/oap-project/arrow/tree/arrow-4.0.0-oap)
* [LLVM](https://llvm.org/)

Run the following command to learn more.
2 changes: 1 addition & 1 deletion docs/OAP-Installation-Guide.md
@@ -36,7 +36,7 @@ Once finished steps above, you have completed OAP dependencies installation and

Dependencies below are required by OAP, and all of them are included in the OAP Conda package; they will be automatically installed in your cluster when you Conda install OAP. Ensure you have activated the environment you created in the previous steps.

- [Arrow](https://github.com/oap-project/arrow/tree/arrow-3.0.0-oap-1.1)
- [Arrow](https://github.com/oap-project/arrow/tree/arrow-4.0.0-oap)
- [Plasma](http://arrow.apache.org/blog/2017/08/08/plasma-in-memory-object-store/)
- [Memkind](https://anaconda.org/intel/memkind)
- [Vmemcache](https://anaconda.org/intel/vmemcache)
17 changes: 9 additions & 8 deletions native-sql-engine/core/pom.xml
@@ -78,7 +78,7 @@
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
<version>3.0.0</version>
<version>${arrow.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
@@ -134,11 +134,6 @@
<groupId>com.intel.oap</groupId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.google.flatbuffers</groupId>
<artifactId>flatbuffers-java</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
@@ -259,8 +254,14 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>${hadoop.version}</version>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.intel.oap</groupId>
<artifactId>spark-arrow-datasource-standard</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

@@ -39,8 +39,8 @@
/** Helper class for JNI related operations. */
public class JniUtils {
private static final String LIBRARY_NAME = "spark_columnar_jni";
private static final String ARROW_LIBRARY_NAME = "libarrow.so.300";
private static final String GANDIVA_LIBRARY_NAME = "libgandiva.so.300";
private static final String ARROW_LIBRARY_NAME = "libarrow.so.400";
private static final String GANDIVA_LIBRARY_NAME = "libgandiva.so.400";
private static boolean isLoaded = false;
private static boolean isCodegenDependencyLoaded = false;
private static List<String> codegenJarsLoadedCache = new ArrayList<>();
20 changes: 9 additions & 11 deletions native-sql-engine/cpp/src/CMakeLists.txt
@@ -128,7 +128,7 @@ macro(build_arrow STATIC_ARROW)
ExternalProject_Add(arrow_ep
GIT_REPOSITORY https://github.com/oap-project/arrow.git
SOURCE_DIR ${ARROW_SOURCE_DIR}
GIT_TAG arrow-3.0.0-oap
GIT_TAG arrow-4.0.0-oap
BUILD_IN_SOURCE 1
INSTALL_DIR ${ARROW_PREFIX}
INSTALL_COMMAND make install
@@ -216,15 +216,15 @@
)

ExternalProject_Add_Step(arrow_ep copy_arrow_binary_300
COMMAND cp -a ${ARROW_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}${ARROW_LIB_NAME}${ARROW_SHARED_LIBRARY_SUFFIX}.300 ${root_directory}/releases/
COMMENT "Copy libarrow.so.300 to releases/"
COMMAND cp -a ${ARROW_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}${ARROW_LIB_NAME}${ARROW_SHARED_LIBRARY_SUFFIX}.400 ${root_directory}/releases/
COMMENT "Copy libarrow.so.400 to releases/"
DEPENDEES mkdir download update patch configure build install java_install
WORKING_DIRECTORY "${ARROW_PREFIX}/"
)

ExternalProject_Add_Step(arrow_ep copy_arrow_binary_300_0_0
COMMAND cp -a ${ARROW_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}${ARROW_LIB_NAME}${ARROW_SHARED_LIBRARY_SUFFIX}.300.0.0 ${root_directory}/releases/
COMMENT "Copy libarrow.so.300.0.0 to releases/"
COMMAND cp -a ${ARROW_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}${ARROW_LIB_NAME}${ARROW_SHARED_LIBRARY_SUFFIX}.400.0.0 ${root_directory}/releases/
COMMENT "Copy libarrow.so.400.0.0 to releases/"
DEPENDEES mkdir download update patch configure build install java_install
WORKING_DIRECTORY "${ARROW_PREFIX}/"
)
@@ -239,15 +239,15 @@ macro(build_arrow STATIC_ARROW)
)

ExternalProject_Add_Step(arrow_ep copy_gandiva_binary_300
COMMAND cp -a ${ARROW_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}${GANDIVA_LIB_NAME}${ARROW_SHARED_LIBRARY_SUFFIX}.300 ${root_directory}/releases/
COMMENT "Copy libgandiva.so.300 to releases/"
COMMAND cp -a ${ARROW_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}${GANDIVA_LIB_NAME}${ARROW_SHARED_LIBRARY_SUFFIX}.400 ${root_directory}/releases/
COMMENT "Copy libgandiva.so.400 to releases/"
DEPENDEES mkdir download update patch configure build install java_install
WORKING_DIRECTORY "${ARROW_PREFIX}/"
)

ExternalProject_Add_Step(arrow_ep copy_gandiva_binary_300_0_0
COMMAND cp -a ${ARROW_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}${GANDIVA_LIB_NAME}${ARROW_SHARED_LIBRARY_SUFFIX}.300.0.0 ${root_directory}/releases/
COMMENT "Copy libgandiva.so.300.0.0 to releases/"
COMMAND cp -a ${ARROW_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}${GANDIVA_LIB_NAME}${ARROW_SHARED_LIBRARY_SUFFIX}.400.0.0 ${root_directory}/releases/
COMMENT "Copy libgandiva.so.400.0.0 to releases/"
DEPENDEES mkdir download update patch configure build install java_install
WORKING_DIRECTORY "${ARROW_PREFIX}/"
)
@@ -321,13 +321,11 @@ macro(find_arrow)
message(STATUS "COPY and Set Arrow Header to: ${ARROW_BFS_INCLUDE_DIR}")
file(COPY ${ARROW_BFS_INCLUDE_DIR}/arrow DESTINATION ${root_directory}/releases/include)
file(COPY ${ARROW_BFS_INCLUDE_DIR}/gandiva DESTINATION ${root_directory}/releases/include)
file(COPY ${ARROW_BFS_INCLUDE_DIR}/jni DESTINATION ${root_directory}/releases/include)
file(COPY ${ARROW_BFS_INCLUDE_DIR}/parquet DESTINATION ${root_directory}/releases/include)
else()
message(STATUS "COPY and Set Arrow Header to: ${ARROW_INCLUDE_DIR}")
file(COPY ${ARROW_INCLUDE_DIR}/arrow DESTINATION ${root_directory}/releases/include)
file(COPY ${ARROW_INCLUDE_DIR}/gandiva DESTINATION ${root_directory}/releases/include)
file(COPY ${ARROW_INCLUDE_DIR}/jni DESTINATION ${root_directory}/releases/include)
file(COPY ${ARROW_INCLUDE_DIR}/parquet DESTINATION ${root_directory}/releases/include)
endif()

@@ -112,8 +112,9 @@ class EncodeArrayKernel : public KernalBase {
arrow::Status Evaluate(const std::shared_ptr<arrow::Array>& in,
std::shared_ptr<arrow::Array>* out) override;

private:
class Impl;

private:
std::unique_ptr<Impl> impl_;
arrow::compute::ExecContext* ctx_ = nullptr;
};