From dae6c6f38ec044e6540b07876d1924f2f6edbd5b Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 16 Jan 2017 15:47:42 -0800
Subject: [PATCH] [SC-5606] Inline spark-avro sources into databricks/spark

This patch ports `spark-avro` as of
https://github.com/databricks/spark-avro/commit/b01a034b0e5d785609275aa97d9c5e3719613194
and updates it to run with Spark 2.1.0 by including the fixes from
https://github.com/databricks/spark-avro/pull/206.

Because Avro is already available through the transitive dependencies of
`spark-core` and `spark-sql`, this does not add any new Avro dependencies
to the POMs.

I've updated the license headers to use the header that we use for
Spark-edge features.

Author: Josh Rosen

Closes #170 from JoshRosen/add-spark-avro.
---
 dev/run-tests.py | 4 +-
 dev/sparktestsupport/modules.py | 10 +
 external/avro/pom.xml | 74 ++
 .../spark/avro/AvroOutputWriter.scala | 140 ++++
 .../spark/avro/AvroOutputWriterFactory.scala | 30 +
 .../databricks/spark/avro/DefaultSource.scala | 283 ++++++++
 .../spark/avro/SchemaConverters.scala | 396 ++++++++++
 .../com/databricks/spark/avro/package.scala | 30 +
 .../src/test/resources/benchmarkSchema.avsc | 35 +
 .../avro/src/test/resources/episodes.avro | Bin 0 -> 597 bytes
 .../avro/src/test/resources/log4j.properties | 49 ++
 .../test-random-partitioned/part-r-00000.avro | Bin 0 -> 1768 bytes
 .../test-random-partitioned/part-r-00001.avro | Bin 0 -> 2313 bytes
 .../test-random-partitioned/part-r-00002.avro | Bin 0 -> 1621 bytes
 .../test-random-partitioned/part-r-00003.avro | Bin 0 -> 2117 bytes
 .../test-random-partitioned/part-r-00004.avro | Bin 0 -> 3282 bytes
 .../test-random-partitioned/part-r-00005.avro | Bin 0 -> 1550 bytes
 .../test-random-partitioned/part-r-00006.avro | Bin 0 -> 1729 bytes
 .../test-random-partitioned/part-r-00007.avro | Bin 0 -> 1897 bytes
 .../test-random-partitioned/part-r-00008.avro | Bin 0 -> 3420 bytes
 .../test-random-partitioned/part-r-00009.avro | Bin 0 -> 1796 bytes
 .../test-random-partitioned/part-r-00010.avro | Bin 0 -> 3872 bytes
 external/avro/src/test/resources/test.avro | Bin 0 -> 1365 bytes
 external/avro/src/test/resources/test.avsc | 53 ++
 external/avro/src/test/resources/test.json | 42 ++
 .../spark/avro/AvroFileGenerator.scala | 86 +++
 .../spark/avro/AvroReadBenchmark.scala | 53 ++
 .../com/databricks/spark/avro/AvroSuite.scala | 675 ++++++++++++++++++
 .../spark/avro/AvroWriteBenchmark.scala | 82 +++
 .../avro/SerializableConfigurationSuite.scala | 42 ++
 .../com/databricks/spark/avro/TestUtils.scala | 148 ++++
 pom.xml | 1 +
 project/SparkBuild.scala | 10 +-
 33 files changed, 2236 insertions(+), 7 deletions(-)
 create mode 100644 external/avro/pom.xml
 create mode 100755 external/avro/src/main/scala/com/databricks/spark/avro/AvroOutputWriter.scala
 create mode 100755 external/avro/src/main/scala/com/databricks/spark/avro/AvroOutputWriterFactory.scala
 create mode 100755 external/avro/src/main/scala/com/databricks/spark/avro/DefaultSource.scala
 create mode 100755 external/avro/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala
 create mode 100755 external/avro/src/main/scala/com/databricks/spark/avro/package.scala
 create mode 100755 external/avro/src/test/resources/benchmarkSchema.avsc
 create mode 100755 external/avro/src/test/resources/episodes.avro
 create mode 100755 external/avro/src/test/resources/log4j.properties
 create mode 100755 external/avro/src/test/resources/test-random-partitioned/part-r-00000.avro
 create mode 100755 external/avro/src/test/resources/test-random-partitioned/part-r-00001.avro
 create mode 100755
external/avro/src/test/resources/test-random-partitioned/part-r-00002.avro create mode 100755 external/avro/src/test/resources/test-random-partitioned/part-r-00003.avro create mode 100755 external/avro/src/test/resources/test-random-partitioned/part-r-00004.avro create mode 100755 external/avro/src/test/resources/test-random-partitioned/part-r-00005.avro create mode 100755 external/avro/src/test/resources/test-random-partitioned/part-r-00006.avro create mode 100755 external/avro/src/test/resources/test-random-partitioned/part-r-00007.avro create mode 100755 external/avro/src/test/resources/test-random-partitioned/part-r-00008.avro create mode 100755 external/avro/src/test/resources/test-random-partitioned/part-r-00009.avro create mode 100755 external/avro/src/test/resources/test-random-partitioned/part-r-00010.avro create mode 100755 external/avro/src/test/resources/test.avro create mode 100755 external/avro/src/test/resources/test.avsc create mode 100755 external/avro/src/test/resources/test.json create mode 100755 external/avro/src/test/scala/com/databricks/spark/avro/AvroFileGenerator.scala create mode 100755 external/avro/src/test/scala/com/databricks/spark/avro/AvroReadBenchmark.scala create mode 100755 external/avro/src/test/scala/com/databricks/spark/avro/AvroSuite.scala create mode 100755 external/avro/src/test/scala/com/databricks/spark/avro/AvroWriteBenchmark.scala create mode 100755 external/avro/src/test/scala/com/databricks/spark/avro/SerializableConfigurationSuite.scala create mode 100755 external/avro/src/test/scala/com/databricks/spark/avro/TestUtils.scala diff --git a/dev/run-tests.py b/dev/run-tests.py index a647b98a4a2f1..a9692ab0c1350 100755 --- a/dev/run-tests.py +++ b/dev/run-tests.py @@ -110,8 +110,8 @@ def determine_modules_to_test(changed_modules): ['graphx', 'examples'] >>> x = [x.name for x in determine_modules_to_test([modules.sql])] >>> x # doctest: +NORMALIZE_WHITESPACE - ['sql', 'hive', 'mllib', 'sql-kafka-0-10', 'sql-kafka-0-8', 'examples', 'hive-thriftserver', - 'pyspark-sql', 'sparkr', 'pyspark-mllib', 'pyspark-ml'] + ['sql', 'avro', 'hive', 'mllib', 'sql-kafka-0-10', 'sql-kafka-0-8', 'examples', + 'hive-thriftserver', 'pyspark-sql', 'sparkr', 'pyspark-mllib', 'pyspark-ml'] """ modules_to_test = set() for module in changed_modules: diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 5dc9a247a6e7e..b7fc30854f13b 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -157,6 +157,16 @@ def __hash__(self): ] ) +avro = Module( + name="avro", + dependencies=[sql], + source_file_regexes=[ + "external/avro", + ], + sbt_test_goals=[ + "avro/test", + ] +) sql_kafka = Module( name="sql-kafka-0-10", diff --git a/external/avro/pom.xml b/external/avro/pom.xml new file mode 100644 index 0000000000000..dfd72207a8f16 --- /dev/null +++ b/external/avro/pom.xml @@ -0,0 +1,74 @@ + + + + + 4.0.0 + + org.apache.spark + spark-parent_2.11 + 2.1.0 + ../../pom.xml + + + com.databricks + spark-avro_2.11 + + avro + + jar + Spark Avro + http://spark.apache.org/ + + + + org.apache.spark + spark-sql_${scala.binary.version} + ${project.version} + provided + + + org.apache.spark + spark-core_${scala.binary.version} + ${project.version} + test-jar + test + + + org.apache.spark + spark-catalyst_${scala.binary.version} + ${project.version} + test-jar + test + + + org.apache.spark + spark-sql_${scala.binary.version} + ${project.version} + test-jar + test + + + org.apache.spark + spark-tags_${scala.binary.version} + + + + 
target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + diff --git a/external/avro/src/main/scala/com/databricks/spark/avro/AvroOutputWriter.scala b/external/avro/src/main/scala/com/databricks/spark/avro/AvroOutputWriter.scala new file mode 100755 index 0000000000000..553efaa6efca6 --- /dev/null +++ b/external/avro/src/main/scala/com/databricks/spark/avro/AvroOutputWriter.scala @@ -0,0 +1,140 @@ +/* + * Copyright (C) 2016 Databricks, Inc. + * + * Portions of this software incorporate or are derived from software contained within Apache Spark, + * and this modified software differs from the Apache Spark software provided under the Apache + * License, Version 2.0, a copy of which you may obtain at + * http://www.apache.org/licenses/LICENSE-2.0 + */ + +package com.databricks.spark.avro + +import java.io.{IOException, OutputStream} +import java.nio.ByteBuffer +import java.sql.Timestamp +import java.util.HashMap + +import scala.collection.immutable.Map + +import org.apache.avro.{Schema, SchemaBuilder} +import org.apache.avro.generic.GenericData.Record +import org.apache.avro.generic.GenericRecord +import org.apache.avro.mapred.AvroKey +import org.apache.avro.mapreduce.AvroKeyOutputFormat +import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.NullWritable +import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext} + +import org.apache.spark.sql.Row +import org.apache.spark.sql.execution.datasources.OutputWriter +import org.apache.spark.sql.types._ + +// NOTE: This class is instantiated and used on executor side only, no need to be serializable. +private[avro] class AvroOutputWriter( + path: String, + context: TaskAttemptContext, + schema: StructType, + recordName: String, + recordNamespace: String) extends OutputWriter { + + private lazy val converter = createConverterToAvro(schema, recordName, recordNamespace) + + /** + * Overrides the couple of methods responsible for generating the output streams / files so + * that the data can be correctly partitioned + */ + private val recordWriter: RecordWriter[AvroKey[GenericRecord], NullWritable] = + new AvroKeyOutputFormat[GenericRecord]() { + + override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { + new Path(path) + } + + @throws(classOf[IOException]) + override def getAvroFileOutputStream(c: TaskAttemptContext): OutputStream = { + val path = getDefaultWorkFile(context, ".avro") + path.getFileSystem(context.getConfiguration).create(path) + } + + }.getRecordWriter(context) + + override def write(row: Row): Unit = { + val key = new AvroKey(converter(row).asInstanceOf[GenericRecord]) + recordWriter.write(key, NullWritable.get()) + } + + override def close(): Unit = recordWriter.close(context) + + /** + * This function constructs converter function for a given sparkSQL datatype. 
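As an illustrative aside (not part of the patch), the essence of this Row-to-Avro conversion for a record with two primitive fields looks like the following; the record name, namespace, and field names below are made up for the example.

    import org.apache.avro.SchemaBuilder
    import org.apache.avro.generic.GenericData
    import org.apache.spark.sql.Row

    object RowToAvroSketch {
      def main(args: Array[String]): Unit = {
        // Build a small Avro record schema with SchemaBuilder, as the converter
        // does for struct types (an empty namespace is the source's default).
        val schema = SchemaBuilder.record("topLevelRecord").namespace("")
          .fields()
          .name("id").`type`().longType().noDefault()
          .name("name").`type`().stringType().noDefault()
          .endRecord()

        // Primitive Spark SQL values are copied onto the Avro record unchanged.
        val row = Row(1L, "avro")
        val record = new GenericData.Record(schema)
        record.put("id", row.getLong(0))
        record.put("name", row.getString(1))
        println(record) // {"id": 1, "name": "avro"}
      }
    }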
This is used in + * writing Avro records out to disk + */ + private def createConverterToAvro( + dataType: DataType, + structName: String, + recordNamespace: String): (Any) => Any = { + dataType match { + case BinaryType => (item: Any) => item match { + case null => null + case bytes: Array[Byte] => ByteBuffer.wrap(bytes) + } + case ByteType | ShortType | IntegerType | LongType | + FloatType | DoubleType | StringType | BooleanType => identity + case _: DecimalType => (item: Any) => if (item == null) null else item.toString + case TimestampType => (item: Any) => + if (item == null) null else item.asInstanceOf[Timestamp].getTime + case ArrayType(elementType, _) => + val elementConverter = createConverterToAvro(elementType, structName, recordNamespace) + (item: Any) => { + if (item == null) { + null + } else { + val sourceArray = item.asInstanceOf[Seq[Any]] + val sourceArraySize = sourceArray.size + val targetArray = new Array[Any](sourceArraySize) + var idx = 0 + while (idx < sourceArraySize) { + targetArray(idx) = elementConverter(sourceArray(idx)) + idx += 1 + } + targetArray + } + } + case MapType(StringType, valueType, _) => + val valueConverter = createConverterToAvro(valueType, structName, recordNamespace) + (item: Any) => { + if (item == null) { + null + } else { + val javaMap = new HashMap[String, Any]() + item.asInstanceOf[Map[String, Any]].foreach { case (key, value) => + javaMap.put(key, valueConverter(value)) + } + javaMap + } + } + case structType: StructType => + val builder = SchemaBuilder.record(structName).namespace(recordNamespace) + val schema: Schema = SchemaConverters.convertStructToAvro( + structType, builder, recordNamespace) + val fieldConverters = structType.fields.map(field => + createConverterToAvro(field.dataType, field.name, recordNamespace)) + (item: Any) => { + if (item == null) { + null + } else { + val record = new Record(schema) + val convertersIterator = fieldConverters.iterator + val fieldNamesIterator = dataType.asInstanceOf[StructType].fieldNames.iterator + val rowIterator = item.asInstanceOf[Row].toSeq.iterator + + while (convertersIterator.hasNext) { + val converter = convertersIterator.next() + record.put(fieldNamesIterator.next(), converter(rowIterator.next())) + } + record + } + } + } + } +} diff --git a/external/avro/src/main/scala/com/databricks/spark/avro/AvroOutputWriterFactory.scala b/external/avro/src/main/scala/com/databricks/spark/avro/AvroOutputWriterFactory.scala new file mode 100755 index 0000000000000..0577ffae8fe7f --- /dev/null +++ b/external/avro/src/main/scala/com/databricks/spark/avro/AvroOutputWriterFactory.scala @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2016 Databricks, Inc. 
+ * + * Portions of this software incorporate or are derived from software contained within Apache Spark, + * and this modified software differs from the Apache Spark software provided under the Apache + * License, Version 2.0, a copy of which you may obtain at + * http://www.apache.org/licenses/LICENSE-2.0 + */ + +package com.databricks.spark.avro + +import org.apache.hadoop.mapreduce.TaskAttemptContext + +import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory} +import org.apache.spark.sql.types.StructType + +private[avro] class AvroOutputWriterFactory( + schema: StructType, + recordName: String, + recordNamespace: String) extends OutputWriterFactory { + + override def getFileExtension(context: TaskAttemptContext): String = ".avro" + + def newInstance( + path: String, + dataSchema: StructType, + context: TaskAttemptContext): OutputWriter = { + new AvroOutputWriter(path, context, schema, recordName, recordNamespace) + } +} diff --git a/external/avro/src/main/scala/com/databricks/spark/avro/DefaultSource.scala b/external/avro/src/main/scala/com/databricks/spark/avro/DefaultSource.scala new file mode 100755 index 0000000000000..2d65408fcd433 --- /dev/null +++ b/external/avro/src/main/scala/com/databricks/spark/avro/DefaultSource.scala @@ -0,0 +1,283 @@ +/* + * Copyright (C) 2016 Databricks, Inc. + * + * Portions of this software incorporate or are derived from software contained within Apache Spark, + * and this modified software differs from the Apache Spark software provided under the Apache + * License, Version 2.0, a copy of which you may obtain at + * http://www.apache.org/licenses/LICENSE-2.0 + */ + +package com.databricks.spark.avro + +import java.io._ +import java.net.URI +import java.util.zip.Deflater + +import scala.util.control.NonFatal + +import com.databricks.spark.avro.DefaultSource.{AvroSchema, IgnoreFilesWithoutExtensionProperty, SerializableConfiguration} +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} +import com.esotericsoftware.kryo.io.{Input, Output} +import org.apache.avro.{Schema, SchemaBuilder} +import org.apache.avro.file.{DataFileConstants, DataFileReader} +import org.apache.avro.generic.{GenericDatumReader, GenericRecord} +import org.apache.avro.mapred.{AvroOutputFormat, FsInput} +import org.apache.avro.mapreduce.AvroJob +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.mapreduce.Job +import org.slf4j.LoggerFactory + +import org.apache.spark.TaskContext +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.expressions.GenericRow +import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile} +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.types.StructType + +private[avro] class DefaultSource extends FileFormat with DataSourceRegister { + private val log = LoggerFactory.getLogger(getClass) + + // TODO(josh): Investigate whether we actually need this equals(), and, if so, add + // tests for data source caching resolution in order to make sure this path is exercised + // and does not break. See https://github.com/databricks/spark/pull/170#discussion_r95915636 + override def equals(other: Any): Boolean = other match { + case _: DefaultSource => true + case _ => false + } + + // Dummy hashCode() to appease ScalaStyle. 
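For orientation, here is a minimal end-to-end sketch (not part of the patch) of how this data source is used once it is on the classpath, either through the fully qualified format name that `DefaultSource` implements or through the implicits added in `package.scala`; the paths and the deflate level are placeholders.

    import com.databricks.spark.avro._
    import org.apache.spark.sql.SparkSession

    object AvroRoundTripSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]").appName("avro-sketch").getOrCreate()

        // Read through the fully qualified format name handled by DefaultSource.
        val df = spark.read.format("com.databricks.spark.avro").load("/tmp/episodes.avro")

        // Write with the deflate codec; prepareWrite() reads these two settings.
        spark.conf.set("spark.sql.avro.compression.codec", "deflate")
        spark.conf.set("spark.sql.avro.deflate.level", "5")
        df.write.format("com.databricks.spark.avro").save("/tmp/episodes-deflate")

        // Equivalent shorthand using the implicits from the package object.
        val df2 = spark.read.avro("/tmp/episodes.avro")
        df2.write.avro("/tmp/episodes-copy")

        spark.stop()
      }
    }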
+ // See https://github.com/databricks/spark/pull/170#discussion_r95915636 + override def hashCode(): Int = super.hashCode() + + override def inferSchema( + spark: SparkSession, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { + val conf = spark.sparkContext.hadoopConfiguration + + // Schema evolution is not supported yet. Here we only pick a single random sample file to + // figure out the schema of the whole dataset. + val sampleFile = if (conf.getBoolean(IgnoreFilesWithoutExtensionProperty, true)) { + files.find(_.getPath.getName.endsWith(".avro")).getOrElse { + throw new FileNotFoundException( + "No Avro files found. Hadoop option \"avro.mapred.ignore.inputs.without.extension\" is " + + "set to true. Do all input files have \".avro\" extension?" + ) + } + } else { + files.headOption.getOrElse { + throw new FileNotFoundException("No Avro files found.") + } + } + + // User can specify an optional avro json schema. + val avroSchema = options.get(AvroSchema).map(new Schema.Parser().parse).getOrElse { + val in = new FsInput(sampleFile.getPath, conf) + try { + val reader = DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]()) + try { + reader.getSchema + } finally { + reader.close() + } + } finally { + in.close() + } + } + + SchemaConverters.toSqlType(avroSchema).dataType match { + case t: StructType => Some(t) + case _ => throw new RuntimeException( + s"""Avro schema cannot be converted to a Spark SQL StructType: + | + |${avroSchema.toString(true)} + |""".stripMargin) + } + } + + override def shortName(): String = "avro" + + override def isSplitable( + sparkSession: SparkSession, + options: Map[String, String], + path: Path): Boolean = true + + override def prepareWrite( + spark: SparkSession, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = { + val recordName = options.getOrElse("recordName", "topLevelRecord") + val recordNamespace = options.getOrElse("recordNamespace", "") + val build = SchemaBuilder.record(recordName).namespace(recordNamespace) + val outputAvroSchema = SchemaConverters.convertStructToAvro(dataSchema, build, recordNamespace) + + AvroJob.setOutputKeySchema(job, outputAvroSchema) + val AVRO_COMPRESSION_CODEC = "spark.sql.avro.compression.codec" + val AVRO_DEFLATE_LEVEL = "spark.sql.avro.deflate.level" + val COMPRESS_KEY = "mapred.output.compress" + + spark.conf.get(AVRO_COMPRESSION_CODEC, "snappy") match { + case "uncompressed" => + log.info("writing uncompressed Avro records") + job.getConfiguration.setBoolean(COMPRESS_KEY, false) + + case "snappy" => + log.info("compressing Avro output using Snappy") + job.getConfiguration.setBoolean(COMPRESS_KEY, true) + job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC) + + case "deflate" => + val deflateLevel = spark.conf.get( + AVRO_DEFLATE_LEVEL, Deflater.DEFAULT_COMPRESSION.toString).toInt + log.info(s"compressing Avro output using deflate (level=$deflateLevel)") + job.getConfiguration.setBoolean(COMPRESS_KEY, true) + job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC) + job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel) + + case unknown: String => + log.error(s"unsupported compression codec $unknown") + } + + new AvroOutputWriterFactory(dataSchema, recordName, recordNamespace) + } + + override def buildReader( + spark: SparkSession, + dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, + filters: Seq[Filter], + 
options: Map[String, String], + hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { + + val broadcastedConf = + spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) + + (file: PartitionedFile) => { + val log = LoggerFactory.getLogger(classOf[DefaultSource]) + val conf = broadcastedConf.value.value + val userProvidedSchema = options.get(AvroSchema).map(new Schema.Parser().parse) + + // TODO Removes this check once `FileFormat` gets a general file filtering interface method. + // Doing input file filtering is improper because we may generate empty tasks that process no + // input files but stress the scheduler. We should probably add a more general input file + // filtering mechanism for `FileFormat` data sources. See SPARK-16317. + if ( + conf.getBoolean(IgnoreFilesWithoutExtensionProperty, true) && + !file.filePath.endsWith(".avro") + ) { + Iterator.empty + } else { + val reader = { + val in = new FsInput(new Path(new URI(file.filePath)), conf) + try { + val datumReader = userProvidedSchema match { + case Some(userSchema) => new GenericDatumReader[GenericRecord](userSchema) + case _ => new GenericDatumReader[GenericRecord]() + } + DataFileReader.openReader(in, datumReader) + } catch { + case NonFatal(e) => + log.error("Exception while opening DataFileReader", e) + in.close() + throw e + } + } + + // Ensure that the reader is closed even if the task fails or doesn't consume the entire + // iterator of records. + Option(TaskContext.get()).foreach { taskContext => + taskContext.addTaskCompletionListener { _ => + reader.close() + } + } + + reader.sync(file.start) + val stop = file.start + file.length + + val rowConverter = SchemaConverters.createConverterToSQL( + userProvidedSchema.getOrElse(reader.getSchema), requiredSchema) + + new Iterator[InternalRow] { + // Used to convert `Row`s containing data columns into `InternalRow`s. 
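As a side note on the step below (an illustrative sketch, not part of the patch): `RowEncoder` is what turns the external `Row`s produced by the Avro-to-SQL converter into the Catalyst `InternalRow`s that a `FileFormat` reader has to emit.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.encoders.RowEncoder
    import org.apache.spark.sql.types._

    object RowEncoderSketch {
      def main(args: Array[String]): Unit = {
        val schema = StructType(Seq(
          StructField("id", LongType, nullable = false),
          StructField("name", StringType, nullable = true)))
        // toRow serializes an external Row into Catalyst's internal representation.
        val internalRow = RowEncoder(schema).toRow(Row(42L, "avro"))
        println(internalRow.getLong(0))       // 42
        println(internalRow.getUTF8String(1)) // avro
      }
    }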
+ private val encoderForDataColumns = RowEncoder(requiredSchema) + + private[this] var completed = false + + override def hasNext: Boolean = { + if (completed) { + false + } else { + val r = reader.hasNext && !reader.pastSync(stop) + if (!r) { + reader.close() + completed = true + } + r + } + } + + override def next(): InternalRow = { + if (reader.pastSync(stop)) { + throw new NoSuchElementException("next on empty iterator") + } + val record = reader.next() + val safeDataRow = rowConverter(record).asInstanceOf[GenericRow] + + // The safeDataRow is reused, we must do a copy + encoderForDataColumns.toRow(safeDataRow) + } + } + } + } + } +} + +private[avro] object DefaultSource { + val IgnoreFilesWithoutExtensionProperty = "avro.mapred.ignore.inputs.without.extension" + + val AvroSchema = "avroSchema" + + class SerializableConfiguration(@transient var value: Configuration) + extends Serializable with KryoSerializable { + @transient private[avro] lazy val log = LoggerFactory.getLogger(getClass) + + private def writeObject(out: ObjectOutputStream): Unit = tryOrIOException { + out.defaultWriteObject() + value.write(out) + } + + private def readObject(in: ObjectInputStream): Unit = tryOrIOException { + value = new Configuration(false) + value.readFields(in) + } + + private def tryOrIOException[T](block: => T): T = { + try { + block + } catch { + case e: IOException => + log.error("Exception encountered", e) + throw e + case NonFatal(e) => + log.error("Exception encountered", e) + throw new IOException(e) + } + } + + def write(kryo: Kryo, out: Output): Unit = { + val dos = new DataOutputStream(out) + value.write(dos) + dos.flush() + } + + def read(kryo: Kryo, in: Input): Unit = { + value = new Configuration(false) + value.readFields(new DataInputStream(in)) + } + } +} diff --git a/external/avro/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala b/external/avro/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala new file mode 100755 index 0000000000000..742516124d8d1 --- /dev/null +++ b/external/avro/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala @@ -0,0 +1,396 @@ +/* + * Copyright (C) 2016 Databricks, Inc. + * + * Portions of this software incorporate or are derived from software contained within Apache Spark, + * and this modified software differs from the Apache Spark software provided under the Apache + * License, Version 2.0, a copy of which you may obtain at + * http://www.apache.org/licenses/LICENSE-2.0 + */ + +package com.databricks.spark.avro + +import java.nio.ByteBuffer + +import scala.collection.JavaConverters._ + +import org.apache.avro.{Schema, SchemaBuilder} +import org.apache.avro.Schema.Type._ +import org.apache.avro.SchemaBuilder._ +import org.apache.avro.generic.{GenericData, GenericRecord} +import org.apache.avro.generic.GenericData.Fixed + +import org.apache.spark.sql.catalyst.expressions.GenericRow +import org.apache.spark.sql.types._ + +/** + * This object contains method that are used to convert sparkSQL schemas to avro schemas and vice + * versa. + */ +object SchemaConverters { + + class IncompatibleSchemaException(msg: String, ex: Throwable = null) extends Exception(msg, ex) + + case class SchemaType(dataType: DataType, nullable: Boolean) + + /** + * This function takes an avro schema and returns a sql schema. 
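To make the mapping concrete, a small standalone example (not part of the patch) of what `toSqlType` returns for a record with a required long field and a nullable string field:

    import com.databricks.spark.avro.SchemaConverters
    import org.apache.avro.SchemaBuilder

    object ToSqlTypeSketch {
      def main(args: Array[String]): Unit = {
        val avroSchema = SchemaBuilder.record("example").namespace("")
          .fields()
          .name("id").`type`().longType().noDefault()
          .name("name").`type`().nullable().stringType().noDefault()
          .endRecord()
        val result = SchemaConverters.toSqlType(avroSchema)
        // Roughly: StructType(StructField(id,LongType,false), StructField(name,StringType,true))
        println(result.dataType)
      }
    }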
+ */ + def toSqlType(avroSchema: Schema): SchemaType = { + avroSchema.getType match { + case INT => SchemaType(IntegerType, nullable = false) + case STRING => SchemaType(StringType, nullable = false) + case BOOLEAN => SchemaType(BooleanType, nullable = false) + case BYTES => SchemaType(BinaryType, nullable = false) + case DOUBLE => SchemaType(DoubleType, nullable = false) + case FLOAT => SchemaType(FloatType, nullable = false) + case LONG => SchemaType(LongType, nullable = false) + case FIXED => SchemaType(BinaryType, nullable = false) + case ENUM => SchemaType(StringType, nullable = false) + + case RECORD => + val fields = avroSchema.getFields.asScala.map { f => + val schemaType = toSqlType(f.schema()) + StructField(f.name, schemaType.dataType, schemaType.nullable) + } + + SchemaType(StructType(fields), nullable = false) + + case ARRAY => + val schemaType = toSqlType(avroSchema.getElementType) + SchemaType( + ArrayType(schemaType.dataType, containsNull = schemaType.nullable), + nullable = false) + + case MAP => + val schemaType = toSqlType(avroSchema.getValueType) + SchemaType( + MapType(StringType, schemaType.dataType, valueContainsNull = schemaType.nullable), + nullable = false) + + case UNION => + if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) { + // In case of a union with null, eliminate it and make a recursive call + val remainingUnionTypes = avroSchema.getTypes.asScala.filterNot(_.getType == NULL) + if (remainingUnionTypes.size == 1) { + toSqlType(remainingUnionTypes.head).copy(nullable = true) + } else { + toSqlType(Schema.createUnion(remainingUnionTypes.asJava)).copy(nullable = true) + } + } else avroSchema.getTypes.asScala.map(_.getType) match { + case Seq(t1) => + toSqlType(avroSchema.getTypes.get(0)) + case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) => + SchemaType(LongType, nullable = false) + case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) => + SchemaType(DoubleType, nullable = false) + case _ => + // Convert complex unions to struct types where field names are member0, member1, etc. + // This is consistent with the behavior when converting between Avro and Parquet. + val fields = avroSchema.getTypes.asScala.zipWithIndex.map { + case (s, i) => + val schemaType = toSqlType(s) + // All fields are nullable because only one of them is set at a time + StructField(s"member$i", schemaType.dataType, nullable = true) + } + + SchemaType(StructType(fields), nullable = false) + } + + case other => throw new IncompatibleSchemaException(s"Unsupported type $other") + } + } + + /** + * This function converts sparkSQL StructType into avro schema. This method uses two other + * converter methods in order to do the conversion. + */ + def convertStructToAvro[T]( + structType: StructType, + schemaBuilder: RecordBuilder[T], + recordNamespace: String): T = { + val fieldsAssembler: FieldAssembler[T] = schemaBuilder.fields() + structType.fields.foreach { field => + val newField = fieldsAssembler.name(field.name).`type`() + + if (field.nullable) { + convertFieldTypeToAvro(field.dataType, newField.nullable(), field.name, recordNamespace) + .noDefault + } else { + convertFieldTypeToAvro(field.dataType, newField, field.name, recordNamespace) + .noDefault + } + } + fieldsAssembler.endRecord() + } + + /** + * Returns a converter function to convert row in avro format to GenericRow of catalyst. + * + * @param sourceAvroSchema Source schema before conversion inferred from avro file by passed in + * by user. + * @param targetSqlType Target catalyst sql type after the conversion. 
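For context (an illustrative sketch, not part of the patch), the `GenericRecord`s that this converter consumes come out of Avro's `DataFileReader`; the input path below is a placeholder.

    import java.io.File

    import scala.collection.JavaConverters._

    import org.apache.avro.file.DataFileReader
    import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

    object TinyAvroReaderSketch {
      def main(args: Array[String]): Unit = {
        val reader = DataFileReader.openReader(
          new File("/tmp/episodes.avro"), new GenericDatumReader[GenericRecord]())
        try {
          // The writer schema is embedded in the file header.
          println(reader.getSchema.toString(true))
          reader.iterator().asScala.foreach(println)
        } finally {
          reader.close()
        }
      }
    }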
+ * @return returns a converter function to convert row in avro format to GenericRow of catalyst. + */ + private[avro] def createConverterToSQL( + sourceAvroSchema: Schema, + targetSqlType: DataType): AnyRef => AnyRef = { + + def createConverter(avroSchema: Schema, + sqlType: DataType, path: List[String]): AnyRef => AnyRef = { + val avroType = avroSchema.getType + (sqlType, avroType) match { + // Avro strings are in Utf8, so we have to call toString on them + case (StringType, STRING) | (StringType, ENUM) => + (item: AnyRef) => if (item == null) null else item.toString + // Byte arrays are reused by avro, so we have to make a copy of them. + case (IntegerType, INT) | (BooleanType, BOOLEAN) | (DoubleType, DOUBLE) | + (FloatType, FLOAT) | (LongType, LONG) => + identity + case (BinaryType, FIXED) => + (item: AnyRef) => + if (item == null) { + null + } else { + item.asInstanceOf[Fixed].bytes().clone() + } + case (BinaryType, BYTES) => + (item: AnyRef) => + if (item == null) { + null + } else { + val byteBuffer = item.asInstanceOf[ByteBuffer] + val bytes = new Array[Byte](byteBuffer.remaining) + byteBuffer.get(bytes) + bytes + } + + case (struct: StructType, RECORD) => + val length = struct.fields.length + val converters = new Array[AnyRef => AnyRef](length) + val avroFieldIndexes = new Array[Int](length) + var i = 0 + while (i < length) { + val sqlField = struct.fields(i) + val avroField = avroSchema.getField(sqlField.name) + if (avroField != null) { + val converter = createConverter(avroField.schema(), sqlField.dataType, + path :+ sqlField.name) + converters(i) = converter + avroFieldIndexes(i) = avroField.pos() + } else if (!sqlField.nullable) { + throw new IncompatibleSchemaException( + s"Cannot find non-nullable field ${sqlField.name} at path ${path.mkString(".")} " + + "in Avro schema\n" + + s"Source Avro schema: $sourceAvroSchema.\n" + + s"Target Catalyst type: $targetSqlType") + } + i += 1 + } + + (item: AnyRef) => { + if (item == null) { + null + } else { + val record = item.asInstanceOf[GenericRecord] + + val result = new Array[Any](length) + var i = 0 + while (i < converters.length) { + if (converters(i) != null) { + val converter = converters(i) + result(i) = converter(record.get(avroFieldIndexes(i))) + } + i += 1 + } + new GenericRow(result) + } + } + case (arrayType: ArrayType, ARRAY) => + val elementConverter = createConverter(avroSchema.getElementType, arrayType.elementType, + path) + val allowsNull = arrayType.containsNull + (item: AnyRef) => { + if (item == null) { + null + } else { + item.asInstanceOf[java.lang.Iterable[AnyRef]].asScala.map { element => + if (element == null && !allowsNull) { + throw new RuntimeException(s"Array value at path ${path.mkString(".")} is not " + + "allowed to be null") + } else { + elementConverter(element) + } + } + } + } + case (mapType: MapType, MAP) if mapType.keyType == StringType => + val valueConverter = createConverter(avroSchema.getValueType, mapType.valueType, path) + val allowsNull = mapType.valueContainsNull + (item: AnyRef) => { + if (item == null) { + null + } else { + item.asInstanceOf[java.util.Map[AnyRef, AnyRef]].asScala.map { x => + if (x._2 == null && !allowsNull) { + throw new RuntimeException(s"Map value at path ${path.mkString(".")} is not " + + "allowed to be null") + } else { + (x._1.toString, valueConverter(x._2)) + } + }.toMap + } + } + case (sqlType, UNION) => + if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) { + val remainingUnionTypes = avroSchema.getTypes.asScala.filterNot(_.getType == NULL) + if 
(remainingUnionTypes.size == 1) { + createConverter(remainingUnionTypes.head, sqlType, path) + } else { + createConverter(Schema.createUnion(remainingUnionTypes.asJava), sqlType, path) + } + } else avroSchema.getTypes.asScala.map(_.getType) match { + case Seq(t1) => createConverter(avroSchema.getTypes.get(0), sqlType, path) + case Seq(a, b) if Set(a, b) == Set(INT, LONG) && sqlType == LongType => + (item: AnyRef) => { + item match { + case null => null + case l: java.lang.Long => l + case i: java.lang.Integer => new java.lang.Long(i.longValue()) + } + } + case Seq(a, b) if Set(a, b) == Set(FLOAT, DOUBLE) && sqlType == DoubleType => + (item: AnyRef) => { + item match { + case null => null + case d: java.lang.Double => d + case f: java.lang.Float => new java.lang.Double(f.doubleValue()) + } + } + case other => + sqlType match { + case t: StructType if t.fields.length == avroSchema.getTypes.size => + val fieldConverters = t.fields.zip(avroSchema.getTypes.asScala).map { + case (field, schema) => + createConverter(schema, field.dataType, path :+ field.name) + } + + (item: AnyRef) => if (item == null) { + null + } else { + val i = GenericData.get().resolveUnion(avroSchema, item) + val converted = new Array[Any](fieldConverters.length) + converted(i) = fieldConverters(i)(item) + new GenericRow(converted) + } + case _ => throw new IncompatibleSchemaException( + s"Cannot convert Avro schema to catalyst type because schema at path " + + s"${path.mkString(".")} is not compatible " + + s"(avroType = $other, sqlType = $sqlType). \n" + + s"Source Avro schema: $sourceAvroSchema.\n" + + s"Target Catalyst type: $targetSqlType") + } + } + case (left, right) => + throw new IncompatibleSchemaException( + s"Cannot convert Avro schema to catalyst type because schema at path " + + s"${path.mkString(".")} is not compatible (avroType = $left, sqlType = $right). \n" + + s"Source Avro schema: $sourceAvroSchema.\n" + + s"Target Catalyst type: $targetSqlType") + } + } + createConverter(sourceAvroSchema, targetSqlType, List.empty[String]) + } + + /** + * This function is used to convert some sparkSQL type to avro type. Note that this function won't + * be used to construct fields of avro record (convertFieldTypeToAvro is used for that). 
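Going in the other direction, the public entry point `convertStructToAvro` drives these two private helpers; a minimal sketch of calling it (not part of the patch; the namespace and fields are placeholders):

    import com.databricks.spark.avro.SchemaConverters
    import org.apache.avro.SchemaBuilder
    import org.apache.spark.sql.types._

    object StructToAvroSketch {
      def main(args: Array[String]): Unit = {
        val structType = StructType(Seq(
          StructField("id", LongType, nullable = false),
          StructField("tags", ArrayType(StringType, containsNull = true), nullable = true)))
        val builder = SchemaBuilder.record("topLevelRecord").namespace("com.example")
        val avroSchema = SchemaConverters.convertStructToAvro(structType, builder, "com.example")
        // Nullable Spark fields come back as Avro unions that include "null".
        println(avroSchema.toString(true))
      }
    }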
+ */ + private def convertTypeToAvro[T]( + dataType: DataType, + schemaBuilder: BaseTypeBuilder[T], + structName: String, + recordNamespace: String): T = { + dataType match { + case ByteType => schemaBuilder.intType() + case ShortType => schemaBuilder.intType() + case IntegerType => schemaBuilder.intType() + case LongType => schemaBuilder.longType() + case FloatType => schemaBuilder.floatType() + case DoubleType => schemaBuilder.doubleType() + case _: DecimalType => schemaBuilder.stringType() + case StringType => schemaBuilder.stringType() + case BinaryType => schemaBuilder.bytesType() + case BooleanType => schemaBuilder.booleanType() + case TimestampType => schemaBuilder.longType() + + case ArrayType(elementType, _) => + val builder = getSchemaBuilder(dataType.asInstanceOf[ArrayType].containsNull) + val elementSchema = convertTypeToAvro(elementType, builder, structName, recordNamespace) + schemaBuilder.array().items(elementSchema) + + case MapType(StringType, valueType, _) => + val builder = getSchemaBuilder(dataType.asInstanceOf[MapType].valueContainsNull) + val valueSchema = convertTypeToAvro(valueType, builder, structName, recordNamespace) + schemaBuilder.map().values(valueSchema) + + case structType: StructType => + convertStructToAvro( + structType, + schemaBuilder.record(structName).namespace(recordNamespace), + recordNamespace) + + case other => throw new IncompatibleSchemaException(s"Unexpected type $dataType.") + } + } + + /** + * This function is used to construct fields of the avro record, where schema of the field is + * specified by avro representation of dataType. Since builders for record fields are different + * from those for everything else, we have to use a separate method. + */ + private def convertFieldTypeToAvro[T]( + dataType: DataType, + newFieldBuilder: BaseFieldTypeBuilder[T], + structName: String, + recordNamespace: String): FieldDefault[T, _] = { + dataType match { + case ByteType => newFieldBuilder.intType() + case ShortType => newFieldBuilder.intType() + case IntegerType => newFieldBuilder.intType() + case LongType => newFieldBuilder.longType() + case FloatType => newFieldBuilder.floatType() + case DoubleType => newFieldBuilder.doubleType() + case _: DecimalType => newFieldBuilder.stringType() + case StringType => newFieldBuilder.stringType() + case BinaryType => newFieldBuilder.bytesType() + case BooleanType => newFieldBuilder.booleanType() + case TimestampType => newFieldBuilder.longType() + + case ArrayType(elementType, _) => + val builder = getSchemaBuilder(dataType.asInstanceOf[ArrayType].containsNull) + val elementSchema = convertTypeToAvro(elementType, builder, structName, recordNamespace) + newFieldBuilder.array().items(elementSchema) + + case MapType(StringType, valueType, _) => + val builder = getSchemaBuilder(dataType.asInstanceOf[MapType].valueContainsNull) + val valueSchema = convertTypeToAvro(valueType, builder, structName, recordNamespace) + newFieldBuilder.map().values(valueSchema) + + case structType: StructType => + convertStructToAvro( + structType, + newFieldBuilder.record(structName).namespace(recordNamespace), + recordNamespace) + + case other => throw new IncompatibleSchemaException(s"Unexpected type $dataType.") + } + } + + private def getSchemaBuilder(isNullable: Boolean): BaseTypeBuilder[Schema] = { + if (isNullable) { + SchemaBuilder.builder().nullable() + } else { + SchemaBuilder.builder() + } + } +} diff --git a/external/avro/src/main/scala/com/databricks/spark/avro/package.scala 
b/external/avro/src/main/scala/com/databricks/spark/avro/package.scala new file mode 100755 index 0000000000000..e83e93efff1e4 --- /dev/null +++ b/external/avro/src/main/scala/com/databricks/spark/avro/package.scala @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2016 Databricks, Inc. + * + * Portions of this software incorporate or are derived from software contained within Apache Spark, + * and this modified software differs from the Apache Spark software provided under the Apache + * License, Version 2.0, a copy of which you may obtain at + * http://www.apache.org/licenses/LICENSE-2.0 + */ + +package com.databricks.spark + +import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter} + +package object avro { + /** + * Adds a method, `avro`, to DataFrameWriter that allows you to write avro files using + * the DataFileWriter + */ + implicit class AvroDataFrameWriter[T](writer: DataFrameWriter[T]) { + def avro: String => Unit = writer.format("com.databricks.spark.avro").save + } + + /** + * Adds a method, `avro`, to DataFrameReader that allows you to read avro files using + * the DataFileReade + */ + implicit class AvroDataFrameReader(reader: DataFrameReader) { + def avro: String => DataFrame = reader.format("com.databricks.spark.avro").load + } +} diff --git a/external/avro/src/test/resources/benchmarkSchema.avsc b/external/avro/src/test/resources/benchmarkSchema.avsc new file mode 100755 index 0000000000000..f945885b9f3c5 --- /dev/null +++ b/external/avro/src/test/resources/benchmarkSchema.avsc @@ -0,0 +1,35 @@ +{ + "type" : "record", + "name" : "test_schema", + "fields" : [{ + "name" : "string", + "type" : "string", + "doc" : "Meaningless string of characters" + }, { + "name" : "simple_map", + "type" : {"type": "map", "values": "int"} + }, { + "name" : "union_int_long_null", + "type" : ["int", "long", "null"] + }, { + "name" : "union_float_double", + "type" : ["float", "double"] + }, { + "name": "inner_record", + "type": { + "type": "record", + "name": "inner_record", + "aliases": ["RecordAlias"], + "fields" : [{ + "name": "value_field", + "type": "string" + }] + } + }, { + "name": "array_of_boolean", + "type": {"type": "array", "items": "boolean"} + }, { + "name": "bytes", + "type": "bytes" + }] +} diff --git a/external/avro/src/test/resources/episodes.avro b/external/avro/src/test/resources/episodes.avro new file mode 100755 index 0000000000000000000000000000000000000000..58a028ce19e6a1e964465dba8f9fbf5c84b3c3e5 GIT binary patch literal 597 zcmZ`$$w~u35Ov8x#Dj=L5s_jL1qmVRM7@a%A}%2+f)b^isW@$Vx`*!0$Pn`jp8Nt& zeudv=PZl@uSoPMfKD&P$pU7gYWL|p#h4`N7Iwpz8*>)6pQu$8K5g4X3MNCVd^l+mi z^wPBcH1bcl6SZw%Sr`PO_y|f#X$L9-g z;dAr)w);_-ea$!*mc7p@CSd|NlpVELhMh<;4y8h|knQ6Gw{;CytVP*k1x_$Y;bL~} zP%34!WeX0_W;dkQhBBN}WGK8R1;wpeZEAH#z@;EmCg2I|28{bqD#NLaM@Nh~YM*GtY%NloTUNlnX1EJ+mu3l%44q~<0r;;U9FsVqoUvQjEaP0lY$ zQPNS$OUwoFOHzwV;vuSlf@ztlIVr_TR?*ck`QnnI%)E4<6jVQ)pOT*p)b5*_m4 zNi8l`fJiCirzs?7Bo-wmm!uXIE7j^CjLOU{$VrXQO)P*L2X_`kuq-jBG! zQZ16!g>a~NE!5`q4)_ z4$liO^cG152#C1yMoOBpsIolT{kr(v`xw3Q^aHBW4<*kTe|R?Ed|m!p0};Vgja0tv z^0NQSvV@mTJX+?d_R`?juLpDYZ?l{K!|lO@#j3M9R!GPRCFkb&$j^M&lzz*=zqXM# zsNrZ`5s?Tm34X+aF~ge-=lK+ z2L&D(4&@8abtV@!zo-l}lk%Cm$yRMq@$>t&Ket}CzUHQT%Vx^AYX|R6cVK-rRd8j` zOTR5yPp$nu6^}@m>DI|rUwOUhz@FC}U7Lg=b6d_{+`1-vk5BKc(#2Doxw0d#H~rkS z>B_}r%bU$JUshk+u`iG#hrO-m)Zq|*ZMK;OR@s{;vvnSzoJ1dQtC(q4u@wlk?es!+mtYw`gb3HR! 
zlFhy~`RA@k4N1A}alNE7Cs<_Xv%EKZvz-q_I6ZH2t+4v}K}xeex=j9SE@xNu_tbCuxjBt5tMljYE-QGu oc=J`GqRT&@xU6vJWtjc`@x7({=kF?=`0vk0?&b4-%A@BG085M0ApigX literal 0 HcmV?d00001 diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00001.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00001.avro new file mode 100755 index 0000000000000000000000000000000000000000..1ca623a07dcf36e222f502144212cf90a03bec5a GIT binary patch literal 2313 zcma)6c~}$o78OK9KqLrC6%`^;sR3jW5o8G>$QDr67$6ME1QN1fl3_6fWhtway&}sC zqGey|M%V-`OGT*CqEP|G9l;{{fM{5x3D7)RzxRFpYv!JN&$;I}b7ouv$xGDGSj^bQ*4{gQGzd zio_TezFaQT{8pG*Vu>u`D0EuTKY3#7NVvEox-5!(%_UOk01HQ;LxB`<#mYrkR4+GH z@`$7ekYFU4l^k*r7cW|Ro02gm>6Ga08m&C+V$bm3Nr=amBn($dfHa8uwZJmEY{4BO zi5~znk{U>-h@%8|cSG48aTj4nkD!iwh;M8iP%f@$Tk-8-XHOeI+;3LN-L#S&Qr(v8 zR1Vs!C4_U?3{(GFDznj7FR>IAswtO}q?ux;ocxU`<5|P@s;3j-Fq_a4s?zNs2m&^cy=$fyhTFzEv6sTZLfgWDR|cL3 zG48Ci+FwrC{Ujyx8e!}X0qtQ{9fVHofs%mruJk~Cy=($!pO$L~M^6y-3peruZ^V-I zw)e(IuGNj1%!0zWRJDzEaxEYD^Rq#l&4iY>^;;+FxEgx+-<}*%nKE~^`pgH+{qJ7a z`JDgqOP2H2H36CF{kvH-h4L^*imYQQpGa=kgX>RzY~PTvTV}g?)Nqukp{k&H zUSZ68`1Bv+_HDxF>0jmsXUC(Ouh)Ef;~BkB^dYTBKVu3@4|!WxMo92X$J& zd#0*t56R;WHs&Fmd(herUdpB(8xyD&u5BM*j%kxpT7@Df45&r-lEf zUmcIIag=_re-@+o2X0{fKslPOj;FlFoi^{B#?X0T*?VXlXc{Xn2b9{GXJTJ@koN6q1l*b<2dWF z&b(bO@}jc5qht1&+atDh$F~$YLf_?CD|_rnA4F+Gm958047*C9sP2sZ>sj&Plf)R~ zf~uQ0vd;Z}5_#rI*uBS1cfVaN%!8d9bO2DJSDG-MzrF9$pE=)=pY1FN;JKo=ur)JW zl6I!s#sW##8r#CEYcjp|wolmy`Ka4_Q~|g88%8bc159;moWCVxs05qXURB{>A6(6C zwqf%t-}W7GkP50S+g&nn*H+<{0NTZ)^r@K~SYsT&$*3GNh}h=lqkI;R@VY;$*x_8d zn#;#4zS{}W5-%htpjt|@RO@AZ?v-OrJPAKwe|FKiWU|XTR15DFKe@m4eghWvhWr}K z3-*Z;I5#4Y@!*CLwBS_((o#BRzjsd9(`q#1WRZ@dLRL~CK@M{df7zo(*LRB8ogH^A z|M}DW^=MLcOjlq28#O7Kx_+_UC!e!IOX@pWIp5CL&W?^pfAD4**!%O<`k{G(A)>U3 zv7yT~AF1PHq}>>zPPiFc-lUlC>2Yy7_}K|RcVWH%L-{dMTW>usaw3&(F>|>t`b7no l=Dn%ucfNM=JvwJ!aUC~mGJJtO{<_X0i7T92koxBj^iS{0*{lEn literal 0 HcmV?d00001 diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00002.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00002.avro new file mode 100755 index 0000000000000000000000000000000000000000..a12e9459e7461cc6a7d6bcdda9f6a5b76a6a840a GIT binary patch literal 1621 zcmeZI%3@>@Nh~YM*GtY%NloTUNlnX1EJ+mu3l%44q~<0r;;U9FsVqoUvQjEaP0lY$ zQPNS$OUwoFOHzwV;vuSlf@ztlIVr_TR?*ck`QnnI%)E4<6jVQ)pOT*p)b5*_m4 zNi8l`fJiCirzs?7Bo-wmm!uXIE7j^CjLOU{$VrXQO)P*L2X_`kuq-jBG! zQZ16!g>a~NE!5`h%K!X7&ekGnQ^-`@$(Y$#!+Dt3>Pb zy#Dsb4-0n0@I4gL6aT{hgPliQ``{nW^tAJ5=Jnggzpsg6bDbix@=|7Ms$$m!B_&S< zPO)1V4zm|1MpSO(cG*(!`x^JXt@F0}?a?nje5>BNcy1!2z{?pm8AmZ?*^C7Oak59|jSH8WVHPuDp(v{xm z^ZV_0J<48xdUa$};qTe}b+;CuU0D?S-QroGw)GtUzDq0%y5=m|^lU;k=i;`DNhOOI zCS6>2<)P2nmUga&%}QnWF3GVjXnq*coIY#DS3~u6W^O-4J&Mhyb>@87xl*vF>4A>6 z;gVRh$=N^GJX*vmZ2Ux_diSfEXFAUp6>j94`^|9Ef?D5`hc=|zT-x$1Jxkr9Yr$7KlU}z^&QOkJ+VyjO zfBwI%Zm(+(&P=-#{-00zX;e~XcUo8Z)5tkeh5?INo<5lp<29$Rhh@?NPN~W^ug=Ri z&Ig2YaNZ6$mLOg@`H;bKai&|#7fd=Wc*5scmQ6{b_t{uUZAIxvF^he*AJ0`y$uzP$ z=<%v0if^LzQOObmDe0&=T8zw}FNlbA+$(7Qtnur`VT<=NeO4EY3zF1S{PZL;p1QCL zNqRe6Y%I{OUhZ?^>c;fqAUnV3-KLRVmm@slvSPT+@yvSI%xANEBpHJ)26C(hAypM@Nh~YM*GtY%NloTUNlnX1EJ+mu3l%44q~<0r;;U9FsVqoUvQjEaP0lY$ zQPNS$OUwoFOHzwV;vuSlf@ztlIVr_TR?*ck`QnnI%)E4<6jVQ)pOT*p)b5*_m4 zNi8l`fJiCirzs?7Bo-wmm!uXIE7j^CjLOU{$VrXQO)P*L2X_`kuq-jBG! 
zQZ16!g>a~NE!5`` ztqlL{$Gh|rBb_3`Hg4j)#i_LHx}Pqu(f8Z--*<;{bZ}@bGgx+Ufr1e0Q4zMzM@~H} z6mCS^(2USvJorFW!e>re+rw15eU;CD-t%3Z8MVJSe2!>)fV+qDM9ZWVJ|}&{*t0KL zCAyVd2??9w+J8nhb!)Rs#{J@T77Fq%3+7cH3q2|5bf5dwy5}lst&U2uO7FXL!Dsbc9OlY=GA|Eh8uoJ?^bGE~;LD?+gmPv z^Afklj;Y@Id;ULr<~YHlz?MffGATuW&##9+m91?xq*|EfN2 ze{a6*YvJ38eZ2>kO`IQ|9mag^_?eguPlG0|I%;(+=Eu4v#fAWky|n zWA^>n*&M^Iy4{N_!!+C`{nYC_I4eN;>~5R%vyTci^@Hz)NNtHd(6srK!%VZ20n6>h zuYK4#^OcLa(}7p4+b-@pvi5_=1c?O+Q-63geYn=)rlKMfw6JsfDM9TIIy0oU&g3=X z2=&RG`(w)7SF!=ux;YPvyuQsKP;f@)<2Loll3^!;&UCY1(Yn4N@Zz_O&WTQGVn&X7 zA+~%|PrC>PFdS&Hbd3~AE)YAtv_HFN=f&Sl-l3%#E*Uys%brXup17=a-n1)zmy>p% zO}b{MyU*_X%l`bD-#3qHOLt$6|5|L$cj#oiUrg<LZo|eU=EAeREYq`*S9BVc`tu@)c;q=kZ`FDQ4YZt!mU-#|3 zaes~B+8E8#y}d6V?A!eHslC}5@qF8Y&sVz7*BY&z9wz_q&+p0XXA>nl7AU4N%(D7? z>TC4rTyqC`+dDshe0sZf?WO9pZ?jjYufMnXd6vk^k2#4ypFeu}{=FPS6tB-aU;fo1 zk#85@*&1w^D#FlynbE83;(`t!D z$8FCF_A1Wx=FYTwC-Qt}vryy1vaQ+zT&fQKUl_$@zBhKO?+88eU5Qz2lF>fh^+8wN zpGu^y5mt>BG)up{rPXHs*FCqK44%l9cln92dO)UCiOTjZJ)!# zdt!>(tQo0WMYgR9ZkQlt@VrUm^NnnSM=Km+7Yc7YbE*7dW!^&9DcXq(l2%uDp z++Dd_D#<1NvYnRlG>N1*rgu+^Zf#T$*G;^8`iP*;W`__NW@Y!BCRw+OY>B5|e)!Ye sU-|#a()9MT&QI2Uj%avsbZwiRZ2jBLE&b=_+WdL>P4nM}|LBDe08rC;u>b%7 literal 0 HcmV?d00001 diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00004.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00004.avro new file mode 100755 index 0000000000000000000000000000000000000000..af56dfc8083dc409b2afb5a1970897b148dd8ef8 GIT binary patch literal 3282 zcma)8c{o)2A8tX}B}>+H2s5agJ#raa$QF{dF~(q+1+zqqEh9@&mTXhmqAa%<`;wH9 ztR>7KSrXYol9aCP`nmT$zvsDso%4A=pZEQ|-*evcJjcx0V=n^_jOc?s0mr%^;2bUp zR}>77;M;_7aCZb6mdW7{;QhQ1fEwU~fMb1J09gP7LvPf01P%|~npy{4kqDFv4p6iA z|ErI~`yerH>#c46PVIt)uhUHsFwA-%g}~v&wpwCXS24Id%m)U?BYbcGN%ntINVFFU z0Y$^ScI0+!ZGl7>ihx*O4^93W{b6M5sJcM-2Tbiur3R1bk;AMBLC*PqOdSL)CEg$Mj>{s+=SQb?Z$4N zE7BL?qOdc&bI476W6{>z+!@0mudKgah24kU?8*N(iH)%>3HjX;2n+%JZ-H%e+kzzl zx$yvSerRVbY9mU3r8z`b&vG3z(1Tc5ZQSS@LQKp|bvJHxj4x_l`cayw_; z!M8W<)c;QHW{{wp`1+2&N0;O%3&2oF7;YnJ_JGCa2Lagt{%!;2;{)@9VqKxmSS)Hi z<~u?=hBhbGVI&@b-bn3VgS!vo?6>}f?NEWOlH^F*lTeZF%RO0TDkohJExBeo$22Q0 zl4KH*eUUf34%w1}4$lz?haGH_o&eM#q3=D)W_bp{!)w1NtG|L6k1>d|KpY}YgmSY% z^mM%r#fyPYD2ApVbV1N zM=(9`jMMem)3t{*L@o~M6}#3{G-dAXFDFRZ%{E#j@h3%_yu%0jokNEh>0vjs|p4kG1LJS-Z#&G6`F>(?lxsb z3hzsbiv$yjm++3_Zz$uWws32Cd?2p}&@fir_)WjZn=*K?iG1o-K8JA^RV8Slk#)## z+!L!o_)6!8ey@){y7F-}8XGYFBG*hMAy~0ms6UwbviE_bP0!70f5r|C`7bYN22H<& zu6>p~*w4x~kPa1TH){`=OuTrv)^|eitr2U0>*|1iNA2g3YV@h;*^ZzuZ|f*WlVZz; z^)I356Y39Ry;nX+6%fPYN|5hM1M2= zJ*Cn?Iq{3uVG%la)_we$VIf@3KHj|%s+y*v?FZudvlRm~;vZd%Nh_TbO8g zFiYNo`VeVS)2)}zMvWI4TAM7tr0%~Kqa>J``}*YTwA4-q&Vh+Rmj#f3bQXh=treiw zKag&hHee9mzU08b1UjDst^3tl&ENE)?so?GjH|FffWZ3--%i;RSF0_khrx{=F*DgD zpyq*=9o=;?2Nh{JXOjFIl{vk$Ybd^_WrdCtxNDQC*epl*K%C9NDZk*RFG7y*K`j535LX#%e4KEyk9 zKT6Q8dLMa7*Uk-LJaZ}%j2SuF1Y3>sdl=@TZ1=MC976F?Loq<1(|EL_<{3#bWrgn< z-`%=tPDY2yS+nwLc$u?@ z?Y=SzMt)N@D^9{W^vsITYmh74B=x8RS6!Fg$zgU!i!5igB$B9e%0V&aFC4!RHSn|pwHbP#9j2Hc76&kHGeL++bQ)1rU2nY^tno!u{8!g9NS z(ieF?F^eOy=Xj`ziEYUQTg;IlbX?gP%06q$s+)SrnD@olZj*G0NP$+058z9>JS`NILDeS0{+s z5OHrUQd+3Rfl3Q`>vrWlR!}rHlLnv+R!u18j2^Aloyg2?^CY6yTNy?U;j~c6G&&gaK7Ufe^U|}^4>z%L-A*$Qq z&tn$RW;|R?aTl8Ne;(aa#*tAJ(sFFSdd{Jd=(92AX_1}2{x`IjD#$7iDFkFUxIF%8 zb|qNPNm8ktn|SOrg1KMeh6CJGUfr!Jm_kMukItUvvvR5;#JUCuH8awjK{jHQ0xax; zRGT)RrxME7ucR1f)UpYKl+yJ!}mD*ZV;hGATFX9RGOBA}=Kf0%>;+zwY5^ z07=<1*~?XV@wb8!FBB@985DHyJ>mft?8$fx`*1?H;u1|u+CVtrONnrCH^<4v#+YL- z9(XURX_vQhD(f`0S2Iz$_$~{*%d4czmq>cHg=Ja^`x_YZ(diFFp!zA!dxtY6`SbIP zc}G;(OnKooQVycgqRaxQRTd$(Sz2|y8t=0ahX 
z(@d9=qd390A50A{l+m4w3Y-faJtLT}UujXp5}5~hW-`0@$F#%xI-*7g^zowr6WCN2 zxTkTep%qQ^I#qQAnb8*DAZETytBf1v`85Ci&h;)u-=QNs5}5Jn_EvwZy=DOmHmU_T z)>R^C~s-Zv7@T0(NO9YVvvz%4PQg$;fPVlv}byI z;qo~C>W3n_dMA1R)c}b|Db-&6koZOCmAH{G$Rk@1irJ@FRz8HL$nr}YrPAUIO@Nh~YM*GtY%NloTUNlnX1EJ+mu3l%44q~<0r;;U9FsVqoUvQjEaP0lY$ zQPNS$OUwoFOHzwV;vuSlf@ztlIVr_TR?*ck`QnnI%)E4<6jVQ)pOT*p)b5*_m4 zNi8l`fJiCirzs?7Bo-wmm!uXIE7j^CjLOU{$VrXQO)P*L2X_`kuq-jBG! zQZ16!g>a~NE!5`9vH?HLE}M8j+mM~2qt zX0sn#oOgD%;j>w{al?d+L}f=Fj$izL7&jYRTU!J1W+3WTSk$V~G;6_Bj(K5@!k%27 zi!-l?m^C%+JYgzm+IlEa`PiZAug%~0et%V{G27QqcKU4Z!;fpFGNpD2Wu?c=Q`;^7 zZ(eSSf&1(4aWV6=1eIet*74cf=X_P|VVbQTb8w2=kv%sfWlbEJ8RVDNO@4GJ#CTo9 zwLo1LcXe0Q3K@siU&|i#6?JPj6))6Tv3w?zZScu0Itu(LZc{(q)@n~ojWPE(dL!X- zCG}4D5qD08xVe!J*S@vex%Fq+>(|?*ENWGwB5G{s*)d7+g`QdMYvU^1Gq3*EysTrc zPrD8t`{#e3|K`8m-`QXHm55}v`L-FDOyPBxUi?x{TyYtC+S2pm2J^T&^O`*+zpuveu95jy_fFv5UCHL39j$%>vqVyL zCIwu+acT$G5=S1@6^vQJNA9gZx>MfW{?uM>asL?a{aYrUygcJrT=(>8Gao#=>;8JO ztL>ZZbCeQYZU;W~kel46crI!6qb_5HH18bUl0+$v2Z8x&;yoK$rtb;S(byUGZ;#Cb-g$A~!u#Lqyg4T?Gymf*vjEL0R{SL?whX)ee>nMct@rY} N^K;AJ|7S)|MF7-8R#E@} literal 0 HcmV?d00001 diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00006.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00006.avro new file mode 100755 index 0000000000000000000000000000000000000000..c326fc434bf181b897372b8ab750a03320145617 GIT binary patch literal 1729 zcmeZI%3@>@Nh~YM*GtY%NloTUNlnX1EJ+mu3l%44q~<0r;;U9FsVqoUvQjEaP0lY$ zQPNS$OUwoFOHzwV;vuSlf@ztlIVr_TR?*ck`QnnI%)E4<6jVQ)pOT*p)b5*_m4 zNi8l`fJiCirzs?7Bo-wmm!uXIE7j^CjLOU{$VrXQO)P*L2X_`kuq-jBG! zQZ16!g>a~NE!5`=-%k3&)dh9=mr>?d2A)g-gYZ?w7wiGk50y8vm7z8cPISTrNj;3TzQOqqIcu znv!?Nq*MAL%vq|Qf)}5!tUvm&+^_$-_{@CI8SU@y{{4N^nd` zmSerb>Q9Bc70y@mHFRFMd(3T$l8bd-ny}d006C$x?Lw;gGB3?0iE0U4xtHg7I>vTt zfQd}@UYGNVXMBG}?OtbJuqbfPyY1iYYabttE?*Z@{Wb7UnZ+rco1SOvdRFIcebrVz zZ~umi)fZMI{YsXc!@&4^SL=db=f7U|*DHB^!=Uu_+_Hlvheg+&X`SnyI5WC!P1+&7 zG!~BOkKJqo^~_r=9XDBLB+kn%U-4F^XwI*9=elM;^SXFh<=}0WHLF~gL{00OI&s<( z%Z)44TvkrK_DHGtLACMJC#t?K!Ru!6B`WHr^oKGuO9o7>Rb6J7KDY5&?@3{!Kbwpu zGfd$L6WrPMI463`wBOSTkIuRkvs^;pT6chI_Ob~NTa{kvOj~(L!+0i(kmz*J6Gz{0 zPF~YJSumt>k>lh}f6qxdrjN|p1bFOL+K7fSKQ(N%xcYq7qg&0<3^#UU#PfB`sjxDM z;CA!ee!l;C`uz33j@$p$tgHU}TKhil^ta-NJ(HQX9yN*8<`w!Cx_$4*!+$rk+uygY z`*J+{x_(-CW?SKdguJ@{2j8Z-WSjSo3%aTtlTa_o^(SKZXLO`ivT-3Y|Y58ndZ=RUUWq4-t5c=uUbB|wL}zkT9Jmp(#Q#&!pE{9^2`x#2|7Y*{ z`{1wo>vvD9&Gq(|{`)y2hMU%h<)pAkLb0|2== B!hHY$ literal 0 HcmV?d00001 diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00007.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00007.avro new file mode 100755 index 0000000000000000000000000000000000000000..279f36c317eb8763be6cd3202a88d852550d381e GIT binary patch literal 1897 zcmeZI%3@>@Nh~YM*GtY%NloTUNlnX1EJ+mu3l%44q~<0r;;U9FsVqoUvQjEaP0lY$ zQPNS$OUwoFOHzwV;vuSlf@ztlIVr_TR?*ck`QnnI%)E4<6jVQ)pOT*p)b5*_m4 zNi8l`fJiCirzs?7Bo-wmm!uXIE7j^CjLOU{$VrXQO)P*L2X_`kuq-jBG! 
zQZ16!g>a~NE!5`p)McTZ*Uy6Uy$j-bjwwT zC4VZ5B+o4$)x3ziS=T z%0fzRdy1_m*D1!@iy?>OCM{d_Ln5h(Gcr~)?-Y|&l*cxgrb7Zo+$k+VY=-C9^hGyM zFg;T7B;cjN+GVp+kVq}^)JF+3&0 zEF~(+Uvg4GO4l}d<(9JtO?=JsYjxwRzPUF)xS5xEz9({W64$1safVFi#X1I5-@oD>yCf!!48l1 zIj1K07SG+js-Z^0X7PQS=GiaR(qxV90YLNAAaN%zu`ce5G07_M-b5on|{uzbrH6y(lMS(DC^zZ-fGaFl%SX zhP3t5r#=bMd=^mXymHy8o^wL$d?u^x7MXSW>aDMn)Zcvl{_f@J&;0*ue_cEJWe+pY Rj1d0to!27ue*Q<#pa29S0zLo$ literal 0 HcmV?d00001 diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00008.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00008.avro new file mode 100755 index 0000000000000000000000000000000000000000..8d70f5d1274d4f68f9e6cef0fdb20c8857b81e52 GIT binary patch literal 3420 zcma)8c{r4N8!lTSvXc;#o$SjbyCHiW%!IL=F*6vA8O@BHD6&OE23eAwX)MW>wFMPf zQixDSwjx57sC>5LbI!TG>-zqB-{*dw`+n}<``+($y%$_O4%2bLd~jGfI2M6~vm%hL zXcz&>vjgGb?nqBq4r?HQ;O~V5XaI0XI2MNh$O14h&+VFk#1ou%r?x;>6cUZV12mw4 zfA#SM917#M)!O6l*9a_pi*A90VYd2cBpxrk+Y-jQ3d7xDI2fFO#Nh$Kvj3Grd3vFd z&Ym!@eYt&GyPyvY?Ty@84?tlEfZzjEICh)r_y2JJm*k%D;DfT>7!(%cyxVccc%#w# z#-M<`a*yVCz|1ad%c3(Hi*frWZ`-;oV7s$**%A3S*AXG7G`XyXJxZ}12WOGxL^s|zBMoh>stdH4{q6y=x*fp z&$^HAZQ8H@o!XrsIq%?GJN_QsU`JU142^=}x0411SnYfedfUL?ZNP9in7=dD)!7A$ zMQ_D?UufUZ&cqgsA|O4tQ~TH8!Na)tZ+&6=RKRW->==9~enaSj7=xHT?IHYe$!MA* z4A;d4Q_PC)x^YLWo<$R1sD!iWM`hE|u?qsZ+{gf+)s$uU{8i}3U)sf4iI!;9B>k$Q zC@@!pn5`ZCBOaXCEt5WV0lV z_m>(*S>b(ACz9~|4EdA?fiO`+M-lEiO=$WbU!M=>iLwtKLy30!WfVl8X`p)M4F@PB z5X5c{8qAOy?JSH&lIA)Y&j)36*zzydnVe9EBoClELNt5so0}23g9gF$pEXR%R7;+N zMR;ub8<3Qnv+l8#mi5;?(w558a!l?dgUivku6;_luTXcIw(FA|&b0`{!HCOyUp%om z`M$3*b!__bNOf&^6%wW$H84pFr6-GwR~o!ru$eWmcU6C+g?EEy~}kUm7g?Ieqfetnut%TU#6TCv$hm*xg*pl-pW- z&F|&z4GHy;SOa`xUTRjt7|tHkmv+ImBJ zjc7^NeqS=w#__gb0nt{o-2P4UNue1gaQZuyvi@?AG1MV4>%`DGSzqAS$R#>8fm4Q3 z^oj^}RIemvkz~ym)An%9+p5?swxi${SU?U{PcPsVF8_GBe#3WO$8U++Mx8meF$yOJ zkLI4jzMus0gUKQ(tiYyb>N>Oh@0G}VwLd;j&Hd7hAL)#-5;u^Y5shX?AC>N&^DSxc z$iBR)$!P$&#&r{-8>|z;Wg1^nqYbUrXj&WZcG9Uelwu>q_(0{Ceo}~!35!2Cp*G_= z(vcqBh?5WE(mJesuZFjqk_20;rFi?Z@ZlC+i6`oYn^iExltIdpcWS9h3q7Vi^S)%o zln)>HhxW5fEmlEuF)b!SamvF_Lbm)5$KFZKt2~{Wn08Gsi#gru7_Yl5i-Gd@RExb8hAW zI6U3my&m?#;P)L{9cGQ}-Z*>%8R7)iOr@iXW0#uTnEd+fuTW6O+wdZW@g_`$eXHU{ zRr!Zf9TY{AXa{|cr@m207J@I6_1khiiRwpa9gd3odl{*t8knnq5~#0Sbkf#Kh|M2uR`e@QLj^u*D2FmY0g^KRV`VPT<9wjXkTXkqjkN<>*?CrM%l zoNf)}wX{Qw+u}G9flXPL$!Uu05}tRnj7ZbA_H96nhe!VMooTK*`fM1-8$ zr4(ux_Y@S1M#ZGskWv{gQs@m@Z97aY zj|F(;Jr9;RyC9U3^Nek}VM%pqARM3S@+V2Aqj2C7`*f2aF1MYzSR?GLGEX9V2A*XV z2OC#94`J-I8cAlCm<%d6N>IyYBW|p={i&*tuhy#J(X%=qenCj&D~IzdR5IArHt)XE zfW7{wVZELk%KYuDrEzEZgd(J9t-_2PCYYq?o?= zJ~ha2kF?m^Tia4ip)2O4nSa#$fiHf@g15pWr*4xu^d=Be)YT{V7Ib~Zg?2r^8BxKe zW}6ULUktoD%H74s<^gg`mo{>5hER_xc|q!eKh_$BiMiJ00>721xx!{4IgY$v3>MN7lsf|&_OA>L^zU`pYW4K&QvNhlGwwUJ1b+8Se7JGooWR` zze;p|NIt?TACv5*ToB&b5yMr%dw($VkBLz5gUMD(8O!Y?_@ka{!ZVYEQkxdg`4$e* zX`3)M1fgjSf~-R^URMYc6ix`8YvcK$sKTT(cBoQ_Z|Rmj4Png{+!C9e6+ud<=p-Pj z<)Y`WmEPh{U`3jlIEM?5hVJ}1W_qWA#CH0^KvO-;`ygGSowP%G&>A{UhYUHDu~ z#)h~ZIeTqXA%;S)nb|zu?7yg?dL=YZjk%&1%+%&w`DeR|s-{L^V=KDl4%aOtDxwa2&!3I80O~1fV4P{#T7`uW9tukJ<)!mMVKOEPqz% zx1}WX?$wQ|!rvQjPyPI{a6zbPEk$BJXu9!Uz-A=zF>r)WlPudm%s=k(`F2`oJ;^F; zCL17RB4r!5VNB6D8Ykz_uKNPh!ZRnwhFBb{r6PNXAYi1h=l-zgV)8mpjYf{OTSzMWC@EdPR{12J8cZteE9v6^JDE&pZw@Nh~YM*GtY%NloTUNlnX1EJ+mu3l%44q~<0r;;U9FsVqoUvQjEaP0lY$ zQPNS$OUwoFOHzwV;vuSlf@ztlIVr_TR?*ck`QnnI%)E4<6jVQ)pOT*p)b5*_m4 zNi8l`fJiCirzs?7Bo-wmm!uXIE7j^CjLOU{$VrXQO)P*L2X_`kuq-jBG! 
zQZ16!g>a~NE!5`22aQTmZy8)zx%yUgv*mfR9Hz#OKZgr5jX2zt|(rq?gT}Z zscnrc+y`eQEL2+Yy=1xN{;zjtW~*8(zc9;Sm+}JH%!YOW?ea*4tr15=4oo~^^-axX zIuB!i;L2&2UKDocZ0t!t*%9nmIJ@iP#T5cJ+x0rn6$Nu`2(*6ed{Ss(SV zF5}$oRomS9v{vkL*TJ7(m=cZ7XkFJxW!HF}mtfLx)8p$GhGTbb9LR|Dmli7#++3+q zJdZuD;08zbS2iW5S9jN{>YH9zGpS%x#tj#~O&{_eDrKL%b@bepgM8m+74Zrr9O!B2 z<-4xTuN1jZJfQfJt9ZuD1dD{nEMd01o7+uNQ+*=&-WQsS>aEJ~uDKH$P?~;g&azCq z45!x~i?^H$V1F*Zul9Ghw)pvfJ7%4;{c+^s-sR_V{nqg*#})&n1-&XgeZ6<0OG5Tm zmi+$y)8Wy!l*cQ4&KT5v-kbk!-ksKytWyv9Z@2rAQXTbnwn!+0TgCf}CZ{WJDO@j= z{r65&Cwe-|R*U;Te(ZegEnZjm^UR!ie_m`oF2A$v$Mg2{cAr1&X?_#X`YhAx>$~gE z&)d}2{0$U5nz<@5V{iX4X~pZ+%8ego=M@zG{kpjR+GT5Z{lB}mo}BRE+smh)k4IZy zY-~Gu)xK`$cDWr+iCr--rtyn?x0cU*B;lj~r}pVu;m=#e_xnG&AKgB`{@bdZg-Opo SU)xgoulMfk?f=-(^9}$|N9DQz literal 0 HcmV?d00001 diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00010.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00010.avro new file mode 100755 index 0000000000000000000000000000000000000000..aedc7f7e0e61ce84828b3046992b525ec029f473 GIT binary patch literal 3872 zcma)8c{r5)8n*8oMByWhEJKLwq-2Ybi0oq;GqzbQGxnwWvLr^Ih^$4FT^ReGrNP*> z$QnX+V;hoV={U~$u5(@IulIfK=eh6a{=M)0Ue|l;o(nk@2iVI4B?Un_Kp`v+P#7GH zhMqlwJRwd{1UQ|=AAt6Cg920l9#9C%!vSy+fCM8BYc$jo4LY7W0AbEhxPvD^#oGU0 zeNVK9Gt%*(^_zcEJD?y3bVDc@dC-SLJv{}ETLLJU0K^IG0fwNV9-e@}i~p8#M!3PD zAOzU$MDE1aG3W(`W1t7?0nSJ?An+6wf;y!7{6F0PCHdQU;AvS5(iw#W9d|%T3>+n|7!0fsy!1nZ(lZYNi z?&Pc|`0q_8_1{x_6eQ3Qez4;|qZ?>@5delegFO$EW(_bo`XDq8f&a7t_V57vf>1Ef zJroLl5c3nE6GKN62e305ia1Q|zXneq=AQ4t7j{Ag9G8K%fn-TnuFE)AmF!%o7m8_>3Kau5L3y-I=(ED>mm!M$yCdDzskoqW+!#uGPI)j_u_-%sfAL&B z$#FSA>^&A!lC-c@_iJ#U)Hyk5-PlyDn*#YXiK|qv3mT(}e@$4<1PZ0J>2_2#h>>0n z3#^*Ix0K_D+#E*mgW7nt@;o3yECAjB>tZ_HXFLWFGPGc-~9?A&DRI` z)&!}ICJCRb2L;<~6$CECFm=b5Y_ZP_lDfICZgHvg0y0Jkcg7(4)4;k-Y`)s;>_-uQ z(=K5qvviavX(`OQReZ92T|&+-Au2#cy{lb(0dIkl>8=bENNNA|_R_h&C`5u8`d9LJ z^FW0lVrXquTH0#m>(Z)GthVw;zXWminHwTg5ecy(uiz$Cn~9pkj&MPCVtS!TwB?Aa zYtv|Fk)RP)N}sJJQ>xgjm^+xEa+bN}E%qhYC^IHEs<=H_AL3rhhM0esu9d_kbH-it z%}D7mRL~or6i!v^hns4XBQ3z1#j)!nHL;(1FZ|kW*zFTnU8)O$z+D$yL0mF}J=SY# z8yO^ZLLr+%c6$5k9Aa?_9=!efWo&O*UO%Ep{nIGq#oY}4w&6;{v_4hYP@u?@0n7zn zNm{oU?#DgvJ6%P+7%vsoHqp8Cwa53gs$1RGaeduD#VGN9c%F>eQVFLZ&OEnK)RzOl z6=%Ci_z|v|=-LELvB?qX9u^nDMNU20VSU85`UCgG@saDI!h_szXfFA#lrFfrzUWf^EW6rTQqoh? 
zoM}|GSyAANRvr7G?3fu(K_@%hQ1pjb&j?>&+3YKm5WPZ}m2RqzQnIvfEpO8hFxpzr z#Q?YBRj?d}1JaVLo=N+a#;iRyw>0;(z2>s4?;ddtjI5rpZA9r6#Is5>M@+1kVWdas z^1s~9V_g}jCEOD^Q;DrmB6g;bvCUQHX5?iDwF}tueBb%)n{m~QbqVpPrtJ(nr4is zG&EX%DA=lqUj&YQn~vm`Ode#)_Tk34jdZaF4h+a7N_FcAl{GwxDyPsJip@-O^k}%m zwhH$?XT&ZLoD~iy#@(fK1NsQk+^mo;nJnrF@n$O;E1t(KZ{~R$ZMa}|>4K6S-g>oo z<(e=Fl~boPb}$!-w=HE~h46CWw3XqOuG zF(;?gVH^?aEC{PWS4dK{7}!2ntmAMt@n*f0K~NHD&(FK;eJA7L!jpgWFNLyxpAC#- z*I!tuV(_>x;sN_a!_-;1;Q(@`(N9Ve6)}wB`_@@i>5;?OGK<&Fid!P{T_3lXQWcY# z;d%a`bN8C%xGdIE0PDmw4Be~rFyLxE%Xdi2XTCBj%%8A|4gOZ^i_9s==8yS^n#cwyofuuR4o0My#MX6xa_@nuKc?#R{b}*Q$`?aXO&jh#yoge4L{(TunPb9DL!Ujg7ZofW?}f=js1`+B z+ohgJRC4v?|1=tuLiTHi($g|I=rM6l+Rgw!(Ngp}1I?8A{C%%dvOu_&fK-JKOr>>L zeB-+Z@_0Apcqgo?pMFtyN@*%ZX;{-HE2i$5&^dO%Bw;1}UA%>b_%a4!;3mhj6?47N zYF_HcrfS8wh-mwWSQW{K8r0;D*%2|=r@I_qW-kk%5;B+2Uaa<(?Mabt2y3)+M#!I?v~QX3q^)h${tKE`!kRQY{*prL6bQ^jL`OyJD|du+3pv zpB+8&n_tdt#I|eYPy2$OTiKqh7)fqYq+<5qsEuVyPSDA$|JP|e0rNa zhF#IchM+ECYaGc!jet@O;;!jjr;2Fw9#>~bYP$DDs(R6aj$-q?#Puw(kv>z=WvI?w z|5)Bpu<(YCM}_f3pC~07{?$gGN#z-4ZWak|o7#iE0&!wVh=oz*%$-k1pi>&3Tsk%PPJfP^Q zls5EEbpp}5odC|@%M9yx$)V%Yl(IlY8Y(q?Etk;c@V=bR_GymBpDhbl_5tHvFrK2b z5=sZCp3>~)pwhyZu6~*tXEptKDUjM}FOqJ1C9#+R|HjAuvn@b&UcB32qe*!!?p#@Z zrlxUEqBL#Vj6Pv5!Bqk&RDJ)K1&igwqGaU(e1OGOXE~J}hDoZoBy@U~lFB5;^m>S} zw}PHu3~r4)lPdvc3{%1~r86|$;H+4el~FlAQ95NqcjpJDaH;UQ7W1he*X}jSUZA+CxZ;v6Onj`D2T)KPZylIPP3Mczr+zYMi@o!q zSh5Un+YD~}bl!_J?=HV&78257oWSt8iIsyf_FAcwC_A=TZU&Q^IfzEpeI5zbs-rx^ zuqH$?6f&i)caigHc?f0i{kaz!A3=<5-@ltXaXC_0bTq$#Gs~O*0A7>%}^qFfSY}w-7m)2%x0sPz~40*`PCb*-hn~*N}r}xF%#^p0) z=1D7hDrd3~;jfF9U!&v``FE^zg%t)&-f{f3C1Zq@J6p7oz*7AnG;_F$+&Jb!3)fR@ zO-SNjoTO?fF50rGAdcqepG$K)C3@Yxw?0ODOG+$9N8NGYE?bMgx--4MS3kD>K6t-U zT1}m33fh^PsozKo-kQeTu0MRS3H)>U-ZwQ*Cu?cR@RI@om-1YR0Yml>hx-{ZCxu7Nh_G literal 0 HcmV?d00001 diff --git a/external/avro/src/test/resources/test.avro b/external/avro/src/test/resources/test.avro new file mode 100755 index 0000000000000000000000000000000000000000..6425e2107e3043aa9cfa201e274994def048f4c5 GIT binary patch literal 1365 zcma)5F>ljA6n0E-VF?tWA~B%r2?0YCiG-;z)NLaOGz7 jeHj?Q?U~=SzKdX{aKW zikXR#fsq;UW8gQiGB6R*bpC|b&G z=}#yppBslbolPlT!3p(665u9|30HPXW$G4D0EUc4fy67@hsS=ICM@0oSIO6QAbg&lu;;;S)Af|h3X4MJva~d ze<@4h^J>~GW+HYAkE;$%3){w}S<=Q8F$D`Gx{-)?PV)ib~Oc!Gk!KfiIx(a zjHv^VGwz8d< zwfP{qISw^Wj_!Qi#3W)ws!7|%!+arZ1)P*Yl7!4$5xSlb5sbM`qy^;>0JD^GHMPfq z)n>dIY?!9v!kmxi#-FD*-hE9L8z1j9zCZf;{+IRp;=MutF@n`n(?hRdLvLeft{3yBowhYWHAU_ z0gxgX-F?_fibx!wNybSGTboT;z|z^n9PHiYC>AM_8IXx5vQ(2=R?RRB%U)Z*HKK4h zF=7(+`fK)b-P+sRJE~cnb0sPl*)boO_szDl@3%YVQO$hyMLjoH7Zw(3ruC_|$wG<( z7UcC{Z766;1>$5Egi17}Nl5*)g|;Swf@)Q*#E?hTf=TEO5yUe|Gu|?c+aqYvS9Ay^ lXtUQ{F4TbRx(ToRS;UvGeyA^vy3WXTMnfeIL^`MM<1a>lvsC~9 literal 0 HcmV?d00001 diff --git a/external/avro/src/test/resources/test.avsc b/external/avro/src/test/resources/test.avsc new file mode 100755 index 0000000000000..d7119a01f6aa0 --- /dev/null +++ b/external/avro/src/test/resources/test.avsc @@ -0,0 +1,53 @@ +{ + "type" : "record", + "name" : "test_schema", + "fields" : [{ + "name" : "string", + "type" : "string", + "doc" : "Meaningless string of characters" + }, { + "name" : "simple_map", + "type" : {"type": "map", "values": "int"} + }, { + "name" : "complex_map", + "type" : {"type": "map", "values": {"type": "map", "values": "string"}} + }, { + "name" : "union_string_null", + "type" : ["null", "string"] + }, { + "name" : "union_int_long_null", + "type" : ["int", "long", "null"] + }, { + "name" : "union_float_double", + "type" : ["float", "double"] + }, { + "name": "fixed3", + "type": {"type": "fixed", "size": 3, 
"name": "fixed3"} + }, { + "name": "fixed2", + "type": {"type": "fixed", "size": 2, "name": "fixed2"} + }, { + "name": "enum", + "type": { "type": "enum", + "name": "Suit", + "symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"] + } + }, { + "name": "record", + "type": { + "type": "record", + "name": "record", + "aliases": ["RecordAlias"], + "fields" : [{ + "name": "value_field", + "type": "string" + }] + } + }, { + "name": "array_of_boolean", + "type": {"type": "array", "items": "boolean"} + }, { + "name": "bytes", + "type": "bytes" + }] +} diff --git a/external/avro/src/test/resources/test.json b/external/avro/src/test/resources/test.json new file mode 100755 index 0000000000000..780189a92b378 --- /dev/null +++ b/external/avro/src/test/resources/test.json @@ -0,0 +1,42 @@ +{ + "string": "OMG SPARK IS AWESOME", + "simple_map": {"abc": 1, "bcd": 7}, + "complex_map": {"key": {"a": "b", "c": "d"}}, + "union_string_null": {"string": "abc"}, + "union_int_long_null": {"int": 1}, + "union_float_double": {"float": 3.1415926535}, + "fixed3":"\u0002\u0003\u0004", + "fixed2":"\u0011\u0012", + "enum": "SPADES", + "record": {"value_field": "Two things are infinite: the universe and human stupidity; and I'm not sure about universe."}, + "array_of_boolean": [true, false, false], + "bytes": "\u0041\u0042\u0043" +} +{ + "string": "Terran is IMBA!", + "simple_map": {"mmm": 0, "qqq": 66}, + "complex_map": {"key": {"1": "2", "3": "4"}}, + "union_string_null": {"string": "123"}, + "union_int_long_null": {"long": 66}, + "union_float_double": {"double": 6.6666666666666}, + "fixed3":"\u0007\u0007\u0007", + "fixed2":"\u0001\u0002", + "enum": "CLUBS", + "record": {"value_field": "Life did not intend to make us perfect. Whoever is perfect belongs in a museum."}, + "array_of_boolean": [], + "bytes": "" +} +{ + "string": "The cake is a LIE!", + "simple_map": {}, + "complex_map": {"key": {}}, + "union_string_null": {"null": null}, + "union_int_long_null": {"null": null}, + "union_float_double": {"double": 0}, + "fixed3":"\u0011\u0022\u0009", + "fixed2":"\u0010\u0090", + "enum": "DIAMONDS", + "record": {"value_field": "TEST_STR123"}, + "array_of_boolean": [false], + "bytes": "\u0053" +} diff --git a/external/avro/src/test/scala/com/databricks/spark/avro/AvroFileGenerator.scala b/external/avro/src/test/scala/com/databricks/spark/avro/AvroFileGenerator.scala new file mode 100755 index 0000000000000..262bc7208a908 --- /dev/null +++ b/external/avro/src/test/scala/com/databricks/spark/avro/AvroFileGenerator.scala @@ -0,0 +1,86 @@ +/* + * Copyright (C) 2016 Databricks, Inc. + * + * Portions of this software incorporate or are derived from software contained within Apache Spark, + * and this modified software differs from the Apache Spark software provided under the Apache + * License, Version 2.0, a copy of which you may obtain at + * http://www.apache.org/licenses/LICENSE-2.0 + */ + +package com.databricks.spark.avro + +import java.io.File + +import scala.util.Random + +import org.apache.avro._ +import org.apache.avro.file.DataFileWriter +import org.apache.avro.generic._ +import org.apache.commons.io.FileUtils + +// scalastyle:off println + +/** + * This object allows you to generate large avro files that can be used for speed benchmarking. + * See README on how to use it. 
+ */ +object AvroFileGenerator { + + val defaultNumberOfRecords = 1000000 + val defaultNumberOfFiles = 1 + val outputDir = "target/avroForBenchmark/" + val schemaPath = "src/test/resources/benchmarkSchema.avsc" + val objectSize = 100 // Maps, arrays and strings in our generated file have this size + + private[avro] def generateAvroFile(numberOfRecords: Int, fileIdx: Int) = { + val schema = new Schema.Parser().parse(new File(schemaPath)) + val outputFile = new File(outputDir + "part" + fileIdx + ".avro") + val datumWriter = new GenericDatumWriter[GenericRecord](schema) + val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter) + dataFileWriter.create(schema, outputFile) + + // Create data that we will put into the avro file + val avroRec = new GenericData.Record(schema) + val innerRec = new GenericData.Record(schema.getField("inner_record").schema()) + innerRec.put("value_field", "Inner string") + val rand = new Random() + + for (idx <- 0 until numberOfRecords) { + avroRec.put("string", rand.nextString(objectSize)) + avroRec.put("simple_map", TestUtils.generateRandomMap(rand, objectSize)) + avroRec.put("union_int_long_null", rand.nextInt()) + avroRec.put("union_float_double", rand.nextDouble()) + avroRec.put("inner_record", innerRec) + avroRec.put("array_of_boolean", TestUtils.generateRandomArray(rand, objectSize)) + avroRec.put("bytes", TestUtils.generateRandomByteBuffer(rand, objectSize)) + + dataFileWriter.append(avroRec) + } + + dataFileWriter.close() + } + + def main(args: Array[String]) { + var numberOfRecords = defaultNumberOfRecords + var numberOfFiles = defaultNumberOfFiles + + if (args.size > 0) { + numberOfRecords = args(0).toInt + } + + if (args.size > 1) { + numberOfFiles = args(1).toInt + } + + println(s"Generating $numberOfFiles avro files with $numberOfRecords records each") + + FileUtils.deleteDirectory(new File(outputDir)) + new File(outputDir).mkdirs() // Create directory for output files + + for (fileIdx <- 0 until numberOfFiles) { + generateAvroFile(numberOfRecords, fileIdx) + } + + println("Generation finished") + } +}
diff --git a/external/avro/src/test/scala/com/databricks/spark/avro/AvroReadBenchmark.scala b/external/avro/src/test/scala/com/databricks/spark/avro/AvroReadBenchmark.scala new file mode 100755 index 0000000000000..390098f523f58 --- /dev/null +++ b/external/avro/src/test/scala/com/databricks/spark/avro/AvroReadBenchmark.scala @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2016 Databricks, Inc. + * + * Portions of this software incorporate or are derived from software contained within Apache Spark, + * and this modified software differs from the Apache Spark software provided under the Apache + * License, Version 2.0, a copy of which you may obtain at + * http://www.apache.org/licenses/LICENSE-2.0 + */ + +package com.databricks.spark.avro + +import java.io.File +import java.util.concurrent.TimeUnit + +import org.apache.spark.sql.SparkSession + +// scalastyle:off println + +/** + * This object runs a simple benchmark test on the Avro files in AvroFileGenerator.outputDir. It + * measures how long it takes to convert them into a DataFrame and run count() on them. See the + * README for how to invoke it. + */ +object AvroReadBenchmark { + + def main(args: Array[String]) { + val benchmarkDirFiles = new File(AvroFileGenerator.outputDir).list + if (benchmarkDirFiles == null || benchmarkDirFiles.isEmpty) { + sys.error(s"The benchmark directory (${AvroFileGenerator.outputDir}) does not exist or " + + "is empty. First you should generate some files to run a benchmark with (see README)") + } + + val spark = SparkSession.builder().master("local[2]").appName("AvroReadBenchmark") + .getOrCreate() + + spark.read.avro(AvroFileGenerator.outputDir).count() + + println("\n\n\nStarting benchmark test - creating DataFrame from benchmark avro files\n\n\n") + + val startTime = System.nanoTime + spark + .read + .avro(AvroFileGenerator.outputDir) + .select("string") + .count() + val endTime = System.nanoTime + val executionTime = TimeUnit.SECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS) + + println(s"\n\n\nFinished benchmark test - result was $executionTime seconds\n\n\n") + + spark.sparkContext.stop() // Otherwise scary exception message appears + } +}
diff --git a/external/avro/src/test/scala/com/databricks/spark/avro/AvroSuite.scala b/external/avro/src/test/scala/com/databricks/spark/avro/AvroSuite.scala new file mode 100755 index 0000000000000..1fafa92d6eee7 --- /dev/null +++ b/external/avro/src/test/scala/com/databricks/spark/avro/AvroSuite.scala @@ -0,0 +1,675 @@ +/* + * Copyright (C) 2016 Databricks, Inc. + * + * Portions of this software incorporate or are derived from software contained within Apache Spark, + * and this modified software differs from the Apache Spark software provided under the Apache + * License, Version 2.0, a copy of which you may obtain at + * http://www.apache.org/licenses/LICENSE-2.0 + */ + +package com.databricks.spark.avro + +import java.io._ +import java.nio.file.Files +import java.sql.Timestamp +import java.util.UUID + +// scalastyle:off +import scala.collection.JavaConversions._ + +// scalastyle:on +import com.databricks.spark.avro.SchemaConverters.IncompatibleSchemaException +import org.apache.avro.Schema +import org.apache.avro.Schema.{Field, Type} +import org.apache.avro.file.DataFileWriter +import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord} +import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed} +import org.apache.commons.io.FileUtils + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.types._ + +class AvroSuite extends SparkFunSuite { + val episodesFile = "src/test/resources/episodes.avro" + val testFile = "src/test/resources/test.avro" + + private var spark: SparkSession = _ + + override protected def beforeAll(): Unit = { + super.beforeAll() + spark = SparkSession.builder() + .master("local[2]") + .appName("AvroSuite") + .config("spark.sql.files.maxPartitionBytes", 1024) + .getOrCreate() + } + + override protected def afterAll(): Unit = { + try { + spark.sparkContext.stop() + } finally { + super.afterAll() + } + } + + test("reading and writing partitioned data") { + val df = spark.read.avro(episodesFile) + val fields = List("title", "air_date", "doctor") + for (field <- fields) { + TestUtils.withTempDir { dir => + val outputDir = s"$dir/${UUID.randomUUID}" + df.write.partitionBy(field).avro(outputDir) + val input = spark.read.avro(outputDir) + // makes sure that no fields got dropped.
+ // We convert Rows to Seqs in order to work around SPARK-10325 + assert(input.select(field).collect().map(_.toSeq).toSet === + df.select(field).collect().map(_.toSeq).toSet) + } + } + } + + test("request no fields") { + val df = spark.read.avro(episodesFile) + df.registerTempTable("avro_table") + assert(spark.sql("select count(*) from avro_table").collect().head === Row(8)) + } + + test("convert formats") { + TestUtils.withTempDir { dir => + val df = spark.read.avro(episodesFile) + df.write.parquet(dir.getCanonicalPath) + assert(spark.read.parquet(dir.getCanonicalPath).count() === df.count) + } + } + + test("rearrange internal schema") { + TestUtils.withTempDir { dir => + val df = spark.read.avro(episodesFile) + df.select("doctor", "title").write.avro(dir.getCanonicalPath) + } + } + + test("test NULL avro type") { + TestUtils.withTempDir { dir => + val fields = Seq(new Field("null", Schema.create(Type.NULL), "doc", null)) + val schema = Schema.createRecord("name", "docs", "namespace", false) + schema.setFields(fields) + val datumWriter = new GenericDatumWriter[GenericRecord](schema) + val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter) + dataFileWriter.create(schema, new File(s"$dir.avro")) + val avroRec = new GenericData.Record(schema) + avroRec.put("null", null) + dataFileWriter.append(avroRec) + dataFileWriter.flush() + dataFileWriter.close() + + intercept[IncompatibleSchemaException] { + spark.read.avro(s"$dir.avro") + } + } + } + + test("union(int, long) is read as long") { + TestUtils.withTempDir { dir => + val avroSchema: Schema = { + val union = Schema.createUnion(List(Schema.create(Type.INT), Schema.create(Type.LONG))) + val fields = Seq(new Field("field1", union, "doc", null)) + val schema = Schema.createRecord("name", "docs", "namespace", false) + schema.setFields(fields) + schema + } + + val datumWriter = new GenericDatumWriter[GenericRecord](avroSchema) + val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter) + dataFileWriter.create(avroSchema, new File(s"$dir.avro")) + val rec1 = new GenericData.Record(avroSchema) + rec1.put("field1", 1.toLong) + dataFileWriter.append(rec1) + val rec2 = new GenericData.Record(avroSchema) + rec2.put("field1", 2) + dataFileWriter.append(rec2) + dataFileWriter.flush() + dataFileWriter.close() + val df = spark.read.avro(s"$dir.avro") + assert(df.schema.fields === Seq(StructField("field1", LongType, nullable = true))) + assert(df.collect().toSet == Set(Row(1L), Row(2L))) + } + } + + test("union(float, double) is read as double") { + TestUtils.withTempDir { dir => + val avroSchema: Schema = { + val union = Schema.createUnion(List(Schema.create(Type.FLOAT), Schema.create(Type.DOUBLE))) + val fields = Seq(new Field("field1", union, "doc", null)) + val schema = Schema.createRecord("name", "docs", "namespace", false) + schema.setFields(fields) + schema + } + + val datumWriter = new GenericDatumWriter[GenericRecord](avroSchema) + val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter) + dataFileWriter.create(avroSchema, new File(s"$dir.avro")) + val rec1 = new GenericData.Record(avroSchema) + rec1.put("field1", 1.toFloat) + dataFileWriter.append(rec1) + val rec2 = new GenericData.Record(avroSchema) + rec2.put("field1", 2.toDouble) + dataFileWriter.append(rec2) + dataFileWriter.flush() + dataFileWriter.close() + val df = spark.read.avro(s"$dir.avro") + assert(df.schema.fields === Seq(StructField("field1", DoubleType, nullable = true))) + assert(df.collect().toSet == Set(Row(1.toDouble), Row(2.toDouble))) + } + } 
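The two union tests above exercise spark-avro's type promotion: a union(int, long) column surfaces as LongType and a union(float, double) column as DoubleType, so callers only ever see the wider Spark type. The following is a minimal, illustrative sketch of how an application would observe this through the implicit avro read/write methods this patch adds; the input path and the "field1" column name are hypothetical, assuming a file written with the union schema from the test above.

import com.databricks.spark.avro._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.LongType

object UnionPromotionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("UnionPromotionSketch")
      .getOrCreate()
    // Hypothetical file whose "field1" column was written as an Avro union(int, long).
    val df = spark.read.avro("/tmp/union-int-long.avro")
    // spark-avro surfaces the union as a single promoted LongType column.
    assert(df.schema("field1").dataType == LongType)
    // Writing back goes through the same implicit and keeps the promoted type.
    df.write.avro("/tmp/union-int-long-roundtrip")
    spark.stop()
  }
}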
+ + test("union(float, double, null) is read as nullable double") { + TestUtils.withTempDir { dir => + val avroSchema: Schema = { + val union = Schema.createUnion( + List(Schema.create(Type.FLOAT), Schema.create(Type.DOUBLE), Schema.create(Type.NULL))) + val fields = Seq(new Field("field1", union, "doc", null)) + val schema = Schema.createRecord("name", "docs", "namespace", false) + schema.setFields(fields) + schema + } + + val datumWriter = new GenericDatumWriter[GenericRecord](avroSchema) + val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter) + dataFileWriter.create(avroSchema, new File(s"$dir.avro")) + val rec1 = new GenericData.Record(avroSchema) + rec1.put("field1", 1.toFloat) + dataFileWriter.append(rec1) + val rec2 = new GenericData.Record(avroSchema) + rec2.put("field1", null) + dataFileWriter.append(rec2) + dataFileWriter.flush() + dataFileWriter.close() + val df = spark.read.avro(s"$dir.avro") + assert(df.schema.fields === Seq(StructField("field1", DoubleType, nullable = true))) + assert(df.collect().toSet == Set(Row(1.toDouble), Row(null))) + } + } + + test("Union of a single type") { + TestUtils.withTempDir { dir => + val UnionOfOne = Schema.createUnion(List(Schema.create(Type.INT))) + val fields = Seq(new Field("field1", UnionOfOne, "doc", null)) + val schema = Schema.createRecord("name", "docs", "namespace", false) + schema.setFields(fields) + + val datumWriter = new GenericDatumWriter[GenericRecord](schema) + val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter) + dataFileWriter.create(schema, new File(s"$dir.avro")) + val avroRec = new GenericData.Record(schema) + + avroRec.put("field1", 8) + + dataFileWriter.append(avroRec) + dataFileWriter.flush() + dataFileWriter.close() + + val df = spark.read.avro(s"$dir.avro") + assert(df.first() == Row(8)) + } + } + + test("Complex Union Type") { + TestUtils.withTempDir { dir => + val fixedSchema = Schema.createFixed("fixed_name", "doc", "namespace", 4) + val enumSchema = Schema.createEnum("enum_name", "doc", "namespace", List("e1", "e2")) + val complexUnionType = Schema.createUnion( + List(Schema.create(Type.INT), Schema.create(Type.STRING), fixedSchema, enumSchema)) + val fields = Seq( + new Field("field1", complexUnionType, "doc", null), + new Field("field2", complexUnionType, "doc", null), + new Field("field3", complexUnionType, "doc", null), + new Field("field4", complexUnionType, "doc", null) + ) + val schema = Schema.createRecord("name", "docs", "namespace", false) + schema.setFields(fields) + val datumWriter = new GenericDatumWriter[GenericRecord](schema) + val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter) + dataFileWriter.create(schema, new File(s"$dir.avro")) + val avroRec = new GenericData.Record(schema) + val field1 = 1234 + val field2 = "Hope that was not load bearing" + val field3 = Array[Byte](1, 2, 3, 4) + val field4 = "e2" + avroRec.put("field1", field1) + avroRec.put("field2", field2) + avroRec.put("field3", new Fixed(fixedSchema, field3)) + avroRec.put("field4", new EnumSymbol(enumSchema, field4)) + dataFileWriter.append(avroRec) + dataFileWriter.flush() + dataFileWriter.close() + + val df = spark.sqlContext.read.avro(s"$dir.avro") + assertResult(field1)(df.selectExpr("field1.member0").first().get(0)) + assertResult(field2)(df.selectExpr("field2.member1").first().get(0)) + assertResult(field3)(df.selectExpr("field3.member2").first().get(0)) + assertResult(field4)(df.selectExpr("field4.member3").first().get(0)) + } + } + + test("Lots of nulls") { + TestUtils.withTempDir 
{ dir => + val schema = StructType(Seq( + StructField("binary", BinaryType, true), + StructField("timestamp", TimestampType, true), + StructField("array", ArrayType(ShortType), true), + StructField("map", MapType(StringType, StringType), true), + StructField("struct", StructType(Seq(StructField("int", IntegerType, true)))))) + val rdd = spark.sparkContext.parallelize(Seq[Row]( + Row(null, new Timestamp(1), Array[Short](1, 2, 3), null, null), + Row(null, null, null, null, null), + Row(null, null, null, null, null), + Row(null, null, null, null, null))) + val df = spark.createDataFrame(rdd, schema) + df.write.avro(dir.toString) + assert(spark.read.avro(dir.toString).count == rdd.count) + } + } + + test("Struct field type") { + TestUtils.withTempDir { dir => + val schema = StructType(Seq( + StructField("float", FloatType, true), + StructField("short", ShortType, true), + StructField("byte", ByteType, true), + StructField("boolean", BooleanType, true) + )) + val rdd = spark.sparkContext.parallelize(Seq( + Row(1f, 1.toShort, 1.toByte, true), + Row(2f, 2.toShort, 2.toByte, true), + Row(3f, 3.toShort, 3.toByte, true) + )) + val df = spark.createDataFrame(rdd, schema) + df.write.avro(dir.toString) + assert(spark.read.avro(dir.toString).count == rdd.count) + } + } + + test("Array data types") { + TestUtils.withTempDir { dir => + val testSchema = StructType(Seq( + StructField("byte_array", ArrayType(ByteType), true), + StructField("short_array", ArrayType(ShortType), true), + StructField("float_array", ArrayType(FloatType), true), + StructField("bool_array", ArrayType(BooleanType), true), + StructField("long_array", ArrayType(LongType), true), + StructField("double_array", ArrayType(DoubleType), true), + StructField("decimal_array", ArrayType(DecimalType(10, 0)), true), + StructField("bin_array", ArrayType(BinaryType), true), + StructField("timestamp_array", ArrayType(TimestampType), true), + StructField("array_array", ArrayType(ArrayType(StringType), true), true), + StructField("struct_array", ArrayType( + StructType(Seq(StructField("name", StringType, true))))))) + + val arrayOfByte = new Array[Byte](4) + for (i <- arrayOfByte.indices) { + arrayOfByte(i) = i.toByte + } + + val rdd = spark.sparkContext.parallelize(Seq( + Row(arrayOfByte, + Array[Short](1, 2, 3, 4), + Array[Float](1f, 2f, 3f, 4f), + Array[Boolean](true, false, true, false), + Array[Long](1L, 2L), Array[Double](1.0, 2.0), + Array[BigDecimal](BigDecimal.valueOf(3)), + Array[Array[Byte]](arrayOfByte, arrayOfByte), + Array[Timestamp](new Timestamp(0)), + Array[Array[String]](Array[String]("CSH, tearing down the walls that divide us", "-jd")), + Array[Row](Row("Bobby G. 
can't swim"))))) + val df = spark.createDataFrame(rdd, testSchema) + df.write.avro(dir.toString) + assert(spark.read.avro(dir.toString).count == rdd.count) + } + } + + test("write with compression") { + TestUtils.withTempDir { dir => + val AVRO_COMPRESSION_CODEC = "spark.sql.avro.compression.codec" + val AVRO_DEFLATE_LEVEL = "spark.sql.avro.deflate.level" + val uncompressDir = s"$dir/uncompress" + val deflateDir = s"$dir/deflate" + val snappyDir = s"$dir/snappy" + val fakeDir = s"$dir/fake" + + val df = spark.read.avro(testFile) + spark.conf.set(AVRO_COMPRESSION_CODEC, "uncompressed") + df.write.avro(uncompressDir) + spark.conf.set(AVRO_COMPRESSION_CODEC, "deflate") + spark.conf.set(AVRO_DEFLATE_LEVEL, "9") + df.write.avro(deflateDir) + spark.conf.set(AVRO_COMPRESSION_CODEC, "snappy") + df.write.avro(snappyDir) + + val uncompressSize = FileUtils.sizeOfDirectory(new File(uncompressDir)) + val deflateSize = FileUtils.sizeOfDirectory(new File(deflateDir)) + val snappySize = FileUtils.sizeOfDirectory(new File(snappyDir)) + + assert(uncompressSize > deflateSize) + assert(snappySize > deflateSize) + } + } + + test("dsl test") { + val results = spark.read.avro(episodesFile).select("title").collect() + assert(results.length === 8) + } + + test("support of various data types") { + // This test uses data from test.avro. You can see the data and the schema of this file in + // test.json and test.avsc + val all = spark.read.avro(testFile).collect() + assert(all.length == 3) + + val str = spark.read.avro(testFile).select("string").collect() + assert(str.map(_(0)).toSet.contains("Terran is IMBA!")) + + val simple_map = spark.read.avro(testFile).select("simple_map").collect() + assert(simple_map(0)(0).getClass.toString.contains("Map")) + assert(simple_map.map(_(0).asInstanceOf[Map[String, Some[Int]]].size).toSet == Set(2, 0)) + + val union0 = spark.read.avro(testFile).select("union_string_null").collect() + assert(union0.map(_(0)).toSet == Set("abc", "123", null)) + + val union1 = spark.read.avro(testFile).select("union_int_long_null").collect() + assert(union1.map(_(0)).toSet == Set(66, 1, null)) + + val union2 = spark.read.avro(testFile).select("union_float_double").collect() + assert( + union2 + .map(x => new java.lang.Double(x(0).toString)) + .exists(p => Math.abs(p - Math.PI) < 0.001)) + + val fixed = spark.read.avro(testFile).select("fixed3").collect() + assert(fixed.map(_(0).asInstanceOf[Array[Byte]]).exists(p => p(1) == 3)) + + val enum = spark.read.avro(testFile).select("enum").collect() + assert(enum.map(_(0)).toSet == Set("SPADES", "CLUBS", "DIAMONDS")) + + val record = spark.read.avro(testFile).select("record").collect() + assert(record(0)(0).getClass.toString.contains("Row")) + assert(record.map(_(0).asInstanceOf[Row](0)).contains("TEST_STR123")) + + val array_of_boolean = spark.read.avro(testFile).select("array_of_boolean").collect() + assert(array_of_boolean.map(_(0).asInstanceOf[Seq[Boolean]].size).toSet == Set(3, 1, 0)) + + val bytes = spark.read.avro(testFile).select("bytes").collect() + assert(bytes.map(_(0).asInstanceOf[Array[Byte]].length).toSet == Set(3, 1, 0)) + } + + test("sql test") { + spark.sql( + s""" + |CREATE TEMPORARY TABLE avroTable + |USING com.databricks.spark.avro + |OPTIONS (path "$episodesFile") + """.stripMargin.replaceAll("\n", " ")) + + assert(spark.sql("SELECT * FROM avroTable").collect().length === 8) + } + + test("conversion to avro and back") { + // Note that test.avro includes a variety of types, some of which are nullable. 
We expect to + // get the same values back. + TestUtils.withTempDir { dir => + val avroDir = s"$dir/avro" + spark.read.avro(testFile).write.avro(avroDir) + TestUtils.checkReloadMatchesSaved(spark, testFile, avroDir) + } + } + + test("conversion to avro and back with namespace") { + // Note that test.avro includes a variety of types, some of which are nullable. We expect to + // get the same values back. + TestUtils.withTempDir { tempDir => + val name = "AvroTest" + val namespace = "com.databricks.spark.avro" + val parameters = Map("recordName" -> name, "recordNamespace" -> namespace) + + val avroDir = tempDir + "/namedAvro" + spark.read.avro(testFile).write.options(parameters).avro(avroDir) + TestUtils.checkReloadMatchesSaved(spark, testFile, avroDir) + + // Look at the raw file and make sure it has namespace info + val rawSaved = spark.sparkContext.textFile(avroDir) + val schema = rawSaved.collect().mkString("") + assert(schema.contains(name)) + assert(schema.contains(namespace)) + } + } + + test("converting some specific sparkSQL types to avro") { + TestUtils.withTempDir { tempDir => + val testSchema = StructType(Seq( + StructField("Name", StringType, false), + StructField("Length", IntegerType, true), + StructField("Time", TimestampType, false), + StructField("Decimal", DecimalType(10, 2), true), + StructField("Binary", BinaryType, false))) + + val arrayOfByte = new Array[Byte](4) + for (i <- arrayOfByte.indices) { + arrayOfByte(i) = i.toByte + } + val cityRDD = spark.sparkContext.parallelize(Seq( + Row("San Francisco", 12, new Timestamp(666), null, arrayOfByte), + Row("Palo Alto", null, new Timestamp(777), null, arrayOfByte), + Row("Munich", 8, new Timestamp(42), Decimal(3.14), arrayOfByte))) + val cityDataFrame = spark.createDataFrame(cityRDD, testSchema) + + val avroDir = tempDir + "/avro" + cityDataFrame.write.avro(avroDir) + assert(spark.read.avro(avroDir).collect().length == 3) + + // Timestamps are converted to longs + val times = spark.read.avro(avroDir).select("Time").collect() + assert(times.map(_(0)).toSet == Set(666, 777, 42)) + + // DecimalType should be converted to string + val decimals = spark.read.avro(avroDir).select("Decimal").collect() + assert(decimals.map(_(0)).contains("3.14")) + + // There should be a null entry + val length = spark.read.avro(avroDir).select("Length").collect() + assert(length.map(_(0)).contains(null)) + + val binary = spark.read.avro(avroDir).select("Binary").collect() + for (i <- arrayOfByte.indices) { + assert(binary(1)(0).asInstanceOf[Array[Byte]](i) == arrayOfByte(i)) + } + } + } + + test("support of globbed paths") { + val e1 = spark.read.avro("*/test/resources/episodes.avro").collect() + assert(e1.length == 8) + + val e2 = spark.read.avro("src/*/*/episodes.avro").collect() + assert(e2.length == 8) + } + + test("support user provided avro schema") { + val avroSchema = + """ + |{ + | "type" : "record", + | "name" : "test_schema", + | "fields" : [{ + | "name" : "string", + | "type" : "string", + | "doc" : "Meaningless string of characters" + | }] + |} + """.stripMargin + val result = spark.read.option(DefaultSource.AvroSchema, avroSchema).avro(testFile).collect() + val expected = spark.read.avro(testFile).select("string").collect() + assert(result.sameElements(expected)) + } + + test("support user provided avro schema with defaults for missing fields") { + val avroSchema = + """ + |{ + | "type" : "record", + | "name" : "test_schema", + | "fields" : [{ + | "name" : "missingField", + | "type" : "string", + | "default" : "foo" + | }] + |} +
""".stripMargin + val result = spark.read.option(DefaultSource.AvroSchema, avroSchema) + .avro(testFile).select("missingField").first + assert(result === Row("foo")) + } + + test("reading from invalid path throws exception") { + + // Directory given has no avro files + intercept[AnalysisException] { + TestUtils.withTempDir(dir => spark.read.avro(dir.getCanonicalPath)) + } + + intercept[AnalysisException] { + spark.read.avro("very/invalid/path/123.avro") + } + + // In case of globbed path that can't be matched to anything, another exception is thrown (and + // exception message is helpful) + intercept[AnalysisException] { + spark.read.avro("*/*/*/*/*/*/*/something.avro") + } + + intercept[FileNotFoundException] { + TestUtils.withTempDir { dir => + FileUtils.touch(new File(dir, "test")) + spark.read.avro(dir.toString) + } + } + + } + + test("SQL test insert overwrite") { + TestUtils.withTempDir { tempDir => + val tempEmptyDir = s"$tempDir/sqlOverwrite" + // Create a temp directory for table that will be overwritten + new File(tempEmptyDir).mkdirs() + spark.sql( + s""" + |CREATE TEMPORARY TABLE episodes + |USING com.databricks.spark.avro + |OPTIONS (path "$episodesFile") + """.stripMargin.replaceAll("\n", " ")) + spark.sql( + s""" + |CREATE TEMPORARY TABLE episodesEmpty + |(name string, air_date string, doctor int) + |USING com.databricks.spark.avro + |OPTIONS (path "$tempEmptyDir") + """.stripMargin.replaceAll("\n", " ")) + + assert(spark.sql("SELECT * FROM episodes").collect().length === 8) + assert(spark.sql("SELECT * FROM episodesEmpty").collect().isEmpty) + + spark.sql( + s""" + |INSERT OVERWRITE TABLE episodesEmpty + |SELECT * FROM episodes + """.stripMargin.replaceAll("\n", " ")) + assert(spark.sql("SELECT * FROM episodesEmpty").collect().length == 8) + } + } + + test("test save and load") { + // Test if load works as expected + TestUtils.withTempDir { tempDir => + val df = spark.read.avro(episodesFile) + assert(df.count == 8) + + val tempSaveDir = s"$tempDir/save/" + + df.write.avro(tempSaveDir) + val newDf = spark.read.avro(tempSaveDir) + assert(newDf.count == 8) + } + } + + test("test load with non-Avro file") { + // Test if load works as expected + TestUtils.withTempDir { tempDir => + val df = spark.read.avro(episodesFile) + assert(df.count == 8) + + val tempSaveDir = s"$tempDir/save/" + df.write.avro(tempSaveDir) + + Files.createFile(new File(tempSaveDir, "non-avro").toPath) + + val newDf = spark + .read + .option(DefaultSource.IgnoreFilesWithoutExtensionProperty, "true") + .avro(tempSaveDir) + + assert(newDf.count == 8) + } + } + + test("read avro with user defined schema: read partial columns") { + val partialColumns = StructType(Seq( + StructField("string", StringType, false), + StructField("simple_map", MapType(StringType, IntegerType), false), + StructField("complex_map", MapType(StringType, MapType(StringType, StringType)), false), + StructField("union_string_null", StringType, true), + StructField("union_int_long_null", LongType, true), + StructField("fixed3", BinaryType, true), + StructField("fixed2", BinaryType, true), + StructField("enum", StringType, false), + StructField("record", StructType(Seq(StructField("value_field", StringType, false))), false), + StructField("array_of_boolean", ArrayType(BooleanType), false), + StructField("bytes", BinaryType, true))) + val withSchema = spark.read.schema(partialColumns).avro(testFile).collect() + val withOutSchema = spark + .read + .avro(testFile) + .select("string", "simple_map", "complex_map", "union_string_null", 
"union_int_long_null", + "fixed3", "fixed2", "enum", "record", "array_of_boolean", "bytes") + .collect() + assert(withSchema.sameElements(withOutSchema)) + } + + test("read avro with user defined schema: read non-exist columns") { + val schema = + StructType( + Seq( + StructField("non_exist_string", StringType, true), + StructField( + "record", + StructType(Seq( + StructField("non_exist_field", StringType, false), + StructField("non_exist_field2", StringType, false))), + false))) + val withEmptyColumn = spark.read.schema(schema).avro(testFile).collect() + + assert(withEmptyColumn.forall(_ == Row(null: String, Row(null: String, null: String)))) + } + + test("read avro file partitioned") { + TestUtils.withTempDir { dir => + val sparkSession = spark + import sparkSession.implicits._ + val df = (0 to 1024 * 3).toDS.map(i => s"record${i}").toDF("records") + val outputDir = s"$dir/${UUID.randomUUID}" + df.write.avro(outputDir) + val input = spark.read.avro(outputDir) + assert(input.collect.toSet.size === 1024 * 3 + 1) + assert(input.rdd.partitions.size > 2) + } + } +} diff --git a/external/avro/src/test/scala/com/databricks/spark/avro/AvroWriteBenchmark.scala b/external/avro/src/test/scala/com/databricks/spark/avro/AvroWriteBenchmark.scala new file mode 100755 index 0000000000000..04a61f28369dd --- /dev/null +++ b/external/avro/src/test/scala/com/databricks/spark/avro/AvroWriteBenchmark.scala @@ -0,0 +1,82 @@ +/* + * Copyright (C) 2016 Databricks, Inc. + * + * Portions of this software incorporate or are derived from software contained within Apache Spark, + * and this modified software differs from the Apache Spark software provided under the Apache + * License, Version 2.0, a copy of which you may obtain at + * http://www.apache.org/licenses/LICENSE-2.0 + */ + +package com.databricks.spark.avro + +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ +import scala.util.Random + +import com.google.common.io.Files +import org.apache.commons.io.FileUtils + +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.types._ + +// scalastyle:off println + +/** + * This object runs a simple benchmark test to find out how long does it take to write a large + * DataFrame to an avro file. It reads one argument, which specifies how many rows does the + * DataFrame that we're writing contain. 
+ */ +object AvroWriteBenchmark { + + val defaultNumberOfRows = 1000000 + val defaultSize = 100 // Size used for items in generated RDD like strings, arrays and maps + + val testSchema = StructType(Seq( + StructField("StringField", StringType, false), + StructField("IntField", IntegerType, true), + StructField("DoubleField", DoubleType, false), + StructField("DecimalField", DecimalType(10, 10), true), + StructField("ArrayField", ArrayType(BooleanType), false), + StructField("MapField", MapType(StringType, IntegerType), true), + StructField("StructField", StructType(Seq(StructField("id", IntegerType, true))), false))) + + private def generateRandomRow(): Row = { + val rand = new Random() + Row(rand.nextString(defaultSize), rand.nextInt(), rand.nextDouble(), rand.nextDouble(), + TestUtils.generateRandomArray(rand, defaultSize).asScala, + TestUtils.generateRandomMap(rand, defaultSize).asScala.toMap, Row(rand.nextInt())) + } + + def main(args: Array[String]) { + var numberOfRows = defaultNumberOfRows + if (args.size > 0) { + numberOfRows = args(0).toInt + } + + println(s"\n\n\nPreparing for a benchmark test - creating an RDD with $numberOfRows rows\n\n\n") + + val spark = SparkSession.builder().master("local[2]").appName("AvroWriteBenchmark") + .getOrCreate() + + val tempDir = Files.createTempDir() + val avroDir = tempDir + "/avro" + val testDataFrame = spark.createDataFrame( + spark.sparkContext.parallelize(0 until numberOfRows).map(_ => generateRandomRow()), + testSchema) + + println("\n\n\nStarting benchmark test - writing a DataFrame as avro file\n\n\n") + + val startTime = System.nanoTime + + testDataFrame.write.avro(avroDir) + + val endTime = System.nanoTime + val executionTime = TimeUnit.SECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS) + + println(s"\n\n\nFinished benchmark test - result was $executionTime seconds\n\n\n") + + FileUtils.deleteDirectory(tempDir) + spark.sparkContext.stop() // Otherwise scary exception message appears + } +}
diff --git a/external/avro/src/test/scala/com/databricks/spark/avro/SerializableConfigurationSuite.scala b/external/avro/src/test/scala/com/databricks/spark/avro/SerializableConfigurationSuite.scala new file mode 100755 index 0000000000000..0c2fcad2025e9 --- /dev/null +++ b/external/avro/src/test/scala/com/databricks/spark/avro/SerializableConfigurationSuite.scala @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2016 Databricks, Inc.
+ * + * Portions of this software incorporate or are derived from software contained within Apache Spark, + * and this modified software differs from the Apache Spark software provided under the Apache + * License, Version 2.0, a copy of which you may obtain at + * http://www.apache.org/licenses/LICENSE-2.0 + */ + +package com.databricks.spark.avro + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerInstance} + +class SerializableConfigurationSuite extends SparkFunSuite { + + private def testSerialization(serializer: SerializerInstance): Unit = { + import DefaultSource.SerializableConfiguration + val conf = new SerializableConfiguration(new Configuration()) + + val serialized = serializer.serialize(conf) + + serializer.deserialize[Any](serialized) match { + case c: SerializableConfiguration => + assert(c.log != null, "log was null") + assert(c.value != null, "value was null") + case other => fail( + s"Expecting ${classOf[SerializableConfiguration]}, but got ${other.getClass}.") + } + } + + test("serialization with JavaSerializer") { + testSerialization(new JavaSerializer(new SparkConf()).newInstance()) + } + + test("serialization with KryoSerializer") { + testSerialization(new KryoSerializer(new SparkConf()).newInstance()) + } + +} diff --git a/external/avro/src/test/scala/com/databricks/spark/avro/TestUtils.scala b/external/avro/src/test/scala/com/databricks/spark/avro/TestUtils.scala new file mode 100755 index 0000000000000..fdc43849695fd --- /dev/null +++ b/external/avro/src/test/scala/com/databricks/spark/avro/TestUtils.scala @@ -0,0 +1,148 @@ +/* + * Copyright (C) 2016 Databricks, Inc. + * + * Portions of this software incorporate or are derived from software contained within Apache Spark, + * and this modified software differs from the Apache Spark software provided under the Apache + * License, Version 2.0, a copy of which you may obtain at + * http://www.apache.org/licenses/LICENSE-2.0 + */ + +package com.databricks.spark.avro + +import java.io.{File, IOException} +import java.nio.ByteBuffer +import java.util + +import scala.collection.immutable.HashSet +import scala.collection.mutable.ArrayBuffer +import scala.util.Random + +import com.google.common.io.Files + +import org.apache.spark.sql.SparkSession + +private[avro] object TestUtils { + + /** + * This function checks that all records in a file match the original + * record. 
+ */ + def checkReloadMatchesSaved(spark: SparkSession, testFile: String, avroDir: String): Unit = { + + def convertToString(elem: Any): String = { + elem match { + case null => "NULL" // HashSets can't have null in them, so we use a string instead + case arrayBuf: ArrayBuffer[_] => + arrayBuf.asInstanceOf[ArrayBuffer[Any]].toArray.deep.mkString(" ") + case arrayByte: Array[Byte] => arrayByte.deep.mkString(" ") + case other => other.toString + } + } + + val originalEntries = spark.read.avro(testFile).collect() + val newEntries = spark.read.avro(avroDir).collect() + + assert(originalEntries.length == newEntries.length) + + val origEntrySet = Array.fill(originalEntries(0).size)(new HashSet[Any]()) + for (origEntry <- originalEntries) { + var idx = 0 + for (origElement <- origEntry.toSeq) { + origEntrySet(idx) += convertToString(origElement) + idx += 1 + } + } + + for (newEntry <- newEntries) { + var idx = 0 + for (newElement <- newEntry.toSeq) { + assert(origEntrySet(idx).contains(convertToString(newElement))) + idx += 1 + } + } + } + + def withTempDir(f: File => Unit): Unit = { + val dir = Files.createTempDir() + dir.delete() + try f(dir) finally deleteRecursively(dir) + } + + /** + * This function deletes a file or a directory with everything that's in it. This function is + * copied from Spark with minor modifications made to it. See original source at: + * github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/Utils.scala + */ + + def deleteRecursively(file: File) { + def listFilesSafely(file: File): Seq[File] = { + if (file.exists()) { + val files = file.listFiles() + if (files == null) { + throw new IOException("Failed to list files for dir: " + file) + } + files + } else { + List() + } + } + + if (file != null) { + try { + if (file.isDirectory) { + var savedIOException: IOException = null + for (child <- listFilesSafely(file)) { + try { + deleteRecursively(child) + } catch { + // In case of multiple exceptions, only last one will be thrown + case ioe: IOException => savedIOException = ioe + } + } + if (savedIOException != null) { + throw savedIOException + } + } + } finally { + if (!file.delete()) { + // Delete can also fail if the file simply did not exist + if (file.exists()) { + throw new IOException("Failed to delete: " + file.getAbsolutePath) + } + } + } + } + } + + /** + * This function generates a random map(string, int) of a given size. + */ + private[avro] def generateRandomMap(rand: Random, size: Int): java.util.Map[String, Int] = { + val jMap = new util.HashMap[String, Int]() + for (i <- 0 until size) { + jMap.put(rand.nextString(5), i) + } + jMap + } + + /** + * This function generates a random array of booleans of a given size. + */ + private[avro] def generateRandomArray(rand: Random, size: Int): util.ArrayList[Boolean] = { + val vec = new util.ArrayList[Boolean]() + for (i <- 0 until size) { + vec.add(rand.nextBoolean()) + } + vec + } + + /** + * This function generates a random ByteBuffer of a given size. 
+ */ + private[avro] def generateRandomByteBuffer(rand: Random, size: Int): ByteBuffer = { + val bb = ByteBuffer.allocate(size) + val arrayOfBytes = new Array[Byte](size) + rand.nextBytes(arrayOfBytes) + bb.put(arrayOfBytes) + } +} diff --git a/pom.xml b/pom.xml index c11bff8ddbc93..d269a4fcbeeb7 100644 --- a/pom.xml +++ b/pom.xml @@ -107,6 +107,7 @@ examples repl launcher + external/avro external/kafka-0-8 external/kafka-0-8-assembly external/kafka-0-10 diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 8ab59e3532d30..21e84021105a0 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -39,8 +39,8 @@ object BuildCommons { private val buildLocation = file(".").getAbsoluteFile.getParentFile - val sqlProjects@Seq(catalyst, sql, hive, hiveThriftServer, sqlKafka010, sqlKafka08) = Seq( - "catalyst", "sql", "hive", "hive-thriftserver", "sql-kafka-0-10", "sql-kafka-0-8" + val sqlProjects@Seq(catalyst, sql, hive, hiveThriftServer, sqlKafka010, sqlKafka08, avro) = Seq( + "catalyst", "sql", "hive", "hive-thriftserver", "sql-kafka-0-10", "sql-kafka-0-8", "avro" ).map(ProjectRef(buildLocation, _)) val streamingProjects@Seq( @@ -353,7 +353,7 @@ object SparkBuild extends PomBuild { val mimaProjects = allProjects.filterNot { x => Seq( spark, hive, hiveThriftServer, catalyst, repl, networkCommon, networkShuffle, networkYarn, - unsafe, tags, sqlKafka010, sqlKafka08 + unsafe, tags, sqlKafka010, sqlKafka08, avro ).contains(x) } @@ -717,9 +717,9 @@ object Unidoc { publish := {}, unidocProjectFilter in(ScalaUnidoc, unidoc) := - inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010, sqlKafka010, sqlKafka08), + inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010, sqlKafka010, sqlKafka08, avro), unidocProjectFilter in(JavaUnidoc, unidoc) := - inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010, sqlKafka010, sqlKafka08), + inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010, sqlKafka010, sqlKafka08, avro), unidocAllClasspaths in (ScalaUnidoc, unidoc) := { ignoreClasspaths((unidocAllClasspaths in (ScalaUnidoc, unidoc)).value)