[SPARK-3491] [MLlib] [PySpark] use pickle to serialize data in MLlib #2378

Closed — wants to merge 41 commits
Commits
60e4e2f
support unpickle array.array for Python 2.6
davies Sep 11, 2014
c77c87b
cleanup debugging code
davies Sep 13, 2014
3908f5c
Merge branch 'master' into pickle
davies Sep 13, 2014
f44f771
enable tests about array
davies Sep 13, 2014
b30ef35
use pickle to serialize data for mllib/recommendation
davies Sep 13, 2014
52d1350
use new protocol in mllib/stat
davies Sep 13, 2014
f1544c4
refactor clustering
davies Sep 13, 2014
aa2287e
random
davies Sep 13, 2014
8fe166a
Merge branch 'pickle' into pickle_mllib
davies Sep 13, 2014
cccb8b1
mllib/tree
davies Sep 14, 2014
d9f691f
mllib/util
davies Sep 14, 2014
f2a0856
mllib/regression
davies Sep 14, 2014
c383544
classification
davies Sep 14, 2014
6d26b03
fix tests
davies Sep 14, 2014
4d7963e
remove manual serialization
davies Sep 14, 2014
84c721d
Merge branch 'master' into pickle_mllib
davies Sep 14, 2014
b02e34f
remove _common.py
davies Sep 14, 2014
0ee1525
remove outdated tests
davies Sep 14, 2014
722dd96
cleanup _common.py
davies Sep 15, 2014
f3506c5
Merge branch 'master' into pickle_mllib
davies Sep 16, 2014
df19464
memorize the module and class name during pickling
davies Sep 16, 2014
46a501e
choose batch size automatically
davies Sep 16, 2014
88034f0
refactor, address comments
davies Sep 16, 2014
9dcfb63
fix style
davies Sep 16, 2014
708dc02
fix tests
davies Sep 16, 2014
e1d1bfc
refactor
davies Sep 16, 2014
44736d7
speed up pickling array in Python 2.7
davies Sep 16, 2014
154d141
fix autobatchedpickler
davies Sep 16, 2014
df625c7
Merge commit '154d141' into pickle_mllib
davies Sep 16, 2014
a379a81
fix pickle array in python2.7
davies Sep 16, 2014
44e0551
fix cache
davies Sep 16, 2014
9ceff73
test size of serialized Rating
davies Sep 16, 2014
a2cc855
fix tests
davies Sep 16, 2014
1fccf1a
address comments
davies Sep 16, 2014
2511e76
cleanup
davies Sep 16, 2014
19d0967
refactor Picklers
davies Sep 17, 2014
e431377
fix cache of rdd, refactor
davies Sep 17, 2014
bd738ab
address comments
davies Sep 18, 2014
032cd62
add more type check and conversion for user_product
davies Sep 18, 2014
810f97f
fix equal of matrix
davies Sep 18, 2014
dffbba2
Merge branch 'master' of github.com:apache/spark into pickle_mllib
davies Sep 19, 2014
31 changes: 25 additions & 6 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -775,17 +775,36 @@ private[spark] object PythonRDD extends Logging {
}.toJavaRDD()
}

  private class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] {
    private val pickle = new Pickler()
    private var batch = 1
    private val buffer = new mutable.ArrayBuffer[Any]

    override def hasNext(): Boolean = iter.hasNext

    override def next(): Array[Byte] = {
      while (iter.hasNext && buffer.length < batch) {
        buffer += iter.next()
      }
      val bytes = pickle.dumps(buffer.toArray)
      val size = bytes.length
      // let 1M < size < 10M
      if (size < 1024 * 1024) {
        batch *= 2
      } else if (size > 1024 * 1024 * 10 && batch > 1) {
        batch /= 2
        // Review comment (Contributor): "If the first record is very large, batch will be 0."
      }
      buffer.clear()
      bytes
    }
  }
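The adaptive sizing in `AutoBatchedPickler` can be sketched in plain Python. This is an illustrative stand-in, not the PR's code: `auto_batched_pickle` is a hypothetical helper that uses the standard `pickle` module instead of Pyrolite's `Pickler`, while the grow/shrink thresholds (1 MB and 10 MB) are taken from the Scala code above:

```python
import pickle

def auto_batched_pickle(records):
    """Yield pickled batches of records, adapting the batch size so
    each serialized payload stays roughly between 1 MB and 10 MB."""
    batch = 1
    buffer = []
    for rec in records:
        buffer.append(rec)
        if len(buffer) >= batch:
            data = pickle.dumps(buffer)
            if len(data) < 1 << 20:                    # under 1 MB: try bigger batches
                batch *= 2
            elif len(data) > 10 << 20 and batch > 1:   # over 10 MB: back off
                batch //= 2
            buffer = []
            yield data
    if buffer:                                         # flush the final partial batch
        yield pickle.dumps(buffer)

batches = list(auto_batched_pickle(range(10)))
```

Starting at a batch size of 1 and doubling geometrically amortizes per-batch pickling overhead for small records while the upper bound caps memory use for large ones; with ten tiny records the batch sizes grow 1, 2, 4, 8, producing four batches.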

  /**
   * Convert an RDD of Java objects to an RDD of serialized Python objects, that is usable by
   * PySpark.
   */
  def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = {
-   jRDD.rdd.mapPartitions { iter =>
-     val pickle = new Pickler
-     iter.map { row =>
-       pickle.dumps(row)
-     }
-   }
+   jRDD.rdd.mapPartitions { iter => new AutoBatchedPickler(iter) }
  }
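Because each serialized element is now a pickled *list* of records rather than a single record, the consuming side has to flatten batches back into individual records. A minimal sketch of that consumer, assuming the hypothetical helper name `load_batched_stream` and the standard `pickle` module:

```python
import pickle

def load_batched_stream(blobs):
    """Flatten a stream of pickled batches (each blob decodes to a
    list of records) back into individual records."""
    for blob in blobs:
        for rec in pickle.loads(blob):
            yield rec

blobs = [pickle.dumps([1, 2]), pickle.dumps([3])]
records = list(load_batched_stream(blobs))
```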

@@ -68,8 +68,8 @@ private[python] object SerDeUtil extends Logging {
        construct(args ++ Array(""))
      } else if (args.length == 2 && args(1).isInstanceOf[String]) {
        val typecode = args(0).asInstanceOf[String].charAt(0)
-       val data: String = args(1).asInstanceOf[String]
-       construct(typecode, machineCodes(typecode), data.getBytes("ISO-8859-1"))
+       val data: Array[Byte] = args(1).asInstanceOf[String].getBytes("ISO-8859-1")
+       construct(typecode, machineCodes(typecode), data)
      } else {
        super.construct(args)
      }
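The unpickler branch above rebuilds an `array.array` from two pieces: a typecode and a raw-byte payload (delivered as an ISO-8859-1 string, since older Python pickles array contents as a string). The Python-side equivalent of that reconstruction can be sketched as follows; `construct_array` is a hypothetical helper, not part of the PR:

```python
from array import array

def construct_array(typecode, data):
    """Rebuild an array.array from its typecode and raw item bytes,
    mirroring the typecode + payload reconstruction done by the
    Scala-side unpickler."""
    arr = array(typecode)
    arr.frombytes(data)
    return arr

original = array('d', [1.0, 2.5, -3.0])
rebuilt = construct_array('d', original.tobytes())
```

Round-tripping through `tobytes()`/`frombytes()` preserves the values exactly because both sides agree on the machine representation implied by the typecode.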