This repository has been archived by the owner on Dec 20, 2018. It is now read-only.

Specifying a read schema with spark-avro #96

Closed

mwho opened this issue Oct 30, 2015 · 12 comments
@mwho

mwho commented Oct 30, 2015

It would be nice to have an option to supply a read schema (in lieu of the embedded schema) when reading avro files via spark-avro.

For example, the Python Avro API allows the following:
reader = DataFileReader(data, DatumReader(readers_schema=schema))

The scenario is this: I have many .avro files, possibly with different schemas (due to schema evolution), and I would like to use a single "master" schema to ingest all of those avro files into a single Spark DataFrame.
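For illustration, the kind of spark-avro call being asked for might look like the sketch below. This is hypothetical, not an existing option; the field names, path, and the `added_later` field (standing in for a field that older files do not yet contain) are made up.

```
// Hypothetical sketch of the requested behaviour: read many avro files with one
// user-supplied "master" read schema instead of each file's embedded schema.
import org.apache.spark.sql.types._

val masterSchema = StructType(
  StructField("id", LongType, true) ::
  StructField("name", StringType, true) ::
  StructField("added_later", StringType, true) :: Nil)   // missing from older files

val df = sqlContext.read
  .format("com.databricks.spark.avro")
  .schema(masterSchema)                                   // the requested read-schema hook
  .load("/data/events/*.avro")                            // hypothetical path
```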

@theouteredge

This would be a really good idea.
I've just been looking at Avro schema evolution and how to manage it in Spark.

@jongyonkim

I'm waiting for this feature as well. Otherwise there is no way of ingesting many avro files (each with its own schema) with an up-to-date master schema.

@cleaton

cleaton commented Nov 23, 2015

I'm also waiting for this feature, as Spark seems very slow at generating the DataFrame schema when a large number of sequence files are selected. It also seems to create thousands of broadcast variables and use a lot of memory on the driver node.

yanxiaole pushed a commit to yanxiaole/spark-avro that referenced this issue Dec 26, 2015
yanxiaole pushed a commit to yanxiaole/spark-avro that referenced this issue Jan 18, 2016
yanxiaole pushed a commit to yanxiaole/spark-avro that referenced this issue Feb 3, 2016
@tomseddon

Hi, is there any progress on this feature? The only reason I'm not using spark-avro (I'm using hadoopRDD instead) is that I need support for schemas that can evolve. I sometimes re-process historical data that is missing attributes added more recently, and ideally the whole lot would be processed in one go.

@clockfly
Contributor

clockfly commented Jul 6, 2016

In Spark 2.0, you can specify a schema using spark.read.schema(user_defined_schema).format(...).

```
scala> val df = Seq((1,2,3)).toDF("a", "b", "c")
df: org.apache.spark.sql.DataFrame = [a: int, b: int ... 1 more field]

scala> df.write.format("com.databricks.spark.avro").save("/tmp/output")

scala> spark.read.format("com.databricks.spark.avro").load("/tmp/output").show()
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  2|  3|
+---+---+---+

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

// Prepare a customized schema
scala> val struct = StructType(StructField("a", IntegerType, true) :: Nil)
struct: org.apache.spark.sql.types.StructType = StructType(StructField(a,IntegerType,true))

scala> spark.read.format("com.databricks.spark.avro").schema(struct).load("/tmp/output").show()
+---+
|  a|
+---+
|  1|
+---+

// Save the schema to a Json string, you can later save this to a file.
scala> struct.json
res6: String = {"type":"struct","fields":[{"name":"a","type":"integer","nullable":true,"metadata":{}}]}

// Load the Json string back
scala> spark.read.format("com.databricks.spark.avro").schema(DataType.fromJson(res6).asInstanceOf[StructType]).load("/tmp/output").show()
+---+
|  a|
+---+
|  1|
+---+
```
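For completeness, a minimal sketch of the "save this to a file" step and loading it back later (the /tmp/schema.json path is just an example):

```
// Minimal sketch: persist the schema JSON to a file and read it back later.
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.types.{DataType, StructType}

Files.write(Paths.get("/tmp/schema.json"), struct.json.getBytes(StandardCharsets.UTF_8))

val loaded = DataType.fromJson(
  new String(Files.readAllBytes(Paths.get("/tmp/schema.json")), StandardCharsets.UTF_8)
).asInstanceOf[StructType]

spark.read.format("com.databricks.spark.avro").schema(loaded).load("/tmp/output").show()
```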

@tomseddon

That's great, thanks for the reply.

@tomseddon

Sorry, another question on this @clockfly. Shouldn't I be able to specify an avsc file? I tried this and I'm getting scala.MatchError.

I've already had to specify my input schema in case classes for conversion to a Dataset, and re-specifying the exact same thing in StructType seems unwieldy. The most convenient scenario would be to just plug in my latest avsc generated from avro-tools.
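One route that might avoid the duplication, untested and assuming SchemaConverters.toSqlType is accessible in the spark-avro version in use: parse the .avsc with the Avro API and convert it to a StructType.

```
// Untested sketch: derive the Spark read schema from an .avsc file instead of
// re-declaring it by hand. The paths are hypothetical.
import java.io.File
import org.apache.avro.Schema
import com.databricks.spark.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

val avroSchema = new Schema.Parser().parse(new File("/path/to/latest.avsc"))
val sqlSchema  = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]

val df = spark.read.format("com.databricks.spark.avro").schema(sqlSchema).load("/path/to/data")
```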

@panda2727

panda2727 commented Jul 18, 2016

Thanks @clockfly.
Do you guys have any trouble loading spark-avro 3.0.0-preview (for Spark 2.0)?

The parameter used in spark shell: --packages com.databricks:spark-avro_2.10:3.0.0-preview
(I also tried --packages com.databricks:spark-avro_2.10:2.0.1)

I get an error: java.lang.ClassNotFoundException: Failed to find data source: com.databricks.spark.avro. Please use Spark package http://spark-packages.org/package/databricks/spark-avro

Thanks.

@tomseddon

tomseddon commented Jul 27, 2016

I've been playing around with this.

Shouldn't it be able to support missing fields if they are nullable? I can't really use this feature unless I can specify an all-encompassing master schema that works for all versions, including earlier files that are missing the newly added fields.

In @clockfly's example above, I should be able to specify a schema like this

```
val struct = StructType(StructField("a", IntegerType, true) :: StructField("b", IntegerType, true) :: StructField("c", IntegerType, true) :: StructField("d", IntegerType, true) :: Nil)
```

and have the input file read as follows:

```
+---+---+---+----+
|  a|  b|  c|   d|
+---+---+---+----+
|  1|  2|  3|null|
+---+---+---+----+
```

I believe the problem is that the StructField nullable property isn't enough. It needs a default setting of null as well, which I can't see a way around :(

@clockfly
Contributor

@tomseddon I have created a PR to solve this problem #155

@sambit19

I'm wondering how to make this work in Spark Streaming.
spark.read.format("com.databricks.spark.avro").schema(DataType.fromJson(res6).asInstanceOf[StructType]).load("/tmp/output").show()

Would microbatching and writing the output to the file system be the way to go?
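A possible direction, sketched and untested, assuming the avro source can act as a Structured Streaming file source in the spark-avro version in use. Streaming file sources require an explicit schema anyway, so the user-defined schema fits naturally here; the paths are hypothetical.

```
// Untested sketch: watch a directory of avro files with an explicit schema.
import org.apache.spark.sql.types._

val struct = StructType(StructField("a", IntegerType, true) :: Nil)

val stream = spark.readStream
  .schema(struct)                                   // streaming file sources need a schema up front
  .format("com.databricks.spark.avro")
  .load("/tmp/input")                               // directory watched for new avro files

val query = stream.writeStream
  .format("parquet")                                // any supported streaming sink would do
  .option("checkpointLocation", "/tmp/checkpoint")
  .start("/tmp/streamed_output")
```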

@JoshRosen modified the milestones: 3.0.1, 3.1.0 on Sep 15, 2016
@Pradeepnitw

Can you please let me know if the fix for #96 is included in the 3.0.1 build or scheduled for a future release? I tried it using 3.0.1, but it does not seem to be working.

bdrillard pushed a commit to bdrillard/spark-avro that referenced this issue Nov 29, 2016
With this PR, we can specify a user-provided custom schema when reading avro files. The custom schema can contain non-existing fields.

1. If the custom schema contains a non-existing field and the field is nullable, then we will fill the value with null. The non-existing fields can exist as top-level columns or nested columns.
2. If the custom schema contains a non-existing field and the field is NOT nullable, then we will throw an exception.
3. If the custom schema is a subset of the avro file schema, then we will only retrieve the fields defined in the custom schema.

**Example:**
```
scala> val df = Seq((1,2,3)).toDF("a", "b", "c")
scala> df.write.format("com.databricks.spark.avro").save("/tmp/output")
scala> import org.apache.spark.sql.types._

// Prepare a customized schema
scala> val struct = StructType(StructField("a", IntegerType, true) :: StructField("non_exist", IntegerType, true) :: Nil)
scala> spark.read.format("com.databricks.spark.avro").schema(struct).load("/tmp/output").show()
+---+---------+
|  a|non_exist|
+---+---------+
|  1|     null|
+---+---------+

// Save the schema to a Json string, you can later save this to a file.
scala> val jsonSchema = struct.json
jsonSchema: String = {"type":"struct","fields":[{"name":"a","type":"integer","nullable":true,"metadata":{}},{"name":"non_exist","type":"integer","nullable":true,"metadata":{}}]}

// Load the Json string back
scala> spark.read.format("com.databricks.spark.avro").schema(DataType.fromJson(jsonSchema).asInstanceOf[StructType]).load("/tmp/output").show()
+---+---------+
|  a|non_exist|
+---+---------+
|  1|     null|
+---+---------+
```

Fix databricks#96

Author: Sean Zhong <seanzhong@databricks.com>

Closes databricks#155 from clockfly/support_schema_evolution.