added collect-to-arrow support #284

Merged · 28 commits · Nov 5, 2020

Commits:
d9922b0  added collect tp arrow support (behrica, Oct 29, 2020)
ce6dbe7  added arrow dependencies (behrica, Nov 2, 2020)
1dfd983  fixed linting issues (behrica, Nov 2, 2020)
387344b  fixed test file names (behrica, Nov 3, 2020)
01a914b  made functions private (behrica, Nov 3, 2020)
1d246a8  refactored (behrica, Nov 3, 2020)
ce4e76f  made arrow deps "provided" (behrica, Nov 3, 2020)
33444c2  refactored type hints in typed-action (behrica, Nov 3, 2020)
f30426d  removed not needed binding (behrica, Nov 3, 2020)
8720803  fixed whitespace (behrica, Nov 3, 2020)
766929d  reformated with cljfmt (behrica, Nov 3, 2020)
7386312  Cosmetic changes (anthony-khong, Nov 4, 2020)
ed9c601  Another round of cosmetic changes (anthony-khong, Nov 4, 2020)
67b8c00  typed hinted setNull (behrica, Nov 4, 2020)
936084f  use when-not (behrica, Nov 4, 2020)
39af9cf  implement and test emty dataframe (behrica, Nov 4, 2020)
9220284  Merged from upstream + fixed formatting (anthony-khong, Nov 4, 2020)
fc9995c  Added Arrow dependencies to README + lein template (anthony-khong, Nov 4, 2020)
8c8dc7a  Made jsonista explicit dep (anthony-khong, Nov 4, 2020)
ae5352f  Added Arrow (+ other Spark deps) to deps.edn example (anthony-khong, Nov 4, 2020)
9db5605  Updated leftover deps.edn for docker caching (anthony-khong, Nov 4, 2020)
c70fcbe  integrated Anthonys cosmetic changes (behrica, Nov 4, 2020)
7467884  reformated (behrica, Nov 4, 2020)
26bcd71  Merge branch 'collect_arrow' of https://github.com/anthony-khong/geni… (behrica, Nov 4, 2020)
f5a5013  Merge branch 'anthony-khong-collect_arrow' into collect_arrow (behrica, Nov 4, 2020)
0402f59  reformated all (behrica, Nov 5, 2020)
d600f70  added docu on collection of data (behrica, Nov 5, 2020)
fe9e948  fixed docu (behrica, Nov 5, 2020)
1 change: 1 addition & 0 deletions Makefile
@@ -3,6 +3,7 @@ VERSION=`cat resources/GENI_REPL_RELEASED_VERSION`

build:
cp project.clj docker/project.clj
cp examples/geni-clj-app/deps.edn docker/deps.edn
docker build -f docker/Dockerfile \
-t $(DOCKERNAME):latest \
docker
6 changes: 6 additions & 0 deletions README.md
@@ -42,6 +42,7 @@ Geni provides an idiomatic Spark interface for Clojure without the hassle of Jav
<li><a href="docs/spark_session.md">Where's The Spark Session</a></li>
<li><a href="docs/why.md">Why?</a></li>
<li><a href="docs/sql_maps.md">Working with SQL Maps</a></li>
<li><a href="docs/collect.md">Collect data into Clojure Repl</a></li>
</ul>
</td>
<td>
@@ -299,6 +300,11 @@ You would also need to add Spark as provided dependencies. For instance, have th
[org.apache.spark/spark-sql_2.12 "3.0.0"]
[org.apache.spark/spark-streaming_2.12 "3.0.0"]
[com.github.fommil.netlib/all "1.1.2" :extension "pom"]
; Arrow
[org.apache.arrow/arrow-memory-netty "2.0.0"]
[org.apache.arrow/arrow-memory-core "2.0.0"]
[org.apache.arrow/arrow-vector "2.0.0"
:exclusions [commons-codec com.fasterxml.jackson.core/jackson-databind]]
;; Databases
[mysql/mysql-connector-java "8.0.22"]
[org.postgresql/postgresql "42.2.18"]
8 changes: 8 additions & 0 deletions docker/deps.edn
@@ -1,14 +1,22 @@
{:deps
{io.netty/netty-all {:mvn/version "4.1.53.Final"}
;; Spark
org.apache.spark/spark-avro_2.12 {:mvn/version "3.0.1"}
org.apache.spark/spark-core_2.12 {:mvn/version "3.0.1"}
org.apache.spark/spark-hive_2.12 {:mvn/version "3.0.1"}
org.apache.spark/spark-mllib_2.12 {:mvn/version "3.0.1"}
org.apache.spark/spark-sql_2.12 {:mvn/version "3.0.1"}
org.apache.spark/spark-streaming_2.12 {:mvn/version "3.0.1"}
;; Databases
mysql/mysql-connector-java {:mvn/version "8.0.21"}
org.postgresql/postgresql {:mvn/version "42.2.16"}
org.xerial/sqlite-jdbc {:mvn/version "3.32.3.2"}
;; Arrow
org.apache.arrow/arrow-memory-netty {:mvn/version "2.0.0"}
org.apache.arrow/arrow-memory-core {:mvn/version "2.0.0"}
org.apache.arrow/arrow-vector {:mvn/version "2.0.0"
:exclusions [commons-codec
com.fasterxml.jackson.core/jackson-databind]}
;; Optional: Spark XGBoost
ml.dmlc/xgboost4j-spark_2.12 {:mvn/version "1.0.0"}
ml.dmlc/xgboost4j_2.12 {:mvn/version "1.0.0"}
16 changes: 11 additions & 5 deletions docker/project.clj
@@ -9,6 +9,11 @@
[org.apache.spark/spark-mllib_2.12 "3.0.1"]
[org.apache.spark/spark-sql_2.12 "3.0.1"]
[org.apache.spark/spark-streaming_2.12 "3.0.1"]
; Arrow
[org.apache.arrow/arrow-memory-netty "2.0.0"]
[org.apache.arrow/arrow-memory-core "2.0.0"]
[org.apache.arrow/arrow-vector "2.0.0"
:exclusions [commons-codec com.fasterxml.jackson.core/jackson-databind]]
; Databases
[mysql/mysql-connector-java "8.0.22"]
[org.postgresql/postgresql "42.2.18"]
@@ -34,20 +39,21 @@
[potemkin "0.4.5"]
[reply "0.4.4" :exclusions [javax.servlet/servlet-api]]
[zero.one/fxl "0.0.5"]]

:profiles
{:provided {:dependencies ~spark-deps}
:uberjar {:aot :all :dependencies ~spark-deps}
:dev {:dependencies [[enlive "1.1.6"]
[midje "1.9.9"]]
:dev {:dependencies [[criterium "0.4.6"]
[enlive "1.1.6"]
[midje "1.9.9"]
[techascent/tech.ml.dataset "5.00-alpha-22"]]
:plugins [[lein-ancient "0.6.15"]
[lein-cloverage "1.2.1"]
[lein-midje "3.2.2"]
[lein-cljfmt "0.7.0"]]
:resource-paths ["test/resources"] ; Needed to suppress TMD's debug log
:cljfmt {:split-keypairs-over-multiple-lines? false
:remove-multiple-non-indenting-spaces? false
;; Note: we add custom rules to handle code from midje test library
;; See https://github.com/weavejester/cljfmt/blob/master/cljfmt/resources/cljfmt/indents/clojure.clj
;; for more control
:indents {facts [[:inner 0] [:block 1]]
fact [[:inner 0] [:block 1]]}}
:aot [zero-one.geni.rdd.function
9 changes: 9 additions & 0 deletions docker/spark-project.clj
@@ -8,11 +8,15 @@
[reply "0.4.4" :exclusions [javax.servlet/servlet-api]]
[zero.one/fxl "0.0.5"]
;; Dev
[criterium "0.4.6"]
[enlive "1.1.6"]
[expound "0.8.6"]
[lein-ancient "0.6.15"]
[lein-cljfmt "0.7.0"]
[lein-cloverage "1.2.1"]
[lein-midje "3.2.1"]
[midje "1.9.9"]
[techascent/tech.ml.dataset "5.00-alpha-22"]
;; Spark
[org.apache.spark/spark-avro_2.12 "3.0.1"]
[org.apache.spark/spark-core_2.12 "3.0.1"]
@@ -21,6 +25,11 @@
[org.apache.spark/spark-sql_2.12 "3.0.1"]
[org.apache.spark/spark-streaming_2.12 "3.0.1"]
[com.github.fommil.netlib/all "1.1.2" :extension "pom"]
; Arrow
[org.apache.arrow/arrow-memory-netty "2.0.0"]
[org.apache.arrow/arrow-memory-core "2.0.0"]
[org.apache.arrow/arrow-vector "2.0.0"
:exclusions [commons-codec com.fasterxml.jackson.core/jackson-databind]]
; Databases
[mysql/mysql-connector-java "8.0.22"]
[org.postgresql/postgresql "42.2.18"]
105 changes: 105 additions & 0 deletions docs/collect.md
@@ -0,0 +1,105 @@
# Collect data to the driver


So far we have used Spark functions that the Spark engine executes on all nodes of the cluster.
Spark offers many ways to operate on data, but sometimes we want to manipulate
the data differently, in pure Clojure.

This requires moving the data from the Spark workers to the driver node, on which the Clojure REPL is running. This is only useful and possible if the data is **small** and fits on that node.

Geni offers several functions starting with `collect-` which transport the data to the driver and make it available in the Clojure REPL.



## Collect as Clojure data

A very common case is to access the data as Clojure maps with `collect`:

```clojure

(-> fixed-df (g/limit 2) g/collect)
=> ({:du Parc "26",
:Rachel1 "16",
:Pierre-Dupuy "10",
:Berri 1 "35",
:Maisonneuve 1 "38",
:Brébeuf (données non disponibles) nil,
:Date "01/01/2012",
:Côte-Sainte-Catherine "0",
:St-Urbain (données non disponibles) nil,
:Maisonneuve 2 "51"}
{:du Parc "53",
:Rachel1 "43",
:Pierre-Dupuy "6",
:Berri 1 "83",
:Maisonneuve 1 "68",
:Brébeuf (données non disponibles) nil,
:Date "02/01/2012",
:Côte-Sainte-Catherine "1",
:St-Urbain (données non disponibles) nil,
:Maisonneuve 2 "153"}
)
```

Alternatively, we can get the data as a sequence of vectors with `collect-vals`:

```clojure

(-> fixed-df (g/limit 2) g/collect-vals)
=>
(["01/01/2012" 35 nil 0 38 51 26 10 16 nil]
["02/01/2012" 83 nil 1 68 153 53 6 43 nil])


```

To access the values of a single column, we can use `collect-col`:

```clojure
(-> fixed-df (g/limit 2) (g/collect-col :Date))
=>
("01/01/2012" "02/01/2012")

```

## Collect as Arrow files

We can also get the data into the driver as Arrow files, using the function `collect-to-arrow`.
This has the advantage that it can work with data larger than the heap space of the driver.

The condition is that the **largest partition** of the data fits into the driver, as the data gets transferred partition by partition.
To ensure this, we can repartition the data before collecting it.

The `collect-to-arrow` function also needs to know how many rows each Arrow file should contain.
This should likewise be set small enough that each Arrow file fits in heap space.

The function then creates a number of Arrow files, each having `chunk-size` rows (except the last one, which may be smaller).

We also need to specify the target directory where the files get written.

The function returns a sequence of the file names created.

```clojure
(-> fixed-df
    (g/repartition 20) ;; split the data into 20 partitions of roughly equal size
(g/collect-to-arrow 100 "/tmp"))
=>
["/tmp/geni12331590604347994819.ipc" "/tmp/geni2107925719499812901.ipc" "/tmp/geni2239579196531625282.ipc" "/tmp/geni14530350610103010872.ipc"]


```

Setting the number of partitions and the chunk size small enough should allow transferring arbitrarily large data to the driver, though this obviously becomes slow for big data.

The files are written in the Arrow streaming format, which can be processed by other software packages or with the Clojure tech.ml.dataset library.



## Integration with tech.ml.dataset

The very latest alpha version of [tech.ml.dataset](https://github.com/techascent/tech.ml.dataset)
offers a deeper integration with Geni and allows converting a Spark data-frame directly into a tech.ml.dataset dataset. This happens on the driver, so the data needs to fit in heap space.

See [here](https://github.com/techascent/tech.ml.dataset/blob/43f411d224a50057ae7d8817d89eda3611b33115/src/tech/v3/libs/spark.clj#L191) for details.
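
To make the round trip concrete, here is a minimal sketch (not part of this PR) that collects a dataframe to Arrow files and reads one chunk back on the driver. The reader function is an assumption: recent tech.ml.dataset versions expose `tech.v3.libs.arrow/stream->dataset`, but the alpha version pinned in this PR may name it differently, so treat the call as illustrative.

```clojure
;; Sketch: collect to Arrow stream files, then read one chunk back.
;; ASSUMPTION: tech.v3.libs.arrow/stream->dataset exists under this name in
;; your tech.ml.dataset version; older alphas used different reader names.
(require '[zero-one.geni.core :as g]
         '[tech.v3.libs.arrow :as arrow])

(def arrow-files
  (-> fixed-df
      (g/repartition 20)                ;; keep each partition small
      (g/collect-to-arrow 100 "/tmp"))) ;; at most 100 rows per file

;; Read the first chunk back as a tech.ml.dataset dataset:
(def ds (arrow/stream->dataset (first arrow-files)))
```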
8 changes: 8 additions & 0 deletions examples/geni-clj-app/deps.edn
@@ -1,14 +1,22 @@
{:deps
{io.netty/netty-all {:mvn/version "4.1.53.Final"}
;; Spark
org.apache.spark/spark-avro_2.12 {:mvn/version "3.0.1"}
org.apache.spark/spark-core_2.12 {:mvn/version "3.0.1"}
org.apache.spark/spark-hive_2.12 {:mvn/version "3.0.1"}
org.apache.spark/spark-mllib_2.12 {:mvn/version "3.0.1"}
org.apache.spark/spark-sql_2.12 {:mvn/version "3.0.1"}
org.apache.spark/spark-streaming_2.12 {:mvn/version "3.0.1"}
;; Databases
mysql/mysql-connector-java {:mvn/version "8.0.21"}
org.postgresql/postgresql {:mvn/version "42.2.16"}
org.xerial/sqlite-jdbc {:mvn/version "3.32.3.2"}
;; Arrow
org.apache.arrow/arrow-memory-netty {:mvn/version "2.0.0"}
org.apache.arrow/arrow-memory-core {:mvn/version "2.0.0"}
org.apache.arrow/arrow-vector {:mvn/version "2.0.0"
:exclusions [commons-codec
com.fasterxml.jackson.core/jackson-databind]}
;; Optional: Spark XGBoost
ml.dmlc/xgboost4j-spark_2.12 {:mvn/version "1.0.0"}
ml.dmlc/xgboost4j_2.12 {:mvn/version "1.0.0"}
6 changes: 6 additions & 0 deletions lein-template/resources/leiningen/new/geni/project.clj
@@ -5,6 +5,7 @@
:url "https://www.eclipse.org/legal/epl-2.0/"}
:dependencies [[org.clojure/clojure "1.10.1"]
[zero.one/geni "0.0.34"]
[metosin/jsonista "0.2.7"]
[expound "0.8.6"]
;; Spark
[org.apache.spark/spark-core_2.12 "3.0.1"]
@@ -14,6 +15,11 @@
[org.apache.spark/spark-streaming_2.12 "3.0.1"]
[org.apache.spark/spark-yarn_2.12 "3.0.1"]
[com.github.fommil.netlib/all "1.1.2" :extension "pom"]
; Arrow
[org.apache.arrow/arrow-memory-netty "2.0.0"]
[org.apache.arrow/arrow-memory-core "2.0.0"]
[org.apache.arrow/arrow-vector "2.0.0"
:exclusions [commons-codec com.fasterxml.jackson.core/jackson-databind]]
;; Databases
[mysql/mysql-connector-java "8.0.22"]
[org.postgresql/postgresql "42.2.18"]
16 changes: 11 additions & 5 deletions project.clj
@@ -9,6 +9,11 @@
[org.apache.spark/spark-mllib_2.12 "3.0.1"]
[org.apache.spark/spark-sql_2.12 "3.0.1"]
[org.apache.spark/spark-streaming_2.12 "3.0.1"]
; Arrow
[org.apache.arrow/arrow-memory-netty "2.0.0"]
[org.apache.arrow/arrow-memory-core "2.0.0"]
[org.apache.arrow/arrow-vector "2.0.0"
:exclusions [commons-codec com.fasterxml.jackson.core/jackson-databind]]
; Databases
[mysql/mysql-connector-java "8.0.22"]
[org.postgresql/postgresql "42.2.18"]
@@ -34,20 +39,21 @@
[potemkin "0.4.5"]
[reply "0.4.4" :exclusions [javax.servlet/servlet-api]]
[zero.one/fxl "0.0.5"]]

:profiles
{:provided {:dependencies ~spark-deps}
:uberjar {:aot :all :dependencies ~spark-deps}
:dev {:dependencies [[enlive "1.1.6"]
[midje "1.9.9"]]
:dev {:dependencies [[criterium "0.4.6"]
[enlive "1.1.6"]
[midje "1.9.9"]
[techascent/tech.ml.dataset "5.00-alpha-22"]]
:plugins [[lein-ancient "0.6.15"]
[lein-cloverage "1.2.1"]
[lein-midje "3.2.2"]
[lein-cljfmt "0.7.0"]]
:resource-paths ["test/resources"] ; Needed to suppress TMD's debug log
:cljfmt {:split-keypairs-over-multiple-lines? false
:remove-multiple-non-indenting-spaces? false
;; Note: we add custom rules to handle code from midje test library
;; See https://github.com/weavejester/cljfmt/blob/master/cljfmt/resources/cljfmt/indents/clojure.clj
;; for more control
:indents {facts [[:inner 0] [:block 1]]
fact [[:inner 0] [:block 1]]}}
:aot [zero-one.geni.rdd.function