From a778d4b675ae765cb24553e9c1eb7f36f7709279 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 23 Jul 2014 15:43:11 -0700 Subject: [PATCH 1/3] Implemented DStream.foreachRDD in the Python API using Py4J callback server. --- .../python/streaming/network_wordcount.py | 4 +- python/lib/py4j-0.8.1-src.zip | Bin 37662 -> 37673 bytes python/pyspark/java_gateway.py | 2 +- python/pyspark/streaming/dstream.py | 44 ++++++++++--- python/pyspark/streaming/utils.py | 21 ++++++ .../streaming/api/java/JavaDStreamLike.scala | 8 --- .../streaming/api/python/PythonDStream.scala | 38 +++++++++++ .../spark/streaming/dstream/DStream.scala | 60 ------------------ 8 files changed, 95 insertions(+), 82 deletions(-) diff --git a/examples/src/main/python/streaming/network_wordcount.py b/examples/src/main/python/streaming/network_wordcount.py index c6ededc24db21..2bbb36a6b787e 100644 --- a/examples/src/main/python/streaming/network_wordcount.py +++ b/examples/src/main/python/streaming/network_wordcount.py @@ -17,9 +17,7 @@ fm_lines = lines.flatMap(lambda x: x.split(" ")) mapped_lines = fm_lines.map(lambda x: (x, 1)) reduced_lines = mapped_lines.reduceByKey(add) - - fm_lines.pyprint() - mapped_lines.pyprint() + reduced_lines.pyprint() ssc.start() ssc.awaitTermination() diff --git a/python/lib/py4j-0.8.1-src.zip b/python/lib/py4j-0.8.1-src.zip index 2069a328d1f2e6a94df057c6a3930048ae3f3832..68d7267c733da88cfdc5d9b97e24327013ec766d 100644 GIT binary patch delta 11395 zcmV-}EPT_RrUI#^0wq@irbH3QdjUcN~0gr$25AEmGvr}M`!8eKFdWkD~m;x z?p?0yBKJSWdwcKFGD~mgqKfMKw2l_p-F+QR=4o}$4b=A{`r+MKRMlxI+f4Ill}|-k zO^QhIe2TcY0;vja6VSND3IaaPvj;I> z_W;M^Qlzz*Mga9hOrk&cqpx4S{HMLW!*o^O7v*8}8o=I*xmc{Dlk`sHX;vZ8_Nebk zGF#QFQY1-~EtUl!u*x$avPk9`l5$@4qx7~a=BrvHTw;%FPKx;)s0+QRRMn~YWhM6H zk9mJ_cL%*zKZ}a1E!WdxqAJw&dI>0%s!$~mr2bV*9z@O6s@3hXEGDATja?Q?k@us^ z)61c*u$GO~_dpb66jgXcFZ$8dTY#yGKZ*3A6tg{r*f$?zpzaa|S{5kN{7&?v`?N~y zy8OBy-KLeO>JqzU>ddk{ooD|cO1H7=bS8h^s83dL=tf-t@l@%rtZ$DnK!JuI4wq?}E+W~Rdtd+ELDMvpO`&Gz-}4UqBdSTd6iF_gKwjpeYYn9? zGa$W_oC2md&vnBz1m$`BxA;<$gRX%0*gtEs5GQ_2YC2Yss{$ ze`*Il6;+JCmSps#L1<9gV>)_4-*0dZS?xifbGZepZNa~xdUYPKkdWd>V^X? zn>v$KSpwk?N?0XfA#!VOK^c}tSb4-O`_zy2tNXMRgo58XK>=FvIDjl5h-U4`W|0Cb zKEcecy7;%XxLACEUu?dB5o2zcr>lH&f3GHN7rA`!5+{F&RR;U}eHJGA7t*O%*45zs z2Sd+kCK4D&K%*YK0X;<+wt7nqfZQ5?$8C&R1%#=^YM@jlT!Dyq95UCDZc<1@S zeud^P^7$G%fM}=X8V3Rkz&G9WQ;M1{Xpx|oO`{a$c3$KM@FJh4<&@_SY8NoPP#tvM zdsH@*m=q{Gn+fE!Xl&SKj4Cj#F3^3!`Y@sAG^?SI`s#F24=Hz3|0Hs3pteUT z=7d9}Y+9B$7@+NZs>?Kj_oR#he<)OO@Lz({b(%9HbziLJu!0I}%!m?%!B|%qN7S^a zQ3RsWK2}JK%f4R*U(VH}z zTO0`13`lVVMKU3_fqT+Ig0=)k3FZhh`mihuP!^AASgKXmf2G5J7}t+pgNFrm7QP}J)Vl& z)m;~bB$~N*nk~ulR2{WRt&|11#sD>e4rG3*Pkk_6r^TuspfyH6vYh_bqjQ4uXMrX+ zYnLoy%~+A~%T{bR7h=jHp;|4q3i!g4q!pu+5TpTG{#`^ZW=wD`f1Z*W)pL}Sge_QF zhfvXhX+Ot>AM|qe7W5-}mEgi|R+K9NR-bqjtdLU)(yOP@*c2Z}()biCBP7JJoU0?8 z*fU^K62iO=%vN)n{m~e#^@6b(%y2c6OO_g(g5^BP5eDmN^QJwP1|*X62}r&ouhT47 zf{mK6Ya0ByBL1c1e+PFI2BMVxcjSke6mL$4Z%=&XnqPi5B;9(WYHe_j6W{|J7hB0T z7=D%>?10}%DOS@_fdKRPA~_qLy#}*Uw!^|W5hSy?T<`tM5AAZTlNXYlFtu=O8Kdae zSAXrOiRN_5^t#_dA<5YSxW#Ih`Mgz$-$0GNC#%{<#+f2=e=b$q=2AZ*Jj|537WJ0d z1Wf46BV@!eL1~>*te$|TgmVT3h(I-nTeuaI6s8=N0b!lpaBPmr@ff3SKTFNlM(fzxMQtsx?QtBm*`B_h74VAx$K;1^Jw?+Z^V z@VnL}HU(VC0GSBI+gt|`L!xUzb6BK};dT}#ibr6EsJ`uh(iHjz z76O%1vj!+@540ozh_1(MDh*Ww)}_8g;!vAz%J5gvHVze5xR5xsm8V7zago6l2CNj? zREWCB7h&t@msL7vgOsL7U+!KVVjht><0BA-@Y^zqs6MKZTdTe zZqRgJu5?O}O83Oa$3;vw9SAMCAr_CoD8E7PcUGOso?XnkgnH`)Q4<102QJg|q7q&F zFYZ4YuKySO3M@k3D{{?q5ykds#ew~UTVTZ#@)WwDK1~EUM_e{nRS%;JQy0BvV}d>& zE8*wxe^6bnE!$Lrh#6lw0MuMGRIU13PgXap`xTgLPkER3`m`u$b{I>7B!#j#TvYlw zef8iUM%M-ebXeiBOP@VZvDb(A`um$va&_|4)ibC8!m+$f??fVZRV7b!eV4mg*bnz! z5R4KHUq?@Z8tSx9 zf6HT*Kcq16OhRQCwJqj8Tz=92Nuz0(s?gZVwZ22@8!y%A1TiI9w!|2Y3|4Ibx-M4H zLtZ?w%Nc%^%0zp2hFgSlfo2@9OtL3dYy!29!U@F#4el|8K-bN2T(wg!K>B^a^q_=)FcrY1hlR3e}ufO zvJ?nbSh1yx0)E^!s>x2P(INrk_M1$YYFXq}hAfPmXdrk}jD@g9t5Oz?rB*sqK{0e{ zTpB394=NDg%*+<76s(Ymj{iBB3iAFMXbX|%mD#H~Fun~^dnV`>=O}{3YF=kc45Fgu za6+<ECZK&4i|CMb z;ju-Kv6^){K8oGJw5^k};%F=O3WPH{V=4lDA*-=bMpJRj!*^SNB5 zluVzz;R6bX>9Yu?nL9x7>uQ_6|7JjY_lDJ#x`U6@n`EPG*~lL_6f#D{kc2j~(C`H_ zB1%oOh7fx`?B<5+7N{1m*tBpWW+@B<(wB%3HyScTX>82dvq{Y)$--733HQVoM;p!%?c$XWF4b;8Z1Z@tUi~6Z3AEfI|TllOF z=-P3|EQ<|&(;!0uU9V`4BG!bFFdy1oZFuqgSw-||){`W9p=wJV;#djch8EGl zs39)9Hc#~p3pF_q{p3 z)8vaA^zJnW*mh7jA3QNfp+x?u`|={dKmi;KKiGz-VokYmj(*rRt<_-$@%ZA0cV}w! zY_~J%DoRbdbvT6G%h41x?TJ*-C~gPFx&)QuRE8w2QuI8^!nY1%*cuwUg~wX`s+y~u z#F$t1F?-g=c0Vf7=zot;-%Y#3Uo%)t$@G)4P%3!3*(%YovAjDPVN|PdLy%pFL#nh} z(<*u+q1iN>J3X_)Y;=$0)(?iE?{P? zw0#?5Te>|^${Lf09U8X0wOEJdD(^2#gAS)Fn+uUP>aYxMTcq zWULFh3N%H$)#MRk=UIt&E0`r9Bb0=YD*?Aks#8?ZCJJL_w5m#)iE6`1D&~kZ)IoVc z9uOEB4Ehgip?}$bAnoTuDi5^Qnt?-0Il=8tI&RT@*{2F#OlgpNcGJJq<2cp>tV|5F zah1m>=ngP*#CHiO<3z2~7Rp`a4N74kdWs3P^8gtf&*G4#rqD2_jsmBd;{S=(qoIe= zQD6e3xAUyJ7t`mNj2l6Hd4o~y!T1(oX4#C;#An%sa(|b%3fhF|fO#x3T)#*?GtCiu z{B8!J?Bqc9R{mDo9!GDI)AQk|ufASi;8~l|)$r)7=fc^jas7^#71p?l7i(DKRz#M=%Kv?g@c$A6**(y3>~`*O!x;u@W7B%5rhz(8v610*6sZ#u zVNEydFv9!8gI6Ei>TWNNT%x(XkWr*PZz_CGe#?s0&y<6p$(R#tT7OQVc_KI%f;>NP z_YYRl#j;*|uw-X$1vKgGc95K+je1&q-C3n0VX?;LS{)dJZ&>@thrctrEixFZxP-y#hcG42I!s(bTb}J6E zxz)TRCsxfds&we9n}5Y3R?)RRciYp1v5n*C4YZdRPg?aQCjeKI`@ForiHqqQ43Oow zDrD3WUldLYpH$>)6e-gMO6jy4+y@iS&}O4@l&JzxTH#Lq;bTiW6Gu^u>wo~bA2cxb zgYn;P5fhd(Fb#%H4Sno9aM;{xk^|81Vb-x&a9hza=>~It<$s%D#avb1B@VyQZ%~

ZWQxgiKQ?HmI+5snm=1 zDr#hQuW<`2)qg|dYuK$CYM}uY2`JEmkTpMK`oh3qNbC&7h)E3bBImqX z){z0ok*_eIlP}dqpYblslRi~Qe>t#&5|KPhi%87TrDiUdh5}#;dr;Ky529cHYKZtC z)Sh&x>pNo3T|D{FFw57*AkGYo7DX#0M(yt}(Ch_Ew$4x* zuKWJ(s|oD{6RoO2W`faHO+=oSSz#x3VPF#&UbLYr5~$N+MJI1Sd3Y+bTDuY{dQ_ex ziCsh4q3V0=_6JN)Weh#Ge}Q;_ys>O@xq;X?T;~vg2R1Y38N*!Tpylx$k5iCwHXr?U z1;EZp0erC`@%fEMel3!B$=zw^&%&pvI6h0c*(w(Uq?O|t#F2J^ z-P!Ae(Ep-^2|1ixX~I6Jfqx3Yg70~J@ho6hWn1i0^?uZG#a^cuMb0)wof3F*vf`TC zY03!3RyUl^0V4O!Qv`N%SmGELK&M_+2M!{Yi~37TEA-V5cdu?2|)*0Ilf zP6yhhU`vxpS1ujF$nmp;TxI$a-)>TVkG3Hc-)p;(RBH@i+T0obELd2z0f-#MR@dW^h#z9 zz~3OzQL2+2SRa3{{+w9xdUP~8Jx(rxn5X}$=Qi?AtNSLb3)m${#%c^X(7HmasC)eO z==z!v4X9ccrYuIB5&Jt7J%svi!YmxSw>fLY?yH``f~KFl<^`4re_IxFa=3}kWVTf_ zAvMhY^l$jat3?9v@rh?1BWQjqfqAn@n z@X20FDgJ-sGfZ~eY*m7!vqg@LOq3MG(=n!2bJwf5YMWN` zu%%=caj6IJ%KzqHU?4_7ki!pQOupo5DHlfB^PYdT<^AsLT_&Cyes7eI{Ar9~V8xS) zOLt=Y{OcW1&}hkGA22rJkfL{_uv8vnlYYuGdAc*X8xAZloh3be2Gh0fxK1BK(>(?`2KP~@u9v4FAfEmXiKl$18h z??svMUW*6EnBweFJdK{H3f|LU0=AJ+F>OSF2VzvVAP>@vfCG z2NwOFv4T89=zUoU@7i>RF#^z9Quuu->8pPou1(P^6$pjg|2F9>O%eAn%ca!Pwq*5? zhef1+pUekOC^;KgZ5ve5NTC20TSJz}1lIvej;oxXi!{si!`$j8K{0VA zWsD`RQ)_SWtEd55WthMP!j8dp^UA9<6uBi`bgY^Nj!yV73}cCQS%%pum@=da@78~= zgl*^bjtSx=1N2a7LxNOs-MHJd$oaM-j7FuN@)9d0P+u;G5|M3-VrVI(s%LUfl4Q8D zJAlC`Xbf*Oad2Y4A!XmeG&`k34u_;1*d{)um_r5Z8*NKWixD){?Tv|FFgYWbWSq)` zq3DtpIIC7{(iBB?35M)JF`H@TqUe8-JQFI#cu!M;d2pTE7F3_q-!0qoty}Z% zISq^pKAjmv$4G;5-^zn60nnK_WOOGN908f~=`I6Ar|+i`R6F?LGhak#KJ$M?y&Pl5 zC+a3KlQ9P~&aLk5DOta1NoP_o@(!LhrWC~UqIg&>Ww-@hMzGc=7NnI-n}|6@gBxHf zLzLcp>mhCQSu_bxLjvp8wUwpk~>aRb8};DW5$JeQ|{spS6_!ZNu$(Jxfi zH; zP+2J_w+*+d-Z*O+IHc(7S+*X<1Ad7!=Mq$+ZqJW1YRoNEp8R*tK89}^hPGoH=A)bE z%-tAqSa|~&HwO9i{PKTh1p52=yW!Pnc(p;B-2??uH^^*pYWuN-`c&vMjc2#Gr6S14 z4H>Ez%x2lx=W~JKC2a%k^k{Pg3~cuynoq%aC)da5j+r&v0%oN7+I5Vou;dK@2o~%% z__Fs5!_?Wezo};0%a`V3u~OHEfdxtTB$B>Kw=zAc`#NEvIWh%A zUad-DR$Q`nC?0>7Z&h5Rc$6up+q9^HHrUZ*fc4bUX|05xm_nVJ?vhr9i#T*u10z-9RzUHrD0|H%oKwtS(13uYW9OMy zmrsHQqI#8!=6T3fp8bLWMtpDyxFecopl9UJcdX5hSvP+vqvNcMx@7a`++(9!Qc1fS(Z%f#-`MyclwS{u}_-c4G8YWbB+b&3B%a_F8 z-zMzM%2WQ_ZPRAVdw97E2lkpZFdScCImX>w>iAJjc#15Nv)xWG|J4-4eUAER)i&p8 z4(N;KkZ&#Ioiq~;NJ1BifiiB)5j)#Ix?k+`cwUH<+4Gg+&eBS;T{*S9Fe!8UoI(N_ zlh0olf5Rd$U6Cmejj7iAS0CQP=ip4T`*lR*`e$?NuXR;)tP6Z=(o|>?w4S8LU>s|( zP>X%J#RrPgn$VVc{#ho}5vCBXN$P5Volu;^(bd(FpD(=Wn~qdDp2)lGWCmukcQb;U z8%d*ytA#bp4qzQb5xC_?*J*b1kAvSc&f>EKe}$c6LS12vW1Hy3KjeVAHnv)AO*1be zuuPL;Q%;q1RVjyi<(Jqk^6fZ~gw93u(014`gW6^z?_1cpOf36c9uS6ZkvO29b)ngH z#8W~vAnuH^Ro@`F3;H9a22p1LYVL1 z*|pORL^NNBcQ)~W?^NpWu8`mg&e;XHC zp*DfLvr>Q|WjorvRziH`+->Hb@5SN27xSeY{KphDo@I(Rhvafs4~PVUk?a~iVgsh? za-N0~0iW;}lywLb0Kqt=x0DoKTB;ben>SfR-y8{Z$|3`l*476pm>fiEgO`)v(BqE$ z3c-Cd*|Eb26W1NcUwW@+md$I*e<~`}esPQK(E(lYt2af_q0C)Ie>_{m%Rc^P{rfuQ zAXJ*Pw{+SC-UhagG?{5xW>V8m8~GFHvJJ;?q4x@1+pChR4k5)alzfx}CwG_RW@#-i z3$(WL+&kS=du^}+<{f>qx`EZrh+&o=Rhz=Mt#F7(i_BCQI-X&8kxCCu9cEh1)G%-Qo@hE_GQ( z`2W-LWt~XIg>#;+^2xn6Mkn)1<|uxNYE%iMeLvXW@AJ~9f8lCeEbD6U{)4NEPcxB# zs*RUDg>>S#)S4I4rF{J|BP7K@Q9u)^A?R54VoU;e7}?gOav@uvf4?2O8{3BBp@51; z)?+48Q*T6}jZ!-=xTtr4y;_P1FG4k`gSf0!gW`!vR_7FHC|%cpO@a+`J_t@5cT8=cl1b|^ES%b!}gX&Hq8 z^_V#i^)Xjm|HY@eOrv6{uLYy)tyBm{*J;unMb@w}Tt$!P%AB>)mBvP$;k*Z9g~&MW z`8Znhq+x*(LM9V6PY6<4pcbj`u4q6r!QB5rr4eq$$K#`if6Th@m{5>z{B&WGU-OiNhxfnyyb#eiqg7UPG;@c9!5#-n8u-s=LAQ#n|-qmciX5!B>tK zEl!pd3|7~z$$CaF*2CjDxt?`qr8O!2)@1f3zTEj>e{-ypxbQ`UcUr2IkmBR2Vvg(n zq|7Kl507Uce^L~leIEc_t7Qi#;87LaPcEi{wzAchnlSl_$@x)|xklm|MC05!V96O= zOw8ycU10`S)JLWwopY2x>~;>X(!?V+bUSA=ySL6C-AwMiJt@6A$Zq;$voe{~#kTKV z4z+bLe`C4IyS=#KuVY!Ae5o4;9B}Zf#=k&(>noct-X>Vv0hU}1Z?1S zWR^7!0RWchg-SP_;K)3!ylR9NWp1F$h21;vfApeeiNz@L^Lg8TRPtrgwY}afkf@F0 z=*R};i&Fb!hjeKoa&m9O3j+4_s?XDo>_%;9Iu72VTsu$m#*-2)Eoq%Yk|n@zSw1Ve z9YvZ*(uPKj(;qhgyOW6rw)>Z0!e$08FLyUA0E7MO%fD{P($LJ_?X-KxVgtPf-Coa5 ze;0=ja%e(f7qFT%YkzRbNfXw-9Y7H-lmR21;;Ec|ggM6eFPi_9Gau@s)0hA0Y9#uw zqS60ad>fHIk*>7M;A(3D68mwkms0%d--OB9CN`;d` z7j7y@VH~$jOihW|j_lUJ!rmg>a6e|~*>Iz$|FowVR*!eTi48k&zeKpUfB9FV>efB* z?F4iLk)7=QDa~jBq9iLeZ9I_|CdKB2^qXKC<<5?(@z=~^`Mf2QZkd}qdkay+e+-m2 zva~L*s1sur#`)US8C7(vZSqM>_1~LU6BR>o0{ykK?|_fYXs*UXCF-;&-{hU=Pn|8(rHMF-!5G!OMpT6yLrPl|{G_|-c{MCa%UYsy z3b&DE+syPIPv8|=4P%dLj2TI1+n&+Ku{ zr=+p-zmpxlN!<=l!ed)rq|>#1=uh6bT5Y;ln`IhKz9ui54XLKSMkKH zkDP+yvmrR$zEbzwDAe%uy>{_nHjQ6jwW*%?xHr`FC3?N)q{Nh(jJixYe}w*2jEv>C z>gttHbgX*9`TgmF14&ky0-jxC_SW5|{JV)9Rdau=e#eEr z`4%30Xjjwf%6CUsuaCYPLOWZCv7376r$*GtdJ@7sE^+O1;HuXlW>%8xGlRxD4 zg=u88v|eGf?TRdOYCT5~e{<{njF*)*9=<(U*UOQ7RaP628;1*Z13|G9#Z0R39v$&e>4Nk6xqnoSqgHZP?e+JkJ%Fw5?-mqgI z*nqcju`jA{72&Oy@ivuWzT^ry)QP%fW)`2sWTj45gf4_^JRRw-rxUqeOx!hU2^O=u zN&nQ%;)eUI5n}H1xlWI$<&DNGB%aA2(i?~s|3KmWF5TGgABYa1F{FAs+)Tr`Dua-I zxdlE0aY;Z1t<^#Qe@Jv7BaByMjDduxW1zqN6P+PfnMU+>OG|{`5mi~U(3IoMXb=br z7+cs$k}1e)mQJjV8U^E?nNXcjlW@>|H02a0V>UWkWwdmD7?&z zDyyQdya`FhmnmQx76pe%j4L|{ng~$Q>20;TtwgQj)~4B{e>RXbgN;d!fD9S;qcJXx z>UR%ckc|Kw&?WkqSJG@4n`!u#iIH*K0wf9Vbl z=XEYV$vZz@nu(4*)XPgF;P+@s7fCXUNIoPgG`8mKC{|rOolf0ax{^MM#fK{GYgd5p zib=-VIn|L;e-$lip|+EiOucpYPj!OghB__j6EJiY<^DK`UNyfWxsh$4DVfnseU>t! zktH(&d819U2VqAmZR(AE%A6X7s`kwX(3|;Zg4UX$hUFfe^nCY3 zWl_~z5G?=z-21)<` delta 11422 zcmV;PEMe2BrUIU(00cU>`Z14y=A^nmgq!@b*w}=oAvsE5ZRPqkpKgL zlDT>Fzi%CVqd`)RGkaloViD-BuI{d`tE;PDLtM8-Y_M@286}*ko=!f(%joxKty-MfNSvtATauLnSViBc# zm+QL7{g3h9-n+ES(%ZSHqWV6qqeXUiUq_R9THSL4^}UFGcy|_6by~_c(>z+`Q&Cov zq7-{wYV|x_0MgNMkw1#O&Y-Dj5#>c4m10sX79xM2BJQm~s)E}DG;XnifRFR+LCn`Z z!11^gX)UG^Ks^zY=+FJ=>z6P8X>ac^UDfwRc^JJ0u=ipv7VGFFy%TwwRXAvS)b}Kr zt?E@Nk|fF&%K{Ks1|caSG7pE#2(k26!STZF7&2SRj1;YmDrO% z=EZ;A9rRlLEGn+HTu+OMs!-SKC7@KQLX|*}`d2Y|5H(k;R=3Ntn21U@c3CV%-j6O% zFNeCqS~gPO!=NCcsKO(9(T}d)0!&r>Nu&>@nC&UVzWEr#=q_QRWq~ry??gYkPph=9 z%dh*kkOr#gd^MkcpXSrKD7$iqWBI#xxEH}Q6|+bzvbqbSIqMURl~ef*pCy3V+sgq? za?cf`fdjU;rvTrH`Yrqk@AeVD{a&0-r>d9@fvXUtmb&prSP97;km^&q-tX=0y}o~W z^JaLJT%Z2i5Ndt%@}K^G?_~Jq=;rMxIXXGH8eU(+>&~mM{~>+}|M#k+zg}Kkjo{PQ zfBEyD|7>3VJo^6PJh{BO_}8E965r?&$4779zCJqsF}WUIy@Q62j*o|zqhxe?HoUmO z;(yt@{P~+7lItH&FO%bow{M5XqtkzjbHXC{p~K}zS2x&!O4Qx=owcYs{iw6X|J3xK zawR$+pu2lGEi?u^x!sSCi{vpAPklMt2@sZ?C0L!O^sXWfvrH!suu7t$jF<+hrI=*1 z3>N+EIvShmW8`#DgMa}12;oP0x)2q>K!%1N4wq?}E+W;g-6ZaF9M&=6i3B`a(zdIfN1k-nR zc65G{oFAPHqd(&75CAq$N?S1`s2TjvZHiepApQOW6Fsys%jVb(;yS;+{3|e?azCCG6#Ui+3XB!c1MmWZXkK?g8SL-(nVINcI8Mc~t_JTv znDLxuB7u1XH0r?{&{KpVtGCnu@U8K8+{TzyKv-I=21-@JWr&E!AyXacCWSXqi zS7`1cpRb_*Hb^+51)j{XI zM^yvyL4`(vNnvDXGl7&AjSbn1Q3WRH0^Jv^4-?Kj_oR#he<)OO@Lz({b(%9XbziLJAVGySXEYLo!B|%qN7S^a zkp-gCK2}JKz z(L;sl63h`M^kG>Rpe!EKv|wEl3R(gHy#Y7ZoLoWDNWC@W z!=2?dZwv><%h26GAyN(0u!WJya zAyjl=+0Rk%gI>oS zo&k{(6J|LuTg_?pM`N(o3&v(JLun>OmYSS``8@Fv2J30_rak5c#FFy~aK0k1(=1nv zjhe7)8vMB;{-yW_e|HonqLlr2_=i~(Z%&7APkiN?Uw$_v-Fl;HZE%khpaYy2Tgf&U zewH5WfZs_eR?AU=0Q2}FIUAn62D4GN!^}7l#Iv|u@BPaU?R>417jigZY2n;5M$xaY z{@PIs&FPfsb-!hVBxMVr7OP#R^HwE(!)Ww9Uez`;&JvMpf2rEGmiiIlVV2ajsJE;p zAVOyyAt8Xz0{GLsNT){V{!Nw~pl;Wpesgg))Sw9WLMe-;8C;EMKSN~?amCY{|3y!>9elZU=hESNBoas5nq%s?9LSM3ni&dfy72&@p*x1^YLWuZ`8q+(kh(tMhsL#V zK;_h`fswT*S`GlPuE%UD4OIiy9es(!p*G!=;jf@=oGPqvA#rFcPmLa;kUS#>IVb}{P`>a7bzEeK#8D5mE{CA#=u z+HsJUpUTJ^V{tZr8KD=^od@-FZ7X;#qeFqQ;P3T1J+sPuFC z>cKyZt_=w2u)<@PK6{{IuMhF{_cx>D>g1=ZXN(3g$MQD46N%hal{C@yUG7G(AMU*% z7$q9Mj-CYKW)1>p+UZFF+1QN)QeA*no9LZ?%Wky*Z7G=w-1+qy#Vx;5z=Dqc=#cKF zD&%wkTnxI3{TNLDFaV`&bcMThr{|;O;`I;1<56;YLL%CO2SNhmUJfGyLe+T6>{-EQ zli~)t*#$J_s-6vlr|proPlj)s`s>a>tXbN*4ydmJG}xWX8-1^x?(o?}dvd6uPW!Td zJZAnw3KOp+REAO8V(LTji~diVO}kWu##XNN9a7&|RHqZflz7<^V>mL9+5mK2tfGg! zcw(0`{3?}+_U;U~2gsW)B(mQf^%^Y>~+VUm~R~T;2nTg-RbG>qmN28DmL^xS37Hf*9 z=oYq+CX#~kS)C*lg3t&(P^HodsmU@<;dTz7U`Ph9S}Vwnugy@CL|_omw#E~G^0LZO zz*s?IOBV(FxNTIEomR6&494v@nK0F|$g2!V7&p;?@uV0FVa-;hEE-F#be4i5=+wA0 zkbfUkz`&W9Em$d7Arl?{b1)U;{WZ`QBF!taS94%|8>IG3&@IkU1dG+Y&XyQNMa|)Y zWF2{d8%}qE+p2I=2ptdTm}jqlx*&|?)+DSafYsc=AxJ-==|k9(sE#I}eHDx7kagj) zH6UZP>U4Y*yMt+4CuPOaR_qlBXLiO^1o}c&W21~7#W7FcZ2^kFeflVj5lF#gvlP|q z+f^-httjq5`lYM(VY6nbFJ0cy9~`f|&QfWf@^%xlg4=ZRP{o$#rm>2DKsRMMJn*7q z`s58CP&iDVSum~K0kU6L+w}c61KPVcq^{H*e5Bqa8)eHz{=lh_F(PJ2Xfq29Uoa!0 z)HG`bV$X-&+;H6j)dCiq7EZ)0g=xU?r9p@r4Vgh{Y|PoKNv$MF!d4&&_rz!8%9dZ# z+VEtLmT(px{`ep2z0U_3A(qzupmKjyTE(?TbUgur)A5J0R&WxLaFYQ}6n_WJ1FVM? zVe21mA-~U7V2XO>R-hn4C`JFOJxA74gsYpl>|Jg+Hcf~zRoeZ?QCjGt+J7$91La*{IZkX)^B=oq#WQ#t^$_&bd4#(2qs!U|z^z~y zQweopaSeFjv-F+Ht^Ij66U=O+^GwCg+>xJAI!|f}b}4K~9J-<+7=N&I9Ar-c+V|G@ zPKz&YjCZd&!M20K`QWiR3MKMq-RBnp0t(<@=)pEb6=}+ibM(WmNmhpm#G{KJ-kqu0 zv)#_5t0*<;)}at~FGo|*v?o$QqqrTI>k?FsQyG%9O40Ku3*S0~VQXmY79MN$t7=L) zi7`v|F?-g=c0Vf7=zot;-%YzjUo#M1>2Y?a2bA>JL0FsfC!A@DB5AywL~ zX%)SZ&}^E`ot{}?Ho8Z0>j#Eaz;c_K&41k|S%N!Zd2R#Uv1b0uERT|H5tO9%N@?B8 za)>$1R5!52%?l)($?0jxh!-T0z!aN!OVjHDFR`u2yRP2=oPXif)y0*WR~@m+1l79*kiD4A=sZIWwZxrS zjCCQUK$FE=O&%e3o|SmFf>i=MLU9Nw3Aj~KouYy^Q5Z9$RaMeVR2xoGF-N4K4)P1~ zfWXvX(0^D9&42y_$9^uP@<6iIOdMLu32t}Naf|NDK2`8yN`ut1oBpL9$FUwDWg?)B zQXZe6JHV_F-zA`o6D6lDl)K6sl)^;x6cK9Y0TMW##UV{ip&a>)XOrK{mZUpt^4Mw#G<6DH8Wivt(pJf-yU4PyxXcM9X=CR07evx`+nj`Y~ z-3&t6$${*x{H?Y)?W58&e}LQ z^gSnu#?!HKF23-M%H)Q8|S_m?-s{?s6hMr{975f z4faJ?eU@=+ELt1b=NPw^MQelFXxu_O-JnCtKP}@&fhkv%!okdivYpJhF33UEzQ(sw zjh=MH(*ZsVa~C(UPcD8sPrf@E4SzavH%?=aL%8ePS=FOjRtg4a=`PDq5c+^oY0?+*`NeQ>M0y*P3Q&FzJZBJFug z;d}C1R;+%e90X0qoM6*`ath7UfP*2(>jQWHK#DGw_1c3aJ98_bNoTi%_!Mo_lkjy{ zm5zkP9%o>EVMJ`F(=?x2eR23SVzuO85aAGiuCghG77*t9o>*pMp7Gd~jkNh68d^Yy zP?DNP#pW(Xi19?B?c3Q&OV|phW473>ILzi&^OBrcHOHvZp|5U#7K>O#*Y@0PPZP#A zj-xlwUS2$D)t8(ATutut^8O|Y(>E9(%WqZ4sKvf0oEAQ*$k!-RrVEtPX*akJCZ3_q zM&&3|1)#LTo&3Yc7I!9!q8Qf!0dPNPVC)Cuzuh7xEN5UE44WGI*m>Zvxz!{Gpx?u) zW47S7qGQqx=KRWkH^Yj#s=P}aexu)@GSwhdT40z0-VPh!St*|Z{Aur;z^bFq=DamN zcwoKx7F`(dmWTj8HXMQ^CQaNYlyL$+iLx3Rd#F+{a@EE{_Y`s2JQPg;5?fs)o}_sq ze_{K9HvQpZ%2U-%)qV(>CR1#TzS^ZyFWRf9nc2O@El8?=hsM{iTQ$@|19-SabXIFtQGq48+4a9`0*e6TmThZG@Mtb=jTa85Uf!5RY-q6u!9nj zJWGp6%+aN0E|-P^UOlSx0KqjDNbC}tkU!U~v&5mYByt>q(^x#tmUYd6>v ze}Mb|D6R`1{8l7G363wXZFql!dCVMj+Y6wgvv1_UaOtO^zIA;f4Jbn0tm1Edht0Xd zlMf9ue{D?S%)n?-v_fpu{{8~ZUa(~A47uUD@9(~v&`vPXsv3AE7;V)= zHi79y8@eKfIxSXo@&=TLr!uRxE0Ll{p~rtV5KoXdmrW`+ z5F3ZRI)$4@N|Du@*Ih!ak^he+t2Z?|FRj zEMQkf)O0RkvCRnq9Nr3+1e_8HohevCn%>2im1zbCXI}E+;ufWYW8!vYC-U)#;p! zrgy_m@V*01`kJhoR??=7Y<(F1)ub*;;*+0ODgm^U%2y*XF!-zC8=3xcBbUcw#N;J= z+?HI?bosZldG+CQxV+81&^sVmc=T=bN@fng-@wt4tCJpBAAd`KPONx6IvSlGCzmjo zr~j(gHquV3`zEXl*d>U^Y79Bhxd00TJPE%VJIrH_@5Qwu&ZXCl0nLw{7gfqIP$8W3{+|f*zlq?{u{` znk)~`#rgH<>VM{VbaA!An*4&E9$o1-s9(QH8RB&ncKG1zG|n5eI%h69k8M+!$`6h> zr9?C&OgO|qX7jZKE5284QzN4C&*-(uMcv?kYsvUoBC$_7cR{i7;taoxd;~Vg;L$8< zDyR%hmKa1TYLXHLpX{}i;y*sac+&g5=3ki2wu^z@P4 zDAJ3{VwTnRZR-%*Vnt3eFMh>C66R*l)9O_NlI1un8V|5c?X9j5Un6d1`KO5(4URii z2QDzY8lE2yNoT~Yr@DUZc`E&QmFt4fq|=#eNq-iWrPwWW7CxLPocEqSn(cuiSDTLI zi+yjO0zRRnvRMxHe#Uzx9vovzvq$hWdZH?L&xQ%OMnc834+S1*QQ3YxNHYrNxLeKB zl4F>pL=X7^tsx&+ufIbiW|m9uk^X1Tt+2Lqy) zVC7(#TKyy_rp=^$vDkHL?G1hvH9)Hj5hx(+7+N-jDo)KMiK`n_WM!x988l_I>X@%DF>#BPbua=0sBVV64PP?O?7)? zq8CiA2qqb)B4H@Hqy^5Z6`M3eQC)%|dr-_~nz$(XBhQ4&(?`ioidrV>fUmPvrGGO{ z?5+n$$!}$#WQV`A@|Q%`xR3j3O{u8|tryz36^UsjDs@Q05%RJHd18ixTGTs%_bert z$JV(mLG?-f-Lk#jy0z|}v%t9C)0shZjAJnFTY0P{06O!Agzn^WBOp^gon?UN^!qe} zY6m`iW{U=z&umdI$Jp_Sx(Uo=ynn%rajUy~O15uW(wWeUyo0BODTVO7C>~Zz87@JW z39R*r18F7GCSXp{+y+hvI3jE+<`uI$tC6D&Q#*g zpDfroGNLY|BdxN+v#Pu`LNm5BS5~klTE+UPB0Ie7F_H8w?@1~Pm=6&9q#2+LL6B~e zvZ_W>lT;*}j14AVnSwzJLw|Oo3J!?jCyl98Aph~>fRQMM3g zfO1ypdDl#Y=i7ia;3RxXAZ4YD+&0~+dgH8Q;E|EV+!*51^UIqN=zQ9)TcV1X*|2dEfqmVUdT|pU^dFeKAj5;FIgLCr$?J3U|_os(R>QVE4e;Kch0QU z7BC~t*REq!g*k5kK(Ju9L6^N}7^cp)yrq@d6|JaBf@$@`+XfFG)P^P4#l(bt%!>hk0RxC8-Es6&;~o23?NS}oz}X^ z5Fbio&(Se-rJz~r--#SAxIxzy>8;Z2UGN3B^2#c5g(=jj=`LwyD8!+w8W^b(w*rc1 zMcHeLa840_K@$R&j-4l7T^tS&h4$L(&FdQFXImX>w==f1Bc!~^? ztKCiz|J4-OeUAER)wbqo4d{#FkZ$CiG7}CcLKljGGJkHY5j$Hyx?k+`bY6&)+4Gg` z&eBS;UAeTpFeh_+oI(N#dyX!@VrIQ{tJ|Iu%O-gLs{i$ev2;0<_ z@8NTBCE5KtqT%{yb8D}4)pM*1d~4E7=pkr536H@v)?}d;`*Mp96uC8_E%WpkayY146J1DW&=03kwz1xg*D9%U>!sexTQze zX?F9EgWogG;Df$pnslqq1koBSGTirATyG;$fFjO&7aBk z?1!5*Urfv9b+|$7tbV!e>jr&A*w*D+3wp3h%yS=k<_4@v~ zuChfj&l&k(7_w5zfthb`!EBU_dG&hcXA9t6p0T;1Dqul`FyFzmYo`l{XuS~aY@z|*spMsl zkO#_x_=Nl&J8&@TESGZ|l{+VA>4VJX|N{jZEPOHG%z}Ar-W)jOh z)U?w^{sg)#!|_|_y+YUas-)B*q<{E@;*WCTyG0f7VYE$^O6%O%e4Ko#nj%OHNq|yK)7?a#d-(l+aNH%1E@qplE@s+*;{qXJO4ki^oR8)H2x10_%4`9;>H`@?B%1^_@l4Kt^uYcTH#n9%4 zTCK(zKEK64qW~slmud&Yii02hn~={X{+`DRLMTNjvK>722% zGx*_BPH3Ti-suMKI4K^elzhccs0^$Nw_Ccp#T^bz>avXR|0nTfohU}ZIZs#lWnB&4e{fatX(kd-wehm1kWT!TTJu7>l&@cA zbfg$43TPrV1Rcv>j7i`QBHNl&E@bQTw_|r>+l+W9prVnrm|-DYULuj=wuDM*`OOdb6`H+3xqoWx0+}ZhzBwp7 zfvg}!jI>h>00U(61Jul>X*rGN#e_4R<5B3D(mx3=D~~m=L73bbDRFf)uGUqINshBm zW87+CMG?Jx`t~ZfX$R6OpNg{4X-#2=GIP27sg;|SSqO|C6X&5mri$`ke5%VdDwg_M zFS?#eg>ZD8Cf!+N4SyTMRrGwW%vl>8OP&RZ~6h>YW&kE69t8WtEKWHL$ff*_>@ zYLWWxvIaC0%>5ly8sS!ayghoztP6_?1?fgkKWia*^2!R-v_mDw(tHf-Cn}V}V?J~- zjUs-FDA{8(8ue>@i#-HGPriPwn(IbM@2ZR?^OjThaaRe4D`Z&apH@4A3etg3*$rdhL0 zno)963EL@exgeIJwC;wJSvR(aQIb2Raj4=s!I3Sw?l*@#VzY`#7jtKrQP~GgoI>j4 zx=@i^xaPh4Z-2H}xX;O4gcE8dc?#rt^+ijNWNH5t8$FLyrJ6ze1&d=cTDlu8m(d|Xw`QSML5i~{uVcm~oZMd8``{OMXO zJ2(N4s^ETdF%`6xt+v#J$u~^SkCMzaV%NYL=gt93&VRsSVn!$F3Nx^xJ~9>QoTCI{ zw{v(ECLXb&+clfnxpn^MW^(TBN#)&1cGDLd$z)Ow+rD=>)z-y|I(J`f%>(STc${{@3E$i1dkcrClc1!>f10 ztAFI^_;`3ZN=Bz=!;71d*GYU-V$_XD{;Q3 z9Y_XFxaP~15g!`Zh6_aO$GIL#(W`%xC2O11Q2StqI<&cPQ$Y&jxNTwTk(ljBZVfE# zEy4}=TZWzuH*)$ZumbkgIw-7bVKzXA|t9V777_%tO*RIZ}qFZf~ zPhzV7-YiX248;lb*BakV%OxPg8g~)}FUhg_(YC8h+RFy@UiZ8MJ~AV@nh%v|(|;QI zCha_b>1Oq~fGBQ1ateyihTwGjO5IPRP{Y&r z+Qoy}G=6>6rh4Mz-Z0ab==GY55>sL_>N4dJ`cpA7mfxzYQ$o?P>IvuPrwa>9JT?l$GmP2{YaE66bw5rsEN*N)0fve>4+2vk3Q!a3Am|@@+ zRWc7(w8>=YA3n3Fn(qBgu78H3o2&EWY>oPaY+S@J--9Ia}AdeC4$b-m# zTSay*Y<|^kS!B)JcewfAq+WdkQ1>ea*b2&wPiMVh$3CzDZ{uQLG~p`3TQAdXD#d(B z6;h}Zb<4yoK8eXnovsL7=-7BV(p^s{a=n^3_|+l7WfRrV%qvgYKg#r#Kn2Ie((>-=a?n%Gk6umJxb{lsef%;bmS_Srv8VO-3@lOaardC^$@F zT-iy`L|_!1-d3yIO4KTDZJJGL14$FunD_|rka0g6qi9sWd-#HE1fYN}jgNUH&4#f_ z0?DitKVl(NafmyNw_;pLlkotp$}K9qf4+9prU~_z?!a(f=YQgpytCt_S?Jh9EnXS{ zzeiKL2$D%e(jkpPV{6WiV%5dd>C~;IE9s+He5lgCHU;Rem}H!tQynQ)(V`Y=J6Xxp zTX+9dCm3$1({g+QhOQd9KMtZ-&96AzNH)-v%xI=QOCHh4l9>U&(Wcphu%neW^~OGB zPK`oU`{n}Z&42teL2J!W!*#;oup-0GY_NZQD@kW{yQfBpdAg)W&j4mNaHe$5rL z15ABA76R;HB*W{p$|kgX3T%qDO85AOD3fAU!@^N{CBQ^>^8+$(kTFfe+AWtuL^4#)1uXyoq?2`#E(FXz sxKfjVjUSW7ks1NFli`t70bi3hl3f8}la-QG0l|~%k~s$ciU0rr05{ZGF8}}l diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index cea7d0975e5d1..671c0d426677a 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -76,7 +76,7 @@ def run(self): EchoOutputThread(proc.stdout).start() # Connect to the gateway - gateway = JavaGateway(GatewayClient(port=gateway_port), auto_convert=False) + gateway = JavaGateway(GatewayClient(port=gateway_port), auto_convert=False, start_callback_server=True) # Import the classes used by PySpark java_import(gateway.jvm, "org.apache.spark.SparkConf") diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index 08de8dbe9d542..0ba2b4b38a281 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -43,14 +43,6 @@ def print_(self): #hack to call print function in DStream getattr(self._jdstream, "print")() - def pyprint(self): - """ - Print the first ten elements of each RDD generated in this DStream. This is an output - operator, so this DStream will be registered as an output stream and there materialized. - - """ - self._jdstream.pyprint() - def filter(self, f): """ Return DStream containing only the elements that satisfy predicate. @@ -190,6 +182,38 @@ def getNumPartitions(self): # TODO: remove hardcoding. RDD has NumPartitions but DStream does not have. return 2 + def foreachRDD(self, func): + """ + """ + from utils import RDDFunction + wrapped_func = RDDFunction(self.ctx, self._jrdd_deserializer, func) + self.ctx._jvm.PythonForeachDStream(self._jdstream.dstream(), wrapped_func) + + def pyprint(self): + """ + Print the first ten elements of each RDD generated in this DStream. This is an output + operator, so this DStream will be registered as an output stream and there materialized. + + """ + def takeAndPrint(rdd, time): + taken = rdd.take(11) + print "-------------------------------------------" + print "Time: %s" % (str(time)) + print "-------------------------------------------" + for record in taken[:10]: + print record + if len(taken) > 10: + print "..." + print + + self.foreachRDD(takeAndPrint) + + + #def transform(self, func): + # from utils import RDDFunction + # wrapped_func = RDDFunction(self.ctx, self._jrdd_deserializer, func) + # jdstream = self.ctx._jvm.PythonTransformedDStream(self._jdstream.dstream(), wrapped_func).toJavaDStream + # return DStream(jdstream, self._ssc, ...) ## DO NOT KNOW HOW class PipelinedDStream(DStream): def __init__(self, prev, func, preservesPartitioning=False): @@ -209,7 +233,6 @@ def pipeline_func(split, iterator): self._prev_jdstream = prev._prev_jdstream # maintain the pipeline self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer self.is_cached = False - self.is_checkpointed = False self._ssc = prev._ssc self.ctx = prev.ctx self.prev = prev @@ -246,4 +269,5 @@ def _jdstream(self): return self._jdstream_val def _is_pipelinable(self): - return not (self.is_cached or self.is_checkpointed) + return not (self.is_cached) + diff --git a/python/pyspark/streaming/utils.py b/python/pyspark/streaming/utils.py index b1fa1e227b0a1..84f1dadeba03d 100644 --- a/python/pyspark/streaming/utils.py +++ b/python/pyspark/streaming/utils.py @@ -15,6 +15,27 @@ # limitations under the License. # +from pyspark.rdd import RDD + +class RDDFunction(): + def __init__(self, ctx, jrdd_deserializer, func): + self.ctx = ctx + self.deserializer = jrdd_deserializer + self.func = func + + def call(self, jrdd, time): + # Wrap JavaRDD into python's RDD class + rdd = RDD(jrdd, self.ctx, self.deserializer) + # Call user defined RDD function + self.func(rdd, time) + + def __str__(self): + return "%s, %s" % (str(self.deserializer), str(self.func)) + + class Java: + implements = ['org.apache.spark.streaming.api.python.PythonRDDFunction'] + + def msDurationToString(ms): """ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala index a2b9d581f609c..a6184de4e83c1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala @@ -54,14 +54,6 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T dstream.print() } - /** - * Print the first ten elements of each PythonRDD generated in the PythonDStream. This is an output - * operator, so this PythonDStream will be registered as an output stream and there materialized. - * This function is for PythonAPI. - */ - //TODO move this function to PythonDStream - def pyprint() = dstream.pyprint() - /** * Return a new DStream in which each RDD has a single element generated by counting each RDD * of this DStream. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala index 05ccc23e9f422..751b7504f1cea 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala @@ -56,6 +56,10 @@ class PythonDStream[T: ClassTag]( } } + def foreachRDD(foreachFunc: PythonRDDFunction) { + new PythonForeachDStream(this, context.sparkContext.clean(foreachFunc, false)).register() + } + val asJavaDStream = JavaDStream.fromDStream(this) } @@ -85,5 +89,39 @@ DStream[Array[Byte]](prev.ssc){ case None => None } } + + val asJavaDStream = JavaDStream.fromDStream(this) +} + +class PythonForeachDStream( + prev: DStream[Array[Byte]], + foreachFunction: PythonRDDFunction + ) extends ForEachDStream[Array[Byte]]( + prev, + (rdd: RDD[Array[Byte]], time: Time) => { + foreachFunction.call(rdd.toJavaRDD(), time.milliseconds) + } + ) { + + this.register() +} +/* +This does not work. Ignore this for now. -TD +class PythonTransformedDStream( + prev: DStream[Array[Byte]], + transformFunction: PythonRDDFunction + ) extends DStream[Array[Byte]](prev.ssc) { + + override def dependencies = List(prev) + + override def slideDuration: Duration = prev.slideDuration + + override def compute(validTime: Time): Option[RDD[Array[Byte]]] = { + prev.getOrCompute(validTime).map(rdd => { + transformFunction.call(rdd.toJavaRDD(), validTime.milliseconds).rdd + }) + } + val asJavaDStream = JavaDStream.fromDStream(this) } +*/ \ No newline at end of file diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index f539bc9aa147d..d8dbdf59e7ff1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -623,66 +623,6 @@ abstract class DStream[T: ClassTag] ( new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register() } -//TODO: move pyprint to PythonDStream and executed by py4j call back function - /** - * Print the first ten elements of each PythonRDD generated in this PythonDStream. This is an output - * operator, so this PythonDStream will be registered as an output stream and there materialized. - * Since serialized Python object is readable by Python, pyprint writes out binary data to - * temporary file and run python script to deserialized and print the first ten elements - * - * Currently call python script directly. We should avoid this - */ - private[streaming] def pyprint() { - def foreachFunc = (rdd: RDD[T], time: Time) => { - val iter = rdd.take(11).iterator - - // Generate a temporary file - val prefix = "spark" - val suffix = ".tmp" - val tempFile = File.createTempFile(prefix, suffix) - val tempFileStream = new DataOutputStream(new FileOutputStream(tempFile.getAbsolutePath)) - // Write out serialized python object to temporary file - PythonRDD.writeIteratorToStream(iter, tempFileStream) - tempFileStream.close() - - // pythonExec should be passed from python. Move pyprint to PythonDStream - val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON") - - val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME") - // Call python script to deserialize and print result in stdout - val pb = new ProcessBuilder(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath) - val workerEnv = pb.environment() - - // envVars also should be pass from python - val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH") - workerEnv.put("PYTHONPATH", pythonPath) - val worker = pb.start() - val is = worker.getInputStream() - val isr = new InputStreamReader(is) - val br = new BufferedReader(isr) - - println ("-------------------------------------------") - println ("Time: " + time) - println ("-------------------------------------------") - - // Print values which is from python std out - var line = "" - breakable { - while (true) { - line = br.readLine() - if (line == null) break() - println(line) - } - } - // Delete temporary file - tempFile.delete() - println() - - } - new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register() - } - - /** * Return a new DStream in which each RDD contains all the elements in seen in a * sliding window of time over this DStream. The new DStream generates RDDs with From e185338e1b13d92f66356c0a966e5b5c59e69f0c Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 1 Aug 2014 14:39:18 -0700 Subject: [PATCH 2/3] Added missing file --- .../spark/streaming/api/python/PythonRDDFunction.java | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonRDDFunction.java diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonRDDFunction.java b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonRDDFunction.java new file mode 100644 index 0000000000000..88f7036c3a05b --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonRDDFunction.java @@ -0,0 +1,8 @@ +package org.apache.spark.streaming.api.python; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.streaming.Time; + +public interface PythonRDDFunction { + JavaRDD call(JavaRDD rdd, long time); +} From 54e2e8c5e4e7c5a65e9b8f1d90142902ed95160e Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 1 Aug 2014 14:40:37 -0700 Subject: [PATCH 3/3] Added extra line. --- python/pyspark/streaming/utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyspark/streaming/utils.py b/python/pyspark/streaming/utils.py index 84f1dadeba03d..c60ecd1ed607a 100644 --- a/python/pyspark/streaming/utils.py +++ b/python/pyspark/streaming/utils.py @@ -17,6 +17,7 @@ from pyspark.rdd import RDD + class RDDFunction(): def __init__(self, ctx, jrdd_deserializer, func): self.ctx = ctx