Skip to content

Commit

Permalink
Add Python includes to path before depickling broadcast values
Browse files Browse the repository at this point in the history
This fixes https://issues.apache.org/jira/browse/SPARK-1731 by adding the Python includes to the PYTHONPATH before depickling the broadcast values

@airhorns

Author: Bouke van der Bijl <boukevanderbijl@gmail.com>

Closes #656 from bouk/python-includes-before-broadcast and squashes the following commits:

7b0dfe4 [Bouke van der Bijl] Add Python includes to path before depickling broadcast values
(cherry picked from commit 3776f2f)

Signed-off-by: Patrick Wendell <pwendell@gmail.com>
  • Loading branch information
bouk authored and pwendell committed May 10, 2014
1 parent 71ad53f commit 2a669a7
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 12 deletions.
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -179,18 +179,18 @@ private[spark] class PythonRDD[T: ClassTag](
dataOut.writeInt(split.index)
// sparkFilesDir
PythonRDD.writeUTF(SparkFiles.getRootDirectory, dataOut)
// Python includes (*.zip and *.egg files)
dataOut.writeInt(pythonIncludes.length)
for (include <- pythonIncludes) {
PythonRDD.writeUTF(include, dataOut)
}
// Broadcast variables
dataOut.writeInt(broadcastVars.length)
for (broadcast <- broadcastVars) {
dataOut.writeLong(broadcast.id)
dataOut.writeInt(broadcast.value.length)
dataOut.write(broadcast.value)
}
// Python includes (*.zip and *.egg files)
dataOut.writeInt(pythonIncludes.length)
for (include <- pythonIncludes) {
PythonRDD.writeUTF(include, dataOut)
}
dataOut.flush()
// Serialized command:
dataOut.writeInt(command.length)
Expand Down
14 changes: 7 additions & 7 deletions python/pyspark/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,20 @@ def main(infile, outfile):
SparkFiles._root_directory = spark_files_dir
SparkFiles._is_running_on_worker = True

# fetch names and values of broadcast variables
num_broadcast_variables = read_int(infile)
for _ in range(num_broadcast_variables):
bid = read_long(infile)
value = pickleSer._read_with_length(infile)
_broadcastRegistry[bid] = Broadcast(bid, value)

# fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
sys.path.append(spark_files_dir) # *.py files that were added will be copied here
num_python_includes = read_int(infile)
for _ in range(num_python_includes):
filename = utf8_deserializer.loads(infile)
sys.path.append(os.path.join(spark_files_dir, filename))

# fetch names and values of broadcast variables
num_broadcast_variables = read_int(infile)
for _ in range(num_broadcast_variables):
bid = read_long(infile)
value = pickleSer._read_with_length(infile)
_broadcastRegistry[bid] = Broadcast(bid, value)

command = pickleSer._read_with_length(infile)
(func, deserializer, serializer) = command
init_time = time.time()
Expand Down

0 comments on commit 2a669a7

Please sign in to comment.