Skip to content

Commit

Permalink
Allow structs and list of structs in g/->dataset (#335) (#336)
Browse files Browse the repository at this point in the history
* Allow structs and list of structs in g/->dataset (#335)

* Allow missing nested keys in g/->dataset (#335)
  • Loading branch information
WaqasAliAbbasi authored Jan 10, 2022
1 parent e3c88ad commit dba9080
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 8 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ pom.xml.asc
/lib/
/classes/
/target/
/docker/target/
/checkouts/
.lein-deps-sum
.lein-repl-history
.lein-plugins/
.lein-failures
.nrepl-port
.cpcache/
.calva/
.lsp/
82 changes: 76 additions & 6 deletions src/clojure/zero_one/geni/core/dataset_creation.clj
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,11 @@
SparseVector (VectorUDT.)
nil DataTypes/NullType})

(declare infer-schema infer-spark-type)

(defn- infer-spark-type
  "Maps a single Clojure value onto the Spark SQL data type used to store it.
  Maps become StructTypes (via infer-schema, declared above), other
  collections become ArrayTypes inferred from their first element, and
  everything else is looked up in java-type->spark-type, falling back to
  BinaryType for unknown classes."
  [value]
  (cond
    (map? value)  (infer-schema (map name (keys value)) (vals value))
    (coll? value) (ArrayType. (infer-spark-type (first value)) true)
    :else         (java-type->spark-type (type value) DataTypes/BinaryType)))

Expand All @@ -142,12 +145,77 @@
(DataTypes/createStructType
(mapv infer-struct-field col-names values)))

(defn- first-non-nil [values]
(first (filter (complement nil?) values)))
(defn- update-val-in
  "Works similar to update-in but accepts a value instead of a function.
  Merge rules for the existing value at path vs. the new one:
    - old value is nil                    -> take the new value
    - both are maps                       -> merge them
    - collections of the same type       -> concatenate (into old new)
    - anything else                       -> the new value wins
  If the associative structure m is nil it initialises to val. An empty
  path applies the merge rules at the top level; plain update-in would
  instead mis-file the result under a nil key."
  [m path val]
  (letfn [(merge-vals [old new]
            (cond
              (nil? old) new
              (and (map? old) (map? new)) (merge old new)
              (and (coll? new) (= (type old) (type new))) (into old new)
              :else new))]
    (cond
      (nil? m) val
      (empty? path) (merge-vals m val)
      :else (update-in m path merge-vals val))))

(defn- first-non-nil
  "Looks through values and recursively finds the first non-nil value.
  For maps, it returns a first non-nil value for each nested key.
  For list of maps, it returns a list of one map with first non-nil value for each nested key.
  Examples:
  [1 2 3] => [1]
  [nil [1 2]] => [[1]]
  [{:a 1} {:a 3 :b true}] => [{:a 1 :b true}]
  [{:a 1} {:b [{:a 4} {:c 3}]}] => [{:a 1 :b [{:a 4 :c 3}]}]
  [{:a 1} {:b [[{:a 4} {:c 3}] [{:h true}]]}] => [{:a 1 :b [[{:a 4 :c 3 :h true}]]}]"
  ([v]
   (first-non-nil v nil []))
  ;; non-nil is the accumulated sample structure; path is the location inside
  ;; it where a witness for v belongs.
  ([v non-nil path]
   (cond
     ;; Map: ensure an (initially empty) map exists at path, then recurse
     ;; into every key, threading the accumulator through the reduce.
     (map? v) (reduce #(first-non-nil (get v %2) %1 (conj path %2)) (update-val-in non-nil path {}) (keys v))
     ;; Collection: every element shares slot 0 of the sample (path + 0).
     ;; Recurse over the non-nil elements; as soon as the sample at the slot
     ;; is a non-collection (a scalar witness was recorded), stop early with
     ;; reduced -- one scalar witness suffices. While the slot holds a
     ;; collection, keep going so keys found only in later elements are
     ;; merged in too.
     (coll? v) (reduce (fn [non-nil v]
                         (let [path (conj path 0)
                               non-nil (first-non-nil v non-nil path)]
                           (if (coll? (get-in non-nil path)) non-nil (reduced non-nil))))
                       (update-val-in non-nil path []) (filter (complement nil?) v))
     ;; nil values, or slots that already hold a witness, leave the
     ;; accumulator untouched.
     (or (nil? v) (some? (get-in non-nil path))) non-nil
     ;; Scalar: record it as the first non-nil witness for path.
     :else (update-val-in non-nil path v))))

(defn- fill-missing-nested-keys
  "Recursively fills in any missing keys. Takes the records and a sample
  non-nil value (as produced by first-non-nil above); every map is padded so
  it carries the sample's full key set, with absent keys bound to nil.
  Examples:
  [1 2 3] | [1]
  => [1 2 3]
  [nil [1 2]] | [[1]]
  => [nil [1 2]]
  [{:a 1} {:a 3 :b true}] | [{:a 1 :b true}]
  => [{:a 1 :b nil} {:a 3 :b true}]
  [{:a 1} {:b [{:a 4} {:c 3}]}] | [{:a 1 :b [{:a 4 :c 3}]}]
  => [{:a 1 :b nil} {:a nil :b [{:a 4 :c nil} {:a nil :c 3}]}]
  [{:a 1} {:b [[{:a 4} {:c 3}] [{:h true}]]}] | [{:a 1 :b [[{:a 4 :c 3 :h true}]]}]
  => [{:a 1 :b nil} {:a nil :b [[{:a 4 :c nil :h nil} {:a nil :c 3 :h nil}] [{:a nil :c nil :h true}]]}]"
  ([v non-nil]
   (fill-missing-nested-keys v non-nil []))
  ([v non-nil path]
   (let [sample-keys    (keys (get-in non-nil path))
         nested-sample? (coll? (get-in non-nil (conj path 0)))]
     (cond
       ;; Map: emit every key the sample knows about, recursing per key so
       ;; missing entries materialise as nil.
       (map? v)
       (into {}
             (for [k sample-keys]
               [k (fill-missing-nested-keys (get v k) non-nil (conj path k))]))
       ;; Collection whose sample elements are themselves collections:
       ;; recurse into each element (the sample collapses them onto index 0).
       (and (coll? v) nested-sample?)
       (map #(fill-missing-nested-keys % non-nil (conj path 0)) v)
       ;; Scalar, or collection of scalars: nothing to pad.
       :else v))))

(defn- transpose
  "Turns a sequence of rows into a sequence of columns (and vice versa)."
  [xs]
  (apply map (fn [& cells] (apply list cells)) xs))

(defn- transform-maps
  "Recursively rewrites every map inside value into a Spark Row built from
  its values (in key order); other collections are walked element-wise and
  scalars pass through untouched."
  [value]
  (cond
    (map? value)  (-> value vals transform-maps interop/->spark-row)
    (coll? value) (map transform-maps value)
    :else         value))

(defn table->dataset
"Construct a Dataset from a collection of collections.
Expand All @@ -164,10 +232,12 @@
([spark table col-names]
(if (empty? table)
(.emptyDataFrame spark)
(let [col-names (map name col-names)
values (map first-non-nil (transpose table))
rows (interop/->java-list (map interop/->spark-row table))
schema (infer-schema col-names values)]
(let [col-names (map name col-names)
transposed (transpose table)
values (map first-non-nil transposed)
table (transpose (map (partial apply fill-missing-nested-keys) (zipmap transposed values)))
rows (interop/->java-list (map interop/->spark-row (transform-maps table)))
schema (infer-schema col-names (map first values))]
(.createDataFrame spark rows schema)))))

(defn map->dataset
Expand Down
91 changes: 89 additions & 2 deletions test/zero_one/geni/dataset_creation_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,64 @@
(instance? Dataset dataset) => true
(g/collect-vals dataset) => [[0 "A" false]
[1 "B" false]
[2 "C" false]])))
[2 "C" false]]))
(fact "should work for map columns"
(let [dataset (g/records->dataset
@tr/spark
[{:i 0 :s "A" :b {:z ["a" "b"]}}
{:i 1 :s "B" :b {:z ["c" "d"]}}])]
(instance? Dataset dataset) => true
(g/collect-vals dataset) => [[0 "A" {:z ["a" "b"]}]
[1 "B" {:z ["c" "d"]}]]))
(fact "should work for map columns with missing keys"
(let [dataset (g/records->dataset
@tr/spark
[{:i 0 :s "A" :b {:z ["a" "b"]}}
{:i 1 :s "B" :b {:z ["c" "d"] :y true}}])]
(instance? Dataset dataset) => true
(g/collect-vals dataset) => [[0 "A" {:z ["a" "b"] :y nil}]
[1 "B" {:z ["c" "d"] :y true}]]))
(fact "should work for map columns with list of maps inside"
(let [dataset (g/records->dataset
@tr/spark
[{:i 0 :s "A" :b {:z [{:g 3}]}}
{:i 1 :s "B" :b {:z [{:g 5} {:h true}]}}])]
(instance? Dataset dataset) => true
(g/collect-vals dataset) => [[0 "A" {:z [{:g 3 :h nil}]}]
[1 "B" {:z [{:g 5 :h nil} {:g nil :h true}]}]]))
(fact "should work for list of map columns"
(let [dataset (g/records->dataset
@tr/spark
[{:i 0 :s "A" :b [{:z 1} {:z 2}]}
{:i 1 :s "B" :b [{:z 3}]}])]
(instance? Dataset dataset) => true
(g/collect-vals dataset) => [[0 "A" [{:z 1} {:z 2}]]
[1 "B" [{:z 3}]]]))
(fact "should work for list of map columns with missing keys in latter entries"
(let [dataset (g/records->dataset
@tr/spark
[{:i 0 :s "A" :b [{:z 1 :y true} {:z 2}]}
{:i 1 :s "B" :b [{:z 3}]}])]
(instance? Dataset dataset) => true
(g/collect-vals dataset) => [[0 "A" [{:z 1 :y true} {:z 2 :y nil}]]
[1 "B" [{:z 3 :y nil}]]]))
(fact "should work for list of map columns with missing keys in prior entries"
(let [dataset (g/records->dataset
@tr/spark
[{:i 0 :s "A" :b [{:z 1} {:z 2 :y true}]}
{:i 1 :s "B" :b [{:z 3}]}])]
(instance? Dataset dataset) => true
(g/collect-vals dataset) => [[0 "A" [{:z 1 :y nil} {:z 2 :y true}]]
[1 "B" [{:z 3 :y nil}]]]))
(fact "should work for list of list of maps with missing keys"
(let [dataset (g/records->dataset
@tr/spark
[{:i 0 :b [[{:z 1} {:z 2}] [{:h true}]]}
{:i 1 :b [[{:g 3.0}]]}])]
(instance? Dataset dataset) => true
(g/collect-vals dataset) => [[0 [[{:z 1 :h nil :g nil} {:z 2 :h nil :g nil}]
[{:z nil :h true :g nil}]]]
[1 [[{:z nil :h nil :g 3.0}]]]])))

(facts "On table->dataset"
(fact "should create the right dataset"
Expand All @@ -181,7 +238,37 @@
[:a :b :c])]
(instance? Dataset dataset) => true
(g/column-names dataset) => ["a" "b" "c"]
(g/collect-vals dataset) => [[1 2.0 "a"] [4 5.0 "b"]])))
(g/collect-vals dataset) => [[1 2.0 "a"] [4 5.0 "b"]]))
(fact "should create the right schema for maps"
(let [dataset (g/table->dataset
@tr/spark
[[1 {:z ["a"]}]
[4 {:z ["b" "c"] :y true}]]
[:a :b])]
(instance? Dataset dataset) => true
(g/column-names dataset) => ["a" "b"]
(g/dtypes dataset) => {:a "LongType"
:b "StructType(StructField(z,ArrayType(StringType,true),true), StructField(y,BooleanType,true))"}))
(fact "should create the right schema for list of maps"
(let [dataset (g/table->dataset
@tr/spark
[[1 [{:z 1}]]
[4 [{:z 3} {:y 3.0}]]]
[:a :b])]
(instance? Dataset dataset) => true
(g/column-names dataset) => ["a" "b"]
(g/dtypes dataset) => {:a "LongType"
:b "ArrayType(StructType(StructField(z,LongType,true), StructField(y,DoubleType,true)),true)"}))
(fact "should create the right schema for list of list of maps"
(let [dataset (g/table->dataset
@tr/spark
[[1 [[{:z 1}] [{:z 3}]]]
[4 [[{:y true}]]]]
[:a :b])]
(instance? Dataset dataset) => true
(g/column-names dataset) => ["a" "b"]
(g/dtypes dataset) => {:a "LongType"
:b "ArrayType(ArrayType(StructType(StructField(z,LongType,true), StructField(y,BooleanType,true)),true),true)"})))

(facts "On spark range"
(fact "should create simple datasets"
Expand Down

0 comments on commit dba9080

Please sign in to comment.