Skip to content

Commit

Permalink
[SPARK-972] Added detailed callsite info for ValueError in context.py…
Browse files Browse the repository at this point in the history
… (resubmitted)

Author: jyotiska <jyotiska123@gmail.com>

Closes #34 from jyotiska/pyspark_code and squashes the following commits:

c9439be [jyotiska] replaced dict with namedtuple
a6bf4cd [jyotiska] added callsite info for context.py
  • Loading branch information
jyotiska authored and mateiz committed Mar 10, 2014
1 parent e1e09e0 commit f551898
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 8 deletions.
16 changes: 15 additions & 1 deletion python/pyspark/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import sys
from threading import Lock
from tempfile import NamedTemporaryFile
from collections import namedtuple

from pyspark import accumulators
from pyspark.accumulators import Accumulator
Expand All @@ -29,6 +30,7 @@
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
from pyspark.storagelevel import StorageLevel
from pyspark import rdd
from pyspark.rdd import RDD

from py4j.java_collections import ListConverter
Expand Down Expand Up @@ -83,6 +85,11 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
...
ValueError:...
"""
if rdd._extract_concise_traceback() is not None:
self._callsite = rdd._extract_concise_traceback()
else:
tempNamedTuple = namedtuple("Callsite", "function file linenum")
self._callsite = tempNamedTuple(function=None, file=None, linenum=None)
SparkContext._ensure_initialized(self, gateway=gateway)

self.environment = environment or {}
Expand Down Expand Up @@ -169,7 +176,14 @@ def _ensure_initialized(cls, instance=None, gateway=None):

if instance:
if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:
raise ValueError("Cannot run multiple SparkContexts at once")
currentMaster = SparkContext._active_spark_context.master
currentAppName = SparkContext._active_spark_context.appName
callsite = SparkContext._active_spark_context._callsite

# Raise error if there is already a running Spark context
raise ValueError("Cannot run multiple SparkContexts at once; existing SparkContext(app=%s, master=%s)" \
" created by %s at %s:%s " \
% (currentAppName, currentMaster, callsite.function, callsite.file, callsite.linenum))
else:
SparkContext._active_spark_context = instance

Expand Down
21 changes: 14 additions & 7 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from base64 import standard_b64encode as b64enc
import copy
from collections import defaultdict
from collections import namedtuple
from itertools import chain, ifilter, imap
import operator
import os
Expand All @@ -42,12 +43,14 @@
__all__ = ["RDD"]

def _extract_concise_traceback():
"""
This function returns the traceback info for a callsite, returns a dict
with function name, file name and line number
"""
tb = traceback.extract_stack()
callsite = namedtuple("Callsite", "function file linenum")
if len(tb) == 0:
return "I'm lost!"
# HACK: This function is in a file called 'rdd.py' in the top level of
# everything PySpark. Just trim off the directory name and assume
# everything in that tree is PySpark guts.
return None
file, line, module, what = tb[len(tb) - 1]
sparkpath = os.path.dirname(file)
first_spark_frame = len(tb) - 1
Expand All @@ -58,16 +61,20 @@ def _extract_concise_traceback():
break
if first_spark_frame == 0:
file, line, fun, what = tb[0]
return "%s at %s:%d" % (fun, file, line)
return callsite(function=fun, file=file, linenum=line)
sfile, sline, sfun, swhat = tb[first_spark_frame]
ufile, uline, ufun, uwhat = tb[first_spark_frame-1]
return "%s at %s:%d" % (sfun, ufile, uline)
return callsite(function=sfun, file=ufile, linenum=uline)

_spark_stack_depth = 0

class _JavaStackTrace(object):
def __init__(self, sc):
self._traceback = _extract_concise_traceback()
tb = _extract_concise_traceback()
if tb is not None:
self._traceback = "%s at %s:%s" % (tb.function, tb.file, tb.linenum)
else:
self._traceback = "Error! Could not extract traceback info"
self._context = sc

def __enter__(self):
Expand Down

0 comments on commit f551898

Please sign in to comment.