DPark is a Python clone of Spark, a MapReduce(R)-like computing framework that supports iterative computation.
## Due to the use of C extensions, some libraries need to be installed first.
$ sudo apt-get install libtool pkg-config build-essential autoconf automake
$ sudo apt-get install python-dev
$ sudo apt-get install libzmq-dev
## Then just pip install dpark (``sudo`` may be needed if you encounter a permission problem).
$ pip install dpark
Example for word counting (wc.py):
from dpark import DparkContext
ctx = DparkContext()
file = ctx.textFile("/tmp/words.txt")
words = file.flatMap(lambda x:x.split()).map(lambda x:(x,1))
wc = words.reduceByKey(lambda x,y:x+y).collectAsMap()
print(wc)
This script can run locally or on a Mesos cluster without any modification, just using different command-line arguments:
$ python wc.py
$ python wc.py -m process
$ python wc.py -m host[:port]
See examples/ for more use cases.
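To check what wc.py should produce without a DPark installation, the same pipeline can be sketched in plain Python. This only illustrates the semantics of flatMap, map, reduceByKey and collectAsMap; it is not DPark code, and the helper name `word_count` is hypothetical.

```python
from collections import defaultdict


def word_count(lines):
    """Plain-Python equivalent of the flatMap -> map -> reduceByKey pipeline."""
    # flatMap: split every line into words and flatten the result
    words = [word for line in lines for word in line.split()]
    # map: pair each word with an initial count of 1
    pairs = [(word, 1) for word in words]
    # reduceByKey: sum the counts that share the same key
    counts = defaultdict(int)
    for word, n in pairs:
        counts[word] += n
    # collectAsMap: return the result as an ordinary dict
    return dict(counts)


if __name__ == "__main__":
    print(word_count(["to be or", "not to be"]))
```

Running a small input through this sketch and through wc.py should give the same mapping of words to counts.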
DPark can run with Mesos 0.9 or higher.
If a $MESOS_MASTER environment variable is set, you can use a shortcut and run DPark with Mesos just by typing
$ python wc.py -m mesos
$MESOS_MASTER can use any Mesos master URL scheme, such as
$ export MESOS_MASTER=zk://zk1:2181,zk2:2181,zk3:2181/mesos_master
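The sketch below illustrates how the ``-m mesos`` shortcut and a zk:// master URL fit together. It is not DPark's implementation: `resolve_master` and `parse_zk_url` are hypothetical helpers, and the environment is passed in as a plain dict so the example runs without Mesos.

```python
def resolve_master(option, environ):
    """Return the master address for a -m value (hypothetical helper).

    'mesos' is treated as a shortcut for the MESOS_MASTER environment
    variable; any other value is returned unchanged.
    """
    if option == "mesos":
        if "MESOS_MASTER" not in environ:
            raise ValueError("MESOS_MASTER is not set")
        return environ["MESOS_MASTER"]
    return option


def parse_zk_url(url):
    """Split zk://host:port,host:port/path into (hosts, path)."""
    prefix = "zk://"
    if not url.startswith(prefix):
        raise ValueError("not a zk:// URL: %r" % url)
    hosts, _, path = url[len(prefix):].partition("/")
    return hosts.split(","), "/" + path


if __name__ == "__main__":
    env = {"MESOS_MASTER": "zk://zk1:2181,zk2:2181,zk3:2181/mesos_master"}
    master = resolve_master("mesos", env)
    print(parse_zk_url(master))
```

In a real script, `os.environ` could be passed as the second argument of `resolve_master`.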
To speed up shuffling, you should deploy Nginx on port 5055 to serve the data in DPARK_WORK_DIR (default is /tmp/dpark), such as:
server {
    listen 5055;
    server_name localhost;
    root /tmp/dpark/;
}
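With a server block like this, a file under the root directory is reachable at the same relative path on port 5055. The sketch below shows that mapping. `shuffle_url` is a hypothetical helper, and the host name and file path in the usage example are made up for illustration; the actual layout DPark uses inside DPARK_WORK_DIR is not assumed here.

```python
import posixpath


def shuffle_url(host, file_path, work_dir="/tmp/dpark", port=5055):
    """Map a file under work_dir to the URL Nginx would serve it at."""
    rel = posixpath.relpath(file_path, work_dir)
    # Reject the work dir itself and anything that escapes it.
    if rel == "." or rel == ".." or rel.startswith("../"):
        raise ValueError("%s is not a file inside %s" % (file_path, work_dir))
    return "http://%s:%d/%s" % (host, port, rel)


if __name__ == "__main__":
    # Hypothetical host and file name, for illustration only.
    print(shuffle_url("server_01", "/tmp/dpark/job1/shuffle/0"))
```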
2 DAGs:
- stage graph: a stage is a running unit that contains a set of tasks, each of which runs the same operations on one split of an RDD.
- API call-site graph
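The stage description above can be pictured with a toy model: one task per split, with every task applying the same operations. This is plain Python for illustration, not DPark's scheduler; `run_stage` and the sample data are assumptions.

```python
def run_stage(splits, ops):
    """Run one toy 'stage': one task per split, each applying the same ops."""
    def task(split):
        data = split
        for op in ops:  # every task runs the identical chain of operations
            data = op(data)
        return data
    return [task(split) for split in splits]


if __name__ == "__main__":
    splits = [[1, 2], [3, 4], [5]]                   # a dataset with 3 splits
    ops = [lambda part: [x * 2 for x in part],       # comparable to a map step
           lambda part: [x for x in part if x > 2]]  # comparable to a filter step
    print(run_stage(splits, ops))
```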
Just open the URL from a log line like ``start listening on Web UI http://server_01:40812``.
- Before running, configure LOGHUB and LOGHUB_PATH_FORMAT in dpark.conf, and pre-create LOGHUB_DIR.
- Get the log hub directory from a log line like ``logging/prof to LOGHUB_DIR/2018/09/27/16/b2e3349b-9858-4153-b491-80699c757485-8754``, which includes the Mesos framework ID.
- Run ``dpark_web.py -p 9999 -l LOGHUB_DIR/2018/09/27/16/b2e3349b-9858-4153-b491-80699c757485-8728/``; dpark_web.py is in tools/.
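The steps above can be scripted. The sketch below pulls the directory out of a log line of the form quoted above and builds the dpark_web.py command. `find_loghub_dir` and `web_command` are hypothetical helpers, and only the ``-p`` and ``-l`` options shown above are used.

```python
import re

# Matches the path that follows "logging/prof to" in a log line.
LOG_PATTERN = re.compile(r"logging/prof to (\S+)")


def find_loghub_dir(log_text):
    """Return the first directory that follows 'logging/prof to', or None."""
    match = LOG_PATTERN.search(log_text)
    return match.group(1) if match else None


def web_command(loghub_dir, port=9999):
    """Build the argument list for tools/dpark_web.py."""
    return ["dpark_web.py", "-p", str(port), "-l", loghub_dir]


if __name__ == "__main__":
    line = ("logging/prof to LOGHUB_DIR/2018/09/27/16/"
            "b2e3349b-9858-4153-b491-80699c757485-8754")
    print(" ".join(web_command(find_loghub_dir(line))))
```

The returned list can be handed to `subprocess.call` once LOGHUB_DIR is replaced with the real directory.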
This example shows the sharing of shuffle map output:
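from dpark import DparkContext
m = lambda x: x  # placeholder map function (an assumption; any one-argument function works)
rdd = DparkContext().makeRDD([(1,1)]).map(m).groupByKey()
rdd.map(m).collect()
rdd.map(m).collect()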
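The idea behind the two collect() calls can be pictured with a toy model in which the map side of a shuffle runs once and later jobs reuse its output. This is a plain-Python illustration under that assumption, not DPark's shuffle code; `ToyShuffle` is a hypothetical class.

```python
class ToyShuffle(object):
    """Toy stand-in for a shuffle whose map output is kept after first use."""

    def __init__(self, pairs):
        self.pairs = pairs
        self.map_runs = 0      # how many times the map side actually ran
        self._output = None

    def map_output(self):
        # The map side runs only the first time; later jobs reuse its output.
        if self._output is None:
            self.map_runs += 1
            grouped = {}
            for key, value in self.pairs:
                grouped.setdefault(key, []).append(value)
            self._output = grouped
        return self._output

    def collect(self):
        """Run a 'job' that reads the (possibly shared) map output."""
        return sorted(self.map_output().items())


if __name__ == "__main__":
    shuffle = ToyShuffle([(1, 1)])
    shuffle.collect()
    shuffle.collect()
    print(shuffle.map_runs)
```

After two jobs, `map_runs` is still 1, mirroring a second job that skips the map side.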
Nodes are combined if and only if they have the same lineage, forming a logical tree inside a stage; each node then contains a PIPELINE of RDDs.
rdd1 = get_rdd()
rdd2 = dc.union([get_rdd() for i in range(2)])
rdd3 = get_rdd().groupByKey()
dc.union([rdd1, rdd2, rdd3]).collect()
https://dpark.readthedocs.io/zh_CN/latest/
https://github.com/jackfengji/test_pro/wiki
Mailing list: dpark-users@googlegroups.com (http://groups.google.com/group/dpark-users)