Skip to content

Commit

Permalink
Merge pull request #226 from FundingCircle/wb-66-fix-avro-union-seria…
Browse files Browse the repository at this point in the history
…lisation

[WB-66] Fix avro union serialisation
  • Loading branch information
DaveWM authored Feb 6, 2020
2 parents 9bbf58b + 5c870ca commit 00a482d
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

* Fixed bug in `map->ProducerRecord`
* Allow nullable `partition` and `timestamp` in `->ProducerRecord` (previously threw NPE)
* Fixed union type serialisation when members have similar fields

## [0.7.0] - [2019-12-19]

Expand Down
30 changes: 18 additions & 12 deletions src/jackdaw/serdes/avro.clj
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
"
(:require [clojure.tools.logging :as log]
[clojure.core.cache :as cache]
[clojure.data]
[clojure.string :as str]
[jackdaw.serdes.avro.schema-registry :as registry]
[jackdaw.serdes.fn :as fn])
Expand Down Expand Up @@ -316,12 +317,15 @@
(defrecord EnumType [^Schema schema]
SchemaCoercion
(match-clj? [_ x]
(or
(string? x)
(keyword? x)))
(and (or
(string? x)
(keyword? x))
(contains? (set (.getEnumSymbols schema))
(mangle (name x)))))

(match-avro? [_ x]
(instance? GenericData$EnumSymbol x))
(and (instance? GenericData$EnumSymbol x)
(.hasEnumSymbol schema x)))

(avro->clj [_ avro-enum]
(-> (str avro-enum)
Expand Down Expand Up @@ -385,13 +389,15 @@
(defrecord RecordType [^Schema schema field->schema+coercion]
SchemaCoercion
(match-clj? [_ clj-map]
(let [fields (.getFields schema)]
(every? (fn [[field-key [^Schema$Field field field-coercion]]]
(let [field-value (get clj-map field-key ::missing)]
(if (= field-value ::missing)
(.defaultValue field)
(match-clj? field-coercion field-value))))
field->schema+coercion)))
(let [[_ unknown-fields _] (clojure.data/diff (set (keys field->schema+coercion))
(set (keys clj-map)))]
(and (every? (fn [[field-key [^Schema$Field field field-coercion]]]
(let [field-value (get clj-map field-key ::missing)]
(if (= field-value ::missing)
(.defaultValue field)
(match-clj? field-coercion field-value))))
field->schema+coercion)
(empty? unknown-fields))))

(match-avro? [_ avro-record]
(cond
Expand Down Expand Up @@ -472,7 +478,7 @@
(avro->clj schema-type avro-data)))

(clj->avro [_ clj-data path]
(if-let [schema-type (match-union-type coercion-types #(match-clj? % clj-data))]
(if-let [schema-type (match-union-type coercion-types #(match-clj? % clj-data))]
(clj->avro schema-type clj-data path)
(throw (ex-info (serialization-error-msg clj-data
(->> schemas
Expand Down
52 changes: 49 additions & 3 deletions test/jackdaw/serdes/avro_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,37 @@
(is (= clj-data (avro/avro->clj schema-type avro-data)))
(is (= avro-data-str-keys (avro/clj->avro schema-type clj-data [])))))
(testing "union"
(let [avro-schema (parse-schema ["long" "string"])
(let [record-1-schema {:name "recordOne"
:type "record"
:namespace "com.fundingcircle"
:fields [{:name "a"
:type {:name "enumOne"
:type "enum"
:symbols ["x"]}
:symbols ["x"]}]}
record-2-schema {:name "recordTwo"
:type "record"
:namespace "com.fundingcircle"
:fields [{:name "a"
:type {:name "enumTwo"
:type "enum"
:symbols ["y"]}}
{:name "b"
:type ["string" "null"]}]}
record-3-schema {:name "recordThree"
:type "record"
:namespace "com.fundingcircle"
:fields [{:name "a"
:type {:name "enumThree"
:type "enum"
:symbols ["y"]}}
{:name "c"
:type ["string" "null"]}]}
avro-schema (parse-schema ["long"
"string"
record-1-schema
record-2-schema
record-3-schema])
schema-type (schema-type avro-schema)
clj-data-long 123
avro-data-long 123
Expand All @@ -273,7 +303,22 @@
(is (= clj-data-string (avro/avro->clj schema-type avro-data-string)))
(is (= (str avro-data-string) (avro/clj->avro schema-type clj-data-string [])))
(is (= clj-data-num-as-string (avro/avro->clj schema-type avro-data-num-as-string)))
(is (= (str avro-data-num-as-string) (avro/clj->avro schema-type clj-data-num-as-string [])))))
(is (= (str avro-data-num-as-string) (avro/clj->avro schema-type clj-data-num-as-string [])))
(is (= (->generic-record (parse-schema record-1-schema) {"a" "x"})
(avro/clj->avro schema-type {:a :x} [])))
(is (= {:a :x}
(avro/avro->clj schema-type (->generic-record (parse-schema record-1-schema) {"a" "x"}))))
(is (= (->generic-record (parse-schema record-2-schema) {"a" "y" "b" "test"})
(avro/clj->avro schema-type {:a :y :b "test"} [])))
(is (= {:a :y :b "test"}
(avro/avro->clj schema-type (->generic-record (parse-schema record-2-schema) {"a" "y" "b" "test"}))))
(is (= (->generic-record (parse-schema record-3-schema) {"a" "y" "c" "test"})
(avro/clj->avro schema-type {:a :y :c "test"} [])))
(is (= {:a :y :c "test"}
(avro/avro->clj schema-type (->generic-record (parse-schema record-3-schema) {"a" "y" "c" "test"}))))
(is (thrown? Exception (avro/clj->avro schema-type {:a :y} [])))
(is (thrown? Exception (avro/clj->avro schema-type {:a :x :d "test"} [])))
(is (thrown? Exception (avro/clj->avro schema-type {:a :x :b "test"} [])))))
(testing "marshalling unrecognized union type throws exception"
(let [avro-schema (parse-schema ["null" "long"])
schema-type (schema-type avro-schema)]
Expand All @@ -295,7 +340,8 @@
avro-data (->generic-record avro-schema {"industry_code_version" avro-enum})]
(is (= clj-data (avro/avro->clj schema-type avro-data)))
(is (= avro-data (avro/clj->avro schema-type clj-data [])))
(is (= avro-data (avro/clj->avro schema-type {:industry-code-version "SIC-2003"} [])))))
(is (= avro-data (avro/clj->avro schema-type {:industry-code-version "SIC-2003"} [])))
(is (thrown? Exception (avro/clj->avro schema-type {:industry-code-version "invalid"} [])))))
(testing "record"
(let [nested-schema-json {:name "nestedRecord"
:type "record"
Expand Down

0 comments on commit 00a482d

Please sign in to comment.