Skip to content

Commit

Permalink
[NSE-261] ArrowDataSource: Add S3 Support (oap-project#270)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer committed Apr 26, 2021
1 parent d3f7ab7 commit ef97b98
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 8 deletions.
58 changes: 57 additions & 1 deletion arrow-data-source/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,50 @@
</pluginRepositories>

<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>2.7.3</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-json</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
</exclusion>
<exclusion>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpcore</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.2</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
Expand All @@ -61,7 +105,7 @@
<exclusions>
<exclusion>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-format</artifactId>
<artifactId>arrow-vector</artifactId>
</exclusion>
</exclusions>
<scope>provided</scope>
Expand All @@ -83,13 +127,25 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_2.12</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
</exclusion>
</exclusions>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
</exclusion>
</exclusions>
<type>test-jar</type>
<scope>test</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ object ArrowUtils {

private def rewriteUri(uriStr: String): String = {
val uri = URI.create(uriStr)
if (uri.getScheme == "s3" || uri.getScheme == "s3a") {
val s3Rewritten = new URI("s3", uri.getAuthority,
uri.getPath, uri.getQuery, uri.getFragment).toString
return s3Rewritten
}
val sch = uri.getScheme match {
case "hdfs" => "hdfs"
case "file" => "file"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,18 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession {
verifyParquet(
spark.read
.option(ArrowOptions.KEY_ORIGINAL_FORMAT, "parquet")
.option(ArrowOptions.KEY_FILESYSTEM, "hdfs")
.arrow(path))
}

test("simple sql query on s3") {
val path = "s3a://mlp-spark-dataset-bucket/test_arrowds_s3_small"
val frame = spark.read
.option(ArrowOptions.KEY_ORIGINAL_FORMAT, "parquet")
.arrow(path)
frame.createOrReplaceTempView("stab")
assert(spark.sql("select id from stab").count() === 1000)
}

test("create catalog table") {
val path = ArrowDataSourceTest.locateResourcePath(parquetFile1)
spark.catalog.createTable("ptab", path, "arrow")
Expand All @@ -130,7 +138,6 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession {
val path = ArrowDataSourceTest.locateResourcePath(parquetFile1)
val frame = spark.read
.option(ArrowOptions.KEY_ORIGINAL_FORMAT, "parquet")
.option(ArrowOptions.KEY_FILESYSTEM, "hdfs")
.arrow(path)
frame.createOrReplaceTempView("ptab")
verifyParquet(spark.sql("select * from ptab"))
Expand All @@ -142,7 +149,6 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession {
val path = ArrowDataSourceTest.locateResourcePath(parquetFile3)
val frame = spark.read
.option(ArrowOptions.KEY_ORIGINAL_FORMAT, "parquet")
.option(ArrowOptions.KEY_FILESYSTEM, "hdfs")
.arrow(path)
frame.createOrReplaceTempView("ptab")
val sqlFrame = spark.sql("select * from ptab")
Expand All @@ -163,7 +169,6 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession {
val path = ArrowDataSourceTest.locateResourcePath(parquetFile1)
val frame = spark.read
.option(ArrowOptions.KEY_ORIGINAL_FORMAT, "parquet")
.option(ArrowOptions.KEY_FILESYSTEM, "hdfs")
.arrow(path)
frame.createOrReplaceTempView("ptab")
spark.sql("select col from ptab where col = 1").explain(true)
Expand All @@ -178,7 +183,6 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession {
val path = ArrowDataSourceTest.locateResourcePath(parquetFile2)
val frame = spark.read
.option(ArrowOptions.KEY_ORIGINAL_FORMAT, "parquet")
.option(ArrowOptions.KEY_FILESYSTEM, "hdfs")
.arrow(path)
frame.createOrReplaceTempView("ptab")
val rows = spark.sql("select * from ptab where col = 'b'").collect()
Expand Down Expand Up @@ -215,7 +219,6 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession {
val path = ArrowDataSourceTest.locateResourcePath(parquetFile1)
val frame = spark.read
.option(ArrowOptions.KEY_ORIGINAL_FORMAT, "parquet")
.option(ArrowOptions.KEY_FILESYSTEM, "hdfs")
.arrow(path)
frame.createOrReplaceTempView("ptab")

Expand Down
2 changes: 1 addition & 1 deletion native-sql-engine/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<arrow.version>3.0.0</arrow.version>
<spark.version>3.0.0</spark.version>
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12.8</scala.version>
<scala.version>2.12.10</scala.version>
<build.testJarPhase>none</build.testJarPhase>
<build.copyDependenciesPhase>package</build.copyDependenciesPhase>
<hadoop.deps.scope>provided</hadoop.deps.scope>
Expand Down

0 comments on commit ef97b98

Please sign in to comment.