added collect-to-arrow support (#284)
* added collect-to-arrow support

refactored

more data types and refactored

more data types and refactored

make use of type hints

added docstring

clean up

adapted to latest upstream

commit auto-changing file

* added arrow dependencies

* fixed linting issues

* fixed test file names

* made functions private

* refactored

* made arrow deps "provided"

* refactored type hints in typed-action

* removed unneeded binding

* fixed whitespace

* reformatted with cljfmt

* Cosmetic changes

* Another round of cosmetic changes

* type hinted setNull

* use when-not

* implement and test empty dataframe

* Added Arrow dependencies to README + lein template

* Made jsonista explicit dep

* Added Arrow (+ other Spark deps) to deps.edn example

* Updated leftover deps.edn for docker caching

* integrated Anthony's cosmetic changes

* reformatted

* reformatted all

* added docs on collection of data

* fixed docs

Co-authored-by: Anthony Khong <anthony.kusumo.khong@gmail.com>
behrica and anthony-khong committed Nov 5, 2020
1 parent a36faa9 commit 83b24ea
Showing 16 changed files with 498 additions and 11 deletions.
1 change: 1 addition & 0 deletions Makefile
@@ -3,6 +3,7 @@ VERSION=`cat resources/GENI_REPL_RELEASED_VERSION`

build:
cp project.clj docker/project.clj
cp examples/geni-clj-app/deps.edn docker/deps.edn
docker build -f docker/Dockerfile \
-t $(DOCKERNAME):latest \
docker
6 changes: 6 additions & 0 deletions README.md
@@ -42,6 +42,7 @@ Geni provides an idiomatic Spark interface for Clojure without the hassle of Jav
<li><a href="docs/spark_session.md">Where's The Spark Session</a></li>
<li><a href="docs/why.md">Why?</a></li>
<li><a href="docs/sql_maps.md">Working with SQL Maps</a></li>
<li><a href="docs/collect.md">Collect data into Clojure Repl</a></li>
</ul>
</td>
<td>
@@ -299,6 +300,11 @@ You would also need to add Spark as provided dependencies. For instance, have the
[org.apache.spark/spark-sql_2.12 "3.0.0"]
[org.apache.spark/spark-streaming_2.12 "3.0.0"]
[com.github.fommil.netlib/all "1.1.2" :extension "pom"]
; Arrow
[org.apache.arrow/arrow-memory-netty "2.0.0"]
[org.apache.arrow/arrow-memory-core "2.0.0"]
[org.apache.arrow/arrow-vector "2.0.0"
:exclusions [commons-codec com.fasterxml.jackson.core/jackson-databind]]
;; Databases
[mysql/mysql-connector-java "8.0.22"]
[org.postgresql/postgresql "42.2.18"]
8 changes: 8 additions & 0 deletions docker/deps.edn
@@ -1,14 +1,22 @@
{:deps
{io.netty/netty-all {:mvn/version "4.1.53.Final"}
;; Spark
org.apache.spark/spark-avro_2.12 {:mvn/version "3.0.1"}
org.apache.spark/spark-core_2.12 {:mvn/version "3.0.1"}
org.apache.spark/spark-hive_2.12 {:mvn/version "3.0.1"}
org.apache.spark/spark-mllib_2.12 {:mvn/version "3.0.1"}
org.apache.spark/spark-sql_2.12 {:mvn/version "3.0.1"}
org.apache.spark/spark-streaming_2.12 {:mvn/version "3.0.1"}
;; Databases
mysql/mysql-connector-java {:mvn/version "8.0.21"}
org.postgresql/postgresql {:mvn/version "42.2.16"}
org.xerial/sqlite-jdbc {:mvn/version "3.32.3.2"}
;; Arrow
org.apache.arrow/arrow-memory-netty {:mvn/version "2.0.0"}
org.apache.arrow/arrow-memory-core {:mvn/version "2.0.0"}
org.apache.arrow/arrow-vector {:mvn/version "2.0.0"
:exclusions [commons-codec
com.fasterxml.jackson.core/jackson-databind]}
;; Optional: Spark XGBoost
ml.dmlc/xgboost4j-spark_2.12 {:mvn/version "1.0.0"}
ml.dmlc/xgboost4j_2.12 {:mvn/version "1.0.0"}
16 changes: 11 additions & 5 deletions docker/project.clj
@@ -9,6 +9,11 @@
[org.apache.spark/spark-mllib_2.12 "3.0.1"]
[org.apache.spark/spark-sql_2.12 "3.0.1"]
[org.apache.spark/spark-streaming_2.12 "3.0.1"]
; Arrow
[org.apache.arrow/arrow-memory-netty "2.0.0"]
[org.apache.arrow/arrow-memory-core "2.0.0"]
[org.apache.arrow/arrow-vector "2.0.0"
:exclusions [commons-codec com.fasterxml.jackson.core/jackson-databind]]
; Databases
[mysql/mysql-connector-java "8.0.22"]
[org.postgresql/postgresql "42.2.18"]
@@ -34,20 +39,21 @@
[potemkin "0.4.5"]
[reply "0.4.4" :exclusions [javax.servlet/servlet-api]]
[zero.one/fxl "0.0.5"]]

:profiles
{:provided {:dependencies ~spark-deps}
:uberjar {:aot :all :dependencies ~spark-deps}
:dev {:dependencies [[enlive "1.1.6"]
[midje "1.9.9"]]
:dev {:dependencies [[criterium "0.4.6"]
[enlive "1.1.6"]
[midje "1.9.9"]
[techascent/tech.ml.dataset "5.00-alpha-22"]]
:plugins [[lein-ancient "0.6.15"]
[lein-cloverage "1.2.1"]
[lein-midje "3.2.2"]
[lein-cljfmt "0.7.0"]]
:resource-paths ["test/resources"] ; Needed to suppress TMD's debug log
:cljfmt {:split-keypairs-over-multiple-lines? false
:remove-multiple-non-indenting-spaces? false
;; Note: we add custom rules to handle code from midje test library
;; See https://github.com/weavejester/cljfmt/blob/master/cljfmt/resources/cljfmt/indents/clojure.clj
;; for more control
:indents {facts [[:inner 0] [:block 1]]
fact [[:inner 0] [:block 1]]}}
:aot [zero-one.geni.rdd.function
9 changes: 9 additions & 0 deletions docker/spark-project.clj
@@ -8,11 +8,15 @@
[reply "0.4.4" :exclusions [javax.servlet/servlet-api]]
[zero.one/fxl "0.0.5"]
;; Dev
[criterium "0.4.6"]
[enlive "1.1.6"]
[expound "0.8.6"]
[lein-ancient "0.6.15"]
[lein-cljfmt "0.7.0"]
[lein-cloverage "1.2.1"]
[lein-midje "3.2.1"]
[midje "1.9.9"]
[techascent/tech.ml.dataset "5.00-alpha-22"]
;; Spark
[org.apache.spark/spark-avro_2.12 "3.0.1"]
[org.apache.spark/spark-core_2.12 "3.0.1"]
@@ -21,6 +25,11 @@
[org.apache.spark/spark-sql_2.12 "3.0.1"]
[org.apache.spark/spark-streaming_2.12 "3.0.1"]
[com.github.fommil.netlib/all "1.1.2" :extension "pom"]
; Arrow
[org.apache.arrow/arrow-memory-netty "2.0.0"]
[org.apache.arrow/arrow-memory-core "2.0.0"]
[org.apache.arrow/arrow-vector "2.0.0"
:exclusions [commons-codec com.fasterxml.jackson.core/jackson-databind]]
; Databases
[mysql/mysql-connector-java "8.0.22"]
[org.postgresql/postgresql "42.2.18"]
105 changes: 105 additions & 0 deletions docs/collect.md
@@ -0,0 +1,105 @@
# Collect data to the driver

So far we have used Spark functions which are executed on all nodes of the cluster by the Spark engine.
Spark offers many different ways to operate on data, but sometimes we want to manipulate the data differently, in pure Clojure.

This requires moving the data from the Spark workers to the driver node, on which the Clojure REPL is running. This is only useful and possible if the data is **small** enough to fit on that node.

Geni offers several functions starting with `collect-` which transport the data to the driver and make it available in the Clojure REPL.

## Collect as Clojure data

A very common case is to access the data as Clojure maps with `collect`:

```clojure
(-> fixed-df (g/limit 2) g/collect)
=> ({:du Parc "26",
     :Rachel1 "16",
     :Pierre-Dupuy "10",
     :Berri 1 "35",
     :Maisonneuve 1 "38",
     :Brébeuf (données non disponibles) nil,
     :Date "01/01/2012",
     :Côte-Sainte-Catherine "0",
     :St-Urbain (données non disponibles) nil,
     :Maisonneuve 2 "51"}
    {:du Parc "53",
     :Rachel1 "43",
     :Pierre-Dupuy "6",
     :Berri 1 "83",
     :Maisonneuve 1 "68",
     :Brébeuf (données non disponibles) nil,
     :Date "02/01/2012",
     :Côte-Sainte-Catherine "1",
     :St-Urbain (données non disponibles) nil,
     :Maisonneuve 2 "153"})
```

Alternatively, we can get the data as a sequence of vectors with `collect-vals`:

```clojure
(-> fixed-df (g/limit 2) g/collect-vals)
=>
(["01/01/2012" 35 nil 0 38 51 26 10 16 nil]
 ["02/01/2012" 83 nil 1 68 153 53 6 43 nil])
```

To access the values of a single column, we can use `collect-col`:

```clojure
(-> fixed-df (g/limit 2) (g/collect-col :Date))
=>
("01/01/2012" "02/01/2012")
```

## Collect as Arrow files

We can also get the data onto the driver as Arrow files, using the function `collect-to-arrow`.
This has the advantage that it can work with data larger than the driver's heap space.

The condition is that the **largest partition** of the data fits on the driver, as the data is transported partition by partition.
To ensure this, we can repartition the data before collecting it.

The `collect-to-arrow` function also needs to know how many rows each Arrow file should contain.
This chunk size should likewise be small enough that each Arrow file fits in heap space.

The function then creates a number of Arrow files, each containing `chunk-size` rows (except the last one, which may be smaller).

We also need to specify the target directory the files get written to.

The function returns a sequence of the file names created.

```clojure
(-> fixed-df
    (g/repartition 20) ;; split the data into 20 partitions of roughly equal size
    (g/collect-to-arrow 100 "/tmp"))
=>
["/tmp/geni12331590604347994819.ipc"
 "/tmp/geni2107925719499812901.ipc"
 "/tmp/geni2239579196531625282.ipc"
 "/tmp/geni14530350610103010872.ipc"]
```

Setting the number of partitions and the chunk size small enough should allow the transfer of arbitrarily large data to the driver, although this can obviously become slow for big data.

The files are written in the Arrow streaming format, which can be processed by other software packages or with the Clojure tech.ml.dataset library.
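
For instance, the collected files can be read back one at a time on the driver. The following is a minimal sketch, assuming the `tech.v3.libs.arrow` namespace of tech.ml.dataset with a reader function named `read-stream-dataset-copying`; the Arrow API has shifted between the alpha releases, so check the function names of your version.

```clojure
;; Minimal sketch: read the Arrow stream files produced by `collect-to-arrow`
;; back into tech.ml.dataset datasets on the driver.
;; Assumption: `tech.v3.libs.arrow/read-stream-dataset-copying` exists in the
;; tech.ml.dataset version you use -- the Arrow namespace has changed across
;; the alpha releases.
(require '[tech.v3.libs.arrow :as arrow]
         '[tech.v3.dataset :as ds])

(def files
  (-> fixed-df
      (g/repartition 20)
      (g/collect-to-arrow 100 "/tmp")))

;; Read every file back as a dataset and sum up the row counts.
(->> files
     (map arrow/read-stream-dataset-copying)
     (map ds/row-count)
     (reduce +))
```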



## Integration with tech.ml.dataset

The very latest alpha version of [tech.ml.dataset](https://github.com/techascent/tech.ml.dataset)
offers a deeper integration with Geni and allows converting a Spark dataframe directly into a tech.ml.dataset dataset. This happens on the driver, so the data needs to fit in heap space.

See [the integration code](https://github.com/techascent/tech.ml.dataset/blob/43f411d224a50057ae7d8817d89eda3611b33115/src/tech/v3/libs/spark.clj#L191) for details.
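
A hypothetical usage sketch follows; the actual conversion function and its name live in the `tech.v3.libs.spark` namespace linked above, so treat the function name here as an assumption, not the confirmed API.

```clojure
;; Hypothetical sketch only -- `spark-df->dataset` is an assumed name; the
;; real conversion function lives in `tech.v3.libs.spark` (see link above).
(require '[tech.v3.libs.spark :as tmd-spark]
         '[tech.v3.dataset :as ds])

(def tmd-ds
  ;; convert a (small) Spark dataframe on the driver into a tech.ml.dataset
  (tmd-spark/spark-df->dataset (g/limit fixed-df 1000)))

;; tech.ml.dataset functions can then be used directly, e.g.:
(ds/descriptive-stats tmd-ds)
```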
8 changes: 8 additions & 0 deletions examples/geni-clj-app/deps.edn
@@ -1,14 +1,22 @@
{:deps
{io.netty/netty-all {:mvn/version "4.1.53.Final"}
;; Spark
org.apache.spark/spark-avro_2.12 {:mvn/version "3.0.1"}
org.apache.spark/spark-core_2.12 {:mvn/version "3.0.1"}
org.apache.spark/spark-hive_2.12 {:mvn/version "3.0.1"}
org.apache.spark/spark-mllib_2.12 {:mvn/version "3.0.1"}
org.apache.spark/spark-sql_2.12 {:mvn/version "3.0.1"}
org.apache.spark/spark-streaming_2.12 {:mvn/version "3.0.1"}
;; Databases
mysql/mysql-connector-java {:mvn/version "8.0.21"}
org.postgresql/postgresql {:mvn/version "42.2.16"}
org.xerial/sqlite-jdbc {:mvn/version "3.32.3.2"}
;; Arrow
org.apache.arrow/arrow-memory-netty {:mvn/version "2.0.0"}
org.apache.arrow/arrow-memory-core {:mvn/version "2.0.0"}
org.apache.arrow/arrow-vector {:mvn/version "2.0.0"
:exclusions [commons-codec
com.fasterxml.jackson.core/jackson-databind]}
;; Optional: Spark XGBoost
ml.dmlc/xgboost4j-spark_2.12 {:mvn/version "1.0.0"}
ml.dmlc/xgboost4j_2.12 {:mvn/version "1.0.0"}
6 changes: 6 additions & 0 deletions lein-template/resources/leiningen/new/geni/project.clj
@@ -5,6 +5,7 @@
:url "https://www.eclipse.org/legal/epl-2.0/"}
:dependencies [[org.clojure/clojure "1.10.1"]
[zero.one/geni "0.0.34"]
[metosin/jsonista "0.2.7"]
[expound "0.8.6"]
;; Spark
[org.apache.spark/spark-core_2.12 "3.0.1"]
@@ -14,6 +15,11 @@
[org.apache.spark/spark-streaming_2.12 "3.0.1"]
[org.apache.spark/spark-yarn_2.12 "3.0.1"]
[com.github.fommil.netlib/all "1.1.2" :extension "pom"]
; Arrow
[org.apache.arrow/arrow-memory-netty "2.0.0"]
[org.apache.arrow/arrow-memory-core "2.0.0"]
[org.apache.arrow/arrow-vector "2.0.0"
:exclusions [commons-codec com.fasterxml.jackson.core/jackson-databind]]
;; Databases
[mysql/mysql-connector-java "8.0.22"]
[org.postgresql/postgresql "42.2.18"]
16 changes: 11 additions & 5 deletions project.clj
@@ -9,6 +9,11 @@
[org.apache.spark/spark-mllib_2.12 "3.0.1"]
[org.apache.spark/spark-sql_2.12 "3.0.1"]
[org.apache.spark/spark-streaming_2.12 "3.0.1"]
; Arrow
[org.apache.arrow/arrow-memory-netty "2.0.0"]
[org.apache.arrow/arrow-memory-core "2.0.0"]
[org.apache.arrow/arrow-vector "2.0.0"
:exclusions [commons-codec com.fasterxml.jackson.core/jackson-databind]]
; Databases
[mysql/mysql-connector-java "8.0.22"]
[org.postgresql/postgresql "42.2.18"]
@@ -34,20 +39,21 @@
[potemkin "0.4.5"]
[reply "0.4.4" :exclusions [javax.servlet/servlet-api]]
[zero.one/fxl "0.0.5"]]

:profiles
{:provided {:dependencies ~spark-deps}
:uberjar {:aot :all :dependencies ~spark-deps}
:dev {:dependencies [[enlive "1.1.6"]
[midje "1.9.9"]]
:dev {:dependencies [[criterium "0.4.6"]
[enlive "1.1.6"]
[midje "1.9.9"]
[techascent/tech.ml.dataset "5.00-alpha-22"]]
:plugins [[lein-ancient "0.6.15"]
[lein-cloverage "1.2.1"]
[lein-midje "3.2.2"]
[lein-cljfmt "0.7.0"]]
:resource-paths ["test/resources"] ; Needed to suppress TMD's debug log
:cljfmt {:split-keypairs-over-multiple-lines? false
:remove-multiple-non-indenting-spaces? false
;; Note: we add custom rules to handle code from midje test library
;; See https://github.com/weavejester/cljfmt/blob/master/cljfmt/resources/cljfmt/indents/clojure.clj
;; for more control
:indents {facts [[:inner 0] [:block 1]]
fact [[:inner 0] [:block 1]]}}
:aot [zero-one.geni.rdd.function