This repository has been archived by the owner on Dec 20, 2018. It is now read-only.

Close #169 and add support for DataSet of Avro records #217

Status: Open. Wants to merge 27 commits into base: master.

Commits (27)
39bc512
Change version to 3.1.1-SNAPSHOT
JoshRosen Nov 28, 2016
62c3c53
Remove Spark 1.x documentation from README; add links to older READMEs
JoshRosen Nov 28, 2016
29ef5f6
WIP - allow creation of Dataset from RDD[SpecificRecord]
Jan 26, 2017
fd777c6
Fix imports so that we can redeclare LambdaVariable, make unit tests …
Feb 2, 2017
e4c7a42
Update SBT for Spark-version-dependent code
Feb 3, 2017
f661f61
Keep spark version at 2.0.0
Feb 3, 2017
cdba653
Ensures all unit tests pass on Windows
Feb 6, 2017
f9ef636
Unit tests pass with different versions, but build.sbt is messy and i…
Feb 7, 2017
5c3c116
Add support for Spark 2.1.x (while retaining Spark 2.0.x support)
JoshRosen Feb 7, 2017
762f7a1
Update README in preparation for 3.2.0 release
JoshRosen Feb 7, 2017
e28e456
Setting version to 3.2.0
liancheng Feb 8, 2017
1bfe421
Setting version to 3.2.1-SNAPSHOT
liancheng Feb 8, 2017
51eb883
3.1.0 -> 3.2.0 in README
JoshRosen Feb 8, 2017
69a0570
Build is now working and including the classfiles in the final artifact
Feb 9, 2017
ce47906
build.sbt is looking better!
Feb 10, 2017
0934452
Much tighter way to include the spark version-specific modules into t…
Feb 10, 2017
3ee8ccd
Merge upstream (3.2.1)
Feb 10, 2017
73fc4d8
Update create table to avoid deprecation warnings
Feb 10, 2017
0e99d72
Make unit test for GenericData.Record pass
Feb 15, 2017
3a1fd20
Make tests pass with ENUM and FIXED
Feb 15, 2017
d5c0329
add support for DateType
nihed Feb 16, 2017
5de3574
Merge master
Apr 18, 2017
909edcd
Revert "Merge master"
Apr 18, 2017
d9df990
Fix bad merge and merge branch-3.2 properly
Apr 18, 2017
e15c5bd
Merge master
Oct 27, 2017
8300332
Merge master
Oct 30, 2017
7869c11
Flakey tests?
Oct 30, 2017
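
The commit messages above (notably "WIP - allow creation of Dataset from RDD[SpecificRecord]" and the later GenericData.Record, ENUM, and FIXED fixes) outline the user-facing goal: turning an RDD of Avro records into a typed Dataset. Roughly, usage could look like the following (a hypothetical sketch; the concrete entry point added by the PR is not visible in this excerpt, and AvroEncoder / MyAvroRecord are illustrative names only):

```scala
// Hypothetical usage sketch only. MyAvroRecord stands in for a class generated
// from an Avro schema (a SpecificRecord), and AvroEncoder.of is an assumed entry
// point for obtaining an Encoder for it; neither name is confirmed by this diff.
import org.apache.spark.sql.{Dataset, Encoder, SparkSession}

val spark = SparkSession.builder().appName("avro-dataset-demo").getOrCreate()

implicit val avroEncoder: Encoder[MyAvroRecord] = AvroEncoder.of[MyAvroRecord]

val records = spark.sparkContext.parallelize(Seq.empty[MyAvroRecord])
val ds: Dataset[MyAvroRecord] = spark.createDataset(records)
```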
.travis.yml (28 changes: 25 additions, 3 deletions)
@@ -11,17 +11,39 @@ before_cache:
  - find $HOME/.sbt -name "*.lock" -delete
matrix:
  include:
    # ---- Spark 2.0.x ----------------------------------------------------------------------------
    # Spark 2.0.0, Scala 2.11, and Avro 1.7.x
    - jdk: openjdk7
      scala: 2.11.7
      env: TEST_HADOOP_VERSION="2.2.0" TEST_SPARK_VERSION="2.0.0" TEST_AVRO_VERSION="1.7.6" TEST_AVRO_MAPRED_VERSION="1.7.7"
    # Spark 2.0.0, Scala 2.11, and Avro 1.8.x
    - jdk: openjdk7
      scala: 2.11.7
      env: TEST_HADOOP_VERSION="2.2.0" TEST_SPARK_VERSION="2.0.0" TEST_AVRO_VERSION="1.8.0" TEST_AVRO_MAPRED_VERSION="1.8.0"
    # Spark 2.0.0, Scala 2.10, and Avro 1.7.x
    - jdk: openjdk7
      scala: 2.10.4
      env: TEST_HADOOP_VERSION="2.2.0" TEST_SPARK_VERSION="2.0.0" TEST_AVRO_VERSION="1.7.6" TEST_AVRO_MAPRED_VERSION="1.7.7"
    # Spark 2.0.0, Scala 2.10, and Avro 1.8.x
    - jdk: openjdk7
      scala: 2.10.4
      env: TEST_HADOOP_VERSION="2.2.0" TEST_SPARK_VERSION="2.0.0" TEST_AVRO_VERSION="1.8.0" TEST_AVRO_MAPRED_VERSION="1.8.0"
    # ---- Spark 2.1.x ----------------------------------------------------------------------------
    # Spark 2.1.0, Scala 2.11, and Avro 1.7.x
    - jdk: openjdk7
-     scala: 2.11.8
+     scala: 2.11.7
      env: TEST_HADOOP_VERSION="2.2.0" TEST_SPARK_VERSION="2.1.0" TEST_AVRO_VERSION="1.7.6" TEST_AVRO_MAPRED_VERSION="1.7.7"
    # Spark 2.1.0, Scala 2.11, and Avro 1.8.x
    - jdk: openjdk7
      scala: 2.11.7
      env: TEST_HADOOP_VERSION="2.2.0" TEST_SPARK_VERSION="2.1.0" TEST_AVRO_VERSION="1.8.0" TEST_AVRO_MAPRED_VERSION="1.8.0"
    # Spark 2.1.0, Scala 2.10, and Avro 1.7.x
    - jdk: openjdk7
-     scala: 2.10.6
+     scala: 2.10.4
      env: TEST_HADOOP_VERSION="2.2.0" TEST_SPARK_VERSION="2.1.0" TEST_AVRO_VERSION="1.7.6" TEST_AVRO_MAPRED_VERSION="1.7.7"
    # Spark 2.1.0, Scala 2.10, and Avro 1.8.x
    - jdk: openjdk7
-     scala: 2.10.6
+     scala: 2.10.4
      env: TEST_HADOOP_VERSION="2.2.0" TEST_SPARK_VERSION="2.1.0" TEST_AVRO_VERSION="1.8.0" TEST_AVRO_MAPRED_VERSION="1.8.0"
    # Spark 2.2.0, Scala 2.11, and Avro 1.7.x
    - jdk: openjdk8
build.sbt (38 changes: 32 additions, 6 deletions)
@@ -1,14 +1,17 @@
-name := "spark-avro"

-organization := "com.databricks"
+lazy val commonSettings = Seq(
+  organization := "com.databricks",
+  scalaVersion := "2.11.7",
+  crossScalaVersions := Seq("2.10.5", "2.11.7")
+)

-scalaVersion := "2.11.8"
+commonSettings

-crossScalaVersions := Seq("2.10.6", "2.11.8")
+name := "spark-avro"

spName := "databricks/spark-avro"

-sparkVersion := "2.1.0"
+sparkVersion := "2.0.0"

val testSparkVersion = settingKey[String]("The version of Spark to test against.")

@@ -107,7 +110,7 @@ pomExtra :=

bintrayReleaseOnPublish in ThisBuild := false

-import ReleaseTransformations._
+import sbtrelease.ReleasePlugin.autoImport.ReleaseTransformations._

// Add publishing to spark packages as another step.
releaseProcess := Seq[ReleaseStep](
@@ -123,3 +126,26 @@ releaseProcess := Seq[ReleaseStep](
  pushChanges,
  releaseStepTask(spPublish)
)


+lazy val spark21xProj = project.in(file("spark-2.1.x")).settings(
+  commonSettings,
+  libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.0" % "provided"
+).disablePlugins(SparkPackagePlugin)

+lazy val spark20xProj = project.in(file("spark-2.0.x")).settings(
+  commonSettings,
+  libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.0" % "provided"
+).disablePlugins(SparkPackagePlugin)

+unmanagedClasspath in Test ++= {
+  (exportedProducts in (spark20xProj, Runtime)).value ++
+    (exportedProducts in (spark21xProj, Runtime)).value
+}

+products in (Compile, packageBin) ++= Seq(
+  (classDirectory in (spark20xProj, Compile)).value,
+  (classDirectory in (spark21xProj, Compile)).value
+)
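
Taken together, these build.sbt changes split the Spark-version-specific glue into two subprojects (spark-2.0.x and spark-2.1.x), each compiled against its own provided spark-sql dependency, and then fold both subprojects' class files into the published spark-avro artifact. The two output-writer factories in the files below are the classes that live in those shims; presumably the core module selects the right one at runtime based on the running Spark version. A minimal sketch of what that runtime dispatch could look like (an assumption; the selection code itself is not part of the diff shown here):

```scala
// Hypothetical sketch, not taken from this PR: choosing the version-specific
// factory at runtime, which is only possible because build.sbt packages both
// shims' class files into the one artifact.
import org.apache.spark.SPARK_VERSION

def outputWriterFactoryClassName: String =
  if (SPARK_VERSION.startsWith("2.0.")) {
    "com.databricks.spark.avro.Spark20AvroOutputWriterFactory"
  } else {
    "com.databricks.spark.avro.Spark21AvroOutputWriterFactory"
  }

// The chosen class would then be instantiated reflectively, mirroring the
// Class.forName pattern the factories themselves use below.
```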
Spark20AvroOutputWriterFactory (new file)
@@ -0,0 +1,52 @@
/*
* Copyright 2014 Databricks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.databricks.spark.avro

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID}
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.types.StructType

private[avro] class Spark20AvroOutputWriterFactory(
    schema: StructType,
    recordName: String,
    recordNamespace: String) extends OutputWriterFactory {

  def doGetDefaultWorkFile(path: String, context: TaskAttemptContext, extension: String): Path = {
    val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
    val taskAttemptId: TaskAttemptID = context.getTaskAttemptID
    val split = taskAttemptId.getTaskID.getId
    new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
  }

  def newInstance(
      path: String,
      bucketId: Option[Int],
      dataSchema: StructType,
      context: TaskAttemptContext): OutputWriter = {

    val ot = Class.forName("com.databricks.spark.avro.AvroOutputWriter")
    val meth = ot.getDeclaredConstructor(
      classOf[String], classOf[TaskAttemptContext], classOf[StructType],
      classOf[String], classOf[String],
      classOf[Function3[String, TaskAttemptContext, String, Path]]
    )
    meth.setAccessible(true)
    meth.newInstance(path, context, schema, recordName, recordNamespace, doGetDefaultWorkFile _)
      .asInstanceOf[OutputWriter]
  }
}
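
Both factories construct the actual AvroOutputWriter reflectively via Class.forName rather than with a direct `new`, presumably so these thin Spark-version shims do not need a compile-time dependency on the core module that defines the writer. The extra Function3 constructor argument lets each shim inject its own rule for where the part file goes. The writer's own source is not part of this view; the sketch below shows only the constructor shape the reflective lookup appears to assume (the "Sketch" name and the body are illustrative assumptions):

```scala
// Sketch only: the constructor signature the getDeclaredConstructor call above
// appears to target. The real com.databricks.spark.avro.AvroOutputWriter extends
// Spark's OutputWriter and contains the Avro record-writing logic; this stub
// exists purely to illustrate the parameter list.
package com.databricks.spark.avro

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.sql.types.StructType

private[avro] class AvroOutputWriterSketch(
    path: String,
    context: TaskAttemptContext,
    schema: StructType,
    recordName: String,
    recordNamespace: String,
    getDefaultWorkFile: (String, TaskAttemptContext, String) => Path) {

  // Each Spark-version shim decides how the final file path is built.
  private val workFile: Path = getDefaultWorkFile(path, context, ".avro")
}
```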
Spark21AvroOutputWriterFactory (previously AvroOutputWriterFactory)
@@ -16,24 +16,35 @@

package com.databricks.spark.avro

+import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext

import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.types.StructType

-private[avro] class AvroOutputWriterFactory(
+private[avro] class Spark21AvroOutputWriterFactory(
    schema: StructType,
    recordName: String,
    recordNamespace: String) extends OutputWriterFactory {

-  override def getFileExtension(context: TaskAttemptContext): String = {
-    ".avro"
+  def doGetDefaultWorkFile(path: String, context: TaskAttemptContext, extension: String): Path = {
+    new Path(path)
  }

-  override def newInstance(
-      path: String,
-      dataSchema: StructType,
-      context: TaskAttemptContext): OutputWriter = {
-    new AvroOutputWriter(path, context, schema, recordName, recordNamespace)
+  def newInstance(
+      path: String,
+      dataSchema: StructType,
+      context: TaskAttemptContext): OutputWriter = {
+
+    val ot = Class.forName("com.databricks.spark.avro.AvroOutputWriter")
+    val meth = ot.getDeclaredConstructor(
+      classOf[String], classOf[TaskAttemptContext], classOf[StructType],
+      classOf[String], classOf[String],
+      classOf[Function3[String, TaskAttemptContext, String, Path]]
+    )
+    meth.setAccessible(true)
+    meth.newInstance(path, context, schema, recordName, recordNamespace, doGetDefaultWorkFile _)
+      .asInstanceOf[OutputWriter]
  }

+  override def getFileExtension(context: TaskAttemptContext): String = ".avro"
}
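
The two shims differ because the OutputWriterFactory contract changed between Spark 2.0.x and 2.1.x: as far as I can tell, 2.0.x passes newInstance a bucket id plus an output directory (so the writer has to compose the part-file name itself, hence the writeJobUUID lookup in the 2.0 shim), while 2.1.x drops the bucket id, hands newInstance the full file path, and asks the factory for the file extension separately. The traits below paraphrase that difference; treat them as an approximation rather than the exact Spark API:

```scala
// Approximate shapes of the Spark 2.0.x and 2.1.x OutputWriterFactory contracts
// that the two shims above adapt between (paraphrased, not copied from Spark).
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.sql.execution.datasources.OutputWriter
import org.apache.spark.sql.types.StructType

// Spark 2.0.x style: a bucket id is passed in, and `path` is a directory.
trait OutputWriterFactory20Like {
  def newInstance(
      path: String,
      bucketId: Option[Int],
      dataSchema: StructType,
      context: TaskAttemptContext): OutputWriter
}

// Spark 2.1.x style: no bucket id, `path` already names the file, and the
// extension is reported separately via getFileExtension.
trait OutputWriterFactory21Like {
  def getFileExtension(context: TaskAttemptContext): String
  def newInstance(
      path: String,
      dataSchema: StructType,
      context: TaskAttemptContext): OutputWriter
}
```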