From ef97b986088e93b56b350e788f0f9d1e78f65c82 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Thu, 22 Apr 2021 13:41:48 +0800 Subject: [PATCH] [NSE-261] ArrowDataSource: Add S3 Support (#270) Closes #261 --- arrow-data-source/pom.xml | 58 ++++++++++++++++++- .../datasources/v2/arrow/ArrowUtils.scala | 5 ++ .../arrow/ArrowDataSourceTest.scala | 15 +++-- native-sql-engine/core/pom.xml | 2 +- 4 files changed, 72 insertions(+), 8 deletions(-) diff --git a/arrow-data-source/pom.xml b/arrow-data-source/pom.xml index ad55360e0..e2b2355c0 100644 --- a/arrow-data-source/pom.xml +++ b/arrow-data-source/pom.xml @@ -48,6 +48,50 @@ + + org.apache.hadoop + hadoop-aws + 2.7.3 + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-annotations + + + com.fasterxml.jackson.core + jackson-databind + + + javax.servlet + servlet-api + + + com.sun.jersey + jersey-core + + + com.sun.jersey + jersey-json + + + com.sun.jersey + jersey-server + + + commons-httpclient + commons-httpcore + + + + + org.apache.httpcomponents + httpcore + 4.2 + org.scala-lang scala-library @@ -61,7 +105,7 @@ org.apache.arrow - arrow-format + arrow-vector provided @@ -83,6 +127,12 @@ org.apache.spark spark-catalyst_2.12 ${spark.version} + + + org.apache.arrow + arrow-vector + + test-jar test @@ -90,6 +140,12 @@ org.apache.spark spark-sql_2.12 ${spark.version} + + + org.apache.arrow + arrow-vector + + test-jar test diff --git a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowUtils.scala b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowUtils.scala index 4ef604114..4af788d0e 100644 --- a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowUtils.scala +++ b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowUtils.scala @@ -156,6 +156,11 @@ object ArrowUtils { private def rewriteUri(uriStr: String): String = { val uri = URI.create(uriStr) + if (uri.getScheme == "s3" || uri.getScheme == "s3a") { + val s3Rewritten = new URI("s3", uri.getAuthority, + uri.getPath, uri.getQuery, uri.getFragment).toString + return s3Rewritten + } val sch = uri.getScheme match { case "hdfs" => "hdfs" case "file" => "file" diff --git a/arrow-data-source/standard/src/test/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowDataSourceTest.scala b/arrow-data-source/standard/src/test/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowDataSourceTest.scala index f88e085fa..161d285c7 100644 --- a/arrow-data-source/standard/src/test/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowDataSourceTest.scala +++ b/arrow-data-source/standard/src/test/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowDataSourceTest.scala @@ -106,10 +106,18 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession { verifyParquet( spark.read .option(ArrowOptions.KEY_ORIGINAL_FORMAT, "parquet") - .option(ArrowOptions.KEY_FILESYSTEM, "hdfs") .arrow(path)) } + test("simple sql query on s3") { + val path = "s3a://mlp-spark-dataset-bucket/test_arrowds_s3_small" + val frame = spark.read + .option(ArrowOptions.KEY_ORIGINAL_FORMAT, "parquet") + .arrow(path) + frame.createOrReplaceTempView("stab") + assert(spark.sql("select id from stab").count() === 1000) + } + test("create catalog table") { val path = ArrowDataSourceTest.locateResourcePath(parquetFile1) spark.catalog.createTable("ptab", path, "arrow") @@ -130,7 +138,6 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession { val path = ArrowDataSourceTest.locateResourcePath(parquetFile1) val frame = spark.read .option(ArrowOptions.KEY_ORIGINAL_FORMAT, "parquet") - .option(ArrowOptions.KEY_FILESYSTEM, "hdfs") .arrow(path) frame.createOrReplaceTempView("ptab") verifyParquet(spark.sql("select * from ptab")) @@ -142,7 +149,6 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession { val path = ArrowDataSourceTest.locateResourcePath(parquetFile3) val frame = spark.read .option(ArrowOptions.KEY_ORIGINAL_FORMAT, "parquet") - .option(ArrowOptions.KEY_FILESYSTEM, "hdfs") .arrow(path) frame.createOrReplaceTempView("ptab") val sqlFrame = spark.sql("select * from ptab") @@ -163,7 +169,6 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession { val path = ArrowDataSourceTest.locateResourcePath(parquetFile1) val frame = spark.read .option(ArrowOptions.KEY_ORIGINAL_FORMAT, "parquet") - .option(ArrowOptions.KEY_FILESYSTEM, "hdfs") .arrow(path) frame.createOrReplaceTempView("ptab") spark.sql("select col from ptab where col = 1").explain(true) @@ -178,7 +183,6 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession { val path = ArrowDataSourceTest.locateResourcePath(parquetFile2) val frame = spark.read .option(ArrowOptions.KEY_ORIGINAL_FORMAT, "parquet") - .option(ArrowOptions.KEY_FILESYSTEM, "hdfs") .arrow(path) frame.createOrReplaceTempView("ptab") val rows = spark.sql("select * from ptab where col = 'b'").collect() @@ -215,7 +219,6 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession { val path = ArrowDataSourceTest.locateResourcePath(parquetFile1) val frame = spark.read .option(ArrowOptions.KEY_ORIGINAL_FORMAT, "parquet") - .option(ArrowOptions.KEY_FILESYSTEM, "hdfs") .arrow(path) frame.createOrReplaceTempView("ptab") diff --git a/native-sql-engine/core/pom.xml b/native-sql-engine/core/pom.xml index 5064374d3..80714f383 100644 --- a/native-sql-engine/core/pom.xml +++ b/native-sql-engine/core/pom.xml @@ -33,7 +33,7 @@ 3.0.0 3.0.0 2.12 - 2.12.8 + 2.12.10 none package provided