[SPARK-31849][PYTHON][SQL] Make PySpark SQL exceptions more Pythonic #28661

Closed · wants to merge 3 commits

14 changes: 6 additions & 8 deletions python/pyspark/sql/tests/test_pandas_udf.py
@@ -19,14 +19,12 @@
 from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
 from pyspark.sql.types import *
-from pyspark.sql.utils import ParseException
+from pyspark.sql.utils import ParseException, PythonException
 from pyspark.rdd import PythonEvalType
 from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarrow, \
     pandas_requirement_message, pyarrow_requirement_message
 from pyspark.testing.utils import QuietTest

-from py4j.protocol import Py4JJavaError
-
-
 @unittest.skipIf(
     not have_pandas or not have_pyarrow,
@@ -157,14 +155,14 @@ def foofoo(x, y):

         # plain udf (test for SPARK-23754)
         self.assertRaisesRegexp(
-            Py4JJavaError,
+            PythonException,
             exc_message,
             df.withColumn('v', udf(foo)('id')).collect
         )

         # pandas scalar udf
         self.assertRaisesRegexp(
-            Py4JJavaError,
+            PythonException,
             exc_message,
             df.withColumn(
                 'v', pandas_udf(foo, 'double', PandasUDFType.SCALAR)('id')
@@ -173,15 +171,15 @@

         # pandas grouped map
         self.assertRaisesRegexp(
-            Py4JJavaError,
+            PythonException,
             exc_message,
             df.groupBy('id').apply(
                 pandas_udf(foo, df.schema, PandasUDFType.GROUPED_MAP)
             ).collect
         )

         self.assertRaisesRegexp(
-            Py4JJavaError,
+            PythonException,
             exc_message,
             df.groupBy('id').apply(
                 pandas_udf(foofoo, df.schema, PandasUDFType.GROUPED_MAP)
@@ -190,7 +188,7 @@

         # pandas grouped agg
         self.assertRaisesRegexp(
-            Py4JJavaError,
+            PythonException,
             exc_message,
             df.groupBy('id').agg(
                 pandas_udf(foo, 'double', PandasUDFType.GROUPED_AGG)('id')
             )
53 changes: 44 additions & 9 deletions python/pyspark/sql/utils.py
@@ -18,8 +18,19 @@
 import py4j
 import sys

+from pyspark import SparkContext
+
 if sys.version_info.major >= 3:
     unicode = str
+    # Disable exception chaining (PEP 3134) in captured exceptions
+    # in order to hide JVM stacktrace.
+    exec("""
+def raise_from(e):
+    raise e from None
+""")
Member Author (HyukjinKwon): This way, I actually mimicked six.

+else:
+    def raise_from(e):
+        raise e

Comment on lines +32 to 34
Member: So it seems PEP 3134 is only for Python 3+; we don't cut exception chaining in Python 2.7 with this raise_from?

Member Author (HyukjinKwon, May 28, 2020): re: #28661 (comment) too.

Yeah. In Python 2 there is no chaining; this is effectively a new feature in Python 3. For example, in the current master:

Python 2:

>>> sql("a")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/session.py", line 646, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/.../spark/python/pyspark/sql/utils.py", line 102, in deco
    raise converted
pyspark.sql.utils.ParseException:
mismatched input 'a' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0)

== SQL ==
a
^^^

Python 3:

>>> sql("a")
Traceback (most recent call last):
  File "/.../spark/python/pyspark/sql/utils.py", line 98, in deco
    return f(*a, **kw)
  File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o25.sql.
: org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input 'a' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0)

== SQL ==
a
^^^

	at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:266)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:133)
	at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:49)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:81)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$2(SparkSession.scala:604)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:604)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/session.py", line 646, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/.../spark/python/pyspark/sql/utils.py", line 102, in deco
    raise converted
pyspark.sql.utils.ParseException:
mismatched input 'a' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0)

== SQL ==
a
^^^
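
As an editorial aside, here is a minimal, self-contained sketch (not from the PR) of the Python 3 mechanism being discussed; `raise ... from None` is what cuts the chain, and all names below are illustrative:

# Hypothetical demo of PEP 3134 chaining suppression.
def convert_and_raise(hide_original):
    try:
        raise ValueError("original error (stands in for the Py4JJavaError)")
    except ValueError:
        converted = RuntimeError("converted, Pythonic error")
        if hide_original:
            # No "During handling of the above exception..." section is printed.
            raise converted from None
        else:
            # The original traceback is chained implicitly and printed too.
            raise converted

convert_and_raise(hide_original=True)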


class CapturedException(Exception):
@@ -29,7 +40,11 @@ def __init__(self, desc, stackTrace, cause=None):
         self.cause = convert_exception(cause) if cause is not None else None

     def __str__(self):
+        sql_conf = SparkContext._jvm.org.apache.spark.sql.internal.SQLConf.get()
+        debug_enabled = sql_conf.pysparkJVMStacktraceEnabled()
         desc = self.desc
+        if debug_enabled:
+            desc = desc + "\nJVM stacktrace:\n%s" % self.stackTrace
         # encode unicode instance for python2 for human readable description
         if sys.version_info.major < 3 and isinstance(desc, unicode):
             return str(desc.encode('utf-8'))
@@ -67,6 +82,12 @@ class QueryExecutionException(CapturedException):
     """


+class PythonException(CapturedException):
+    """
+    Exceptions thrown from Python workers.
+    """
+
+
 class UnknownException(CapturedException):
     """
     None of the above exceptions.
@@ -75,21 +96,33 @@ class UnknownException(CapturedException):

 def convert_exception(e):
     s = e.toString()
-    stackTrace = '\n\t at '.join(map(lambda x: x.toString(), e.getStackTrace()))
     c = e.getCause()

+    jvm = SparkContext._jvm
+    jwriter = jvm.java.io.StringWriter()
+    e.printStackTrace(jvm.java.io.PrintWriter(jwriter))
+    stacktrace = jwriter.toString()
Comment on lines +101 to +104
Member: Not sure about this part; why is this change needed?

Member Author: The previous stacktrace wasn't actually quite correct; it hid the stacktrace from the executor side. This PR now handles exceptions from the executor, so I needed to change this.

Member: Hmm, isn't the getStackTrace content equivalent to printStackTrace?

Member Author: They seem different. This is what I get from getStackTrace:

org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2117)
	 at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2066)
	 at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2065)
	 at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	 at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	 at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2065)
	 at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1021)
	 at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1021)
	 at scala.Option.foreach(Option.scala:407)
	 at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1021)
	 at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2297)
	 at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2246)
	 at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2235)
	 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	 at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:823)
	 at org.apache.spark.SparkContext.runJob(SparkContext.scala:2108)
	 at org.apache.spark.SparkContext.runJob(SparkContext.scala:2129)
	 at org.apache.spark.SparkContext.runJob(SparkContext.scala:2148)
	 at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
	 at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
	 at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	 at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3653)
	 at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2695)
	 at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3644)
	 at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	 at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	 at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	 at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
	 at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	 at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3642)
	 at org.apache.spark.sql.Dataset.head(Dataset.scala:2695)
	 at org.apache.spark.sql.Dataset.take(Dataset.scala:2902)
	 at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
	 at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
	 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	 at java.lang.reflect.Method.invoke(Method.java:498)
	 at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	 at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	 at py4j.Gateway.invoke(Gateway.java:282)
	 at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	 at py4j.commands.CallCommand.execute(CallCommand.java:79)
	 at py4j.GatewayConnection.run(GatewayConnection.java:238)
	 at java.lang.Thread.run(Thread.java:748)

And this is what I get from printStackTrace:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 2.0 failed 4 times, most recent failure: Lost task 10.3 in stage 2.0 (TID 18, 192.168.35.193, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched
    for item in iterator:
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in <lambda>
    return lambda *a: f(*a)
  File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<stdin>", line 3, in divide_by_zero
ZeroDivisionError: division by zero

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:516)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:469)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:753)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:469)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:472)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2117)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2066)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2065)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2065)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1021)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1021)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1021)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2297)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2246)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2235)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:823)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2108)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2129)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2148)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3653)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2695)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3644)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3642)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2695)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2902)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched
    for item in iterator:
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in <lambda>
    return lambda *a: f(*a)
  File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<stdin>", line 3, in divide_by_zero
ZeroDivisionError: division by zero

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:516)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:469)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:753)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:469)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:472)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more

Member Author: It seems getStackTrace doesn't show the cause, whereas printStackTrace includes it. It's best to make it the same as what the JVM itself shows anyway :-).
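
To make the contrast concrete, here is a hedged editorial sketch (not from the PR) of both approaches over py4j; it assumes an active SparkContext and an already-captured Java exception object:

from pyspark import SparkContext

def frames_only(java_exception):
    # Pre-PR behaviour: joins only the top-level throwable's frames,
    # so any "Caused by:" chain (e.g. the Python worker error) is lost.
    return '\n\t at '.join(f.toString() for f in java_exception.getStackTrace())

def full_trace(java_exception):
    # PR behaviour: let the JVM render the exception itself, which
    # includes the message and the full "Caused by:" chain.
    jvm = SparkContext._jvm
    jwriter = jvm.java.io.StringWriter()
    java_exception.printStackTrace(jvm.java.io.PrintWriter(jwriter))
    return jwriter.toString()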

     if s.startswith('org.apache.spark.sql.AnalysisException: '):
-        return AnalysisException(s.split(': ', 1)[1], stackTrace, c)
+        return AnalysisException(s.split(': ', 1)[1], stacktrace, c)
     if s.startswith('org.apache.spark.sql.catalyst.analysis'):
-        return AnalysisException(s.split(': ', 1)[1], stackTrace, c)
+        return AnalysisException(s.split(': ', 1)[1], stacktrace, c)
     if s.startswith('org.apache.spark.sql.catalyst.parser.ParseException: '):
-        return ParseException(s.split(': ', 1)[1], stackTrace, c)
+        return ParseException(s.split(': ', 1)[1], stacktrace, c)
Comment on lines 109 to +110
Member: Some exceptions, like ParseException, happen in the JVM, right? Is it good to hide the JVM stacktrace for them by default?

Member Author: I think ParseException at least shows a meaningful error message to the end user, such as:

: org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input 'a' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0)

== SQL ==
a
^^^

If developers want to debug, they can enable spark.sql.pyspark.jvmStacktrace.enabled.

     if s.startswith('org.apache.spark.sql.streaming.StreamingQueryException: '):
-        return StreamingQueryException(s.split(': ', 1)[1], stackTrace, c)
+        return StreamingQueryException(s.split(': ', 1)[1], stacktrace, c)
     if s.startswith('org.apache.spark.sql.execution.QueryExecutionException: '):
-        return QueryExecutionException(s.split(': ', 1)[1], stackTrace, c)
+        return QueryExecutionException(s.split(': ', 1)[1], stacktrace, c)
     if s.startswith('java.lang.IllegalArgumentException: '):
-        return IllegalArgumentException(s.split(': ', 1)[1], stackTrace, c)
-    return UnknownException(s, stackTrace, c)
+        return IllegalArgumentException(s.split(': ', 1)[1], stacktrace, c)
+    if c is not None and (
+            c.toString().startswith('org.apache.spark.api.python.PythonException: ')
+            # To make sure this only catches Python UDFs.
+            and any(map(lambda v: "org.apache.spark.sql.execution.python" in v.toString(),
+                        c.getStackTrace()))):
+        msg = ("\n  An exception was thrown from Python worker in the executor. "
+               "The below is the Python worker stacktrace.\n%s" % c.getMessage())
+        return PythonException(msg, stacktrace)
+    return UnknownException(s, stacktrace, c)


def capture_sql_exception(f):
@@ -99,7 +132,9 @@ def deco(*a, **kw):
         except py4j.protocol.Py4JJavaError as e:
             converted = convert_exception(e.java_exception)
             if not isinstance(converted, UnknownException):
-                raise converted
+                # Hide where the exception came from that shows a non-Pythonic
+                # JVM exception message.
+                raise_from(converted)
Member: So raise_from is used to cut the exception chain from the JVM?

             else:
                 raise
     return deco
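
Taken together, here is a hedged end-to-end usage sketch of the new behaviour (editorial, not from the PR; it assumes a SparkSession named `spark` with this patch applied):

from pyspark.sql.functions import udf
from pyspark.sql.utils import PythonException

@udf("double")
def divide_by_zero(v):
    return v / 0  # raises ZeroDivisionError in the Python worker

try:
    spark.range(1).select(divide_by_zero("id")).collect()
except PythonException as e:
    # With the patch, only the Python worker traceback is shown by default,
    # instead of a raw Py4JJavaError wrapping the whole JVM stack.
    print(e)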
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

@@ -1784,6 +1784,15 @@ object SQLConf {
.version("3.0.0")
.fallbackConf(ARROW_EXECUTION_ENABLED)

val PYSPARK_JVM_STACKTRACE_ENABLED =
buildConf("spark.sql.pyspark.jvmStacktrace.enabled")
.doc("When true, it shows the JVM stacktrace in the user-facing PySpark exception " +
"together with Python stacktrace. By default, it is disabled and hides JVM stacktrace " +
"and shows a Python-friendly exception only.")
.version("3.0.0")
Member: Is this targeting 3.0.0?

Member Author: It's arguable, but at its core it only changes the exception message. I personally think it's okay (and good) to have it in 3.0, but I'm fine retargeting if there's any concern about it.

Member (@dongjoon-hyun, May 29, 2020): @gatorsmile, are you okay with targeting 3.0.0 here? Although we are in the RC stage, this PR looks worth backporting (and the default is false).

One question for @HyukjinKwon: do we want this as a dynamic configuration instead of a static one? That is, do we want to be able to switch it on and off at runtime?

Member Author: It doesn't strictly have to be in Spark 3.0. I just wanted quicker feedback from users, and thought it was worth trying in Spark 3.0 since technically we only touch the error messages here.

I don't feel strongly that it has to land in Spark 3.0; it's okay to retarget to 3.1 if anyone feels strongly it should only be in master. Let me know :-)

+      .booleanConf
+      .createWithDefault(false)

   val ARROW_SPARKR_EXECUTION_ENABLED =
     buildConf("spark.sql.execution.arrow.sparkr.enabled")
       .doc("When true, make use of Apache Arrow for columnar data transfers in SparkR. " +
@@ -3063,6 +3072,8 @@ class SQLConf extends Serializable with Logging {

def arrowPySparkEnabled: Boolean = getConf(ARROW_PYSPARK_EXECUTION_ENABLED)

def pysparkJVMStacktraceEnabled: Boolean = getConf(PYSPARK_JVM_STACKTRACE_ENABLED)

def arrowSparkREnabled: Boolean = getConf(ARROW_SPARKR_EXECUTION_ENABLED)

def arrowPySparkFallbackEnabled: Boolean = getConf(ARROW_PYSPARK_FALLBACK_ENABLED)
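
As a closing illustration, a hedged sketch of how the new flag can be toggled at runtime (editorial, not from the PR; assumes a SparkSession named `spark` with this patch applied; CapturedException.__str__ reads the conf at print time, so no restart is needed):

# Default: concise, Python-friendly error only.
spark.conf.set("spark.sql.pyspark.jvmStacktrace.enabled", "false")
try:
    spark.sql("a")  # invalid SQL, raises ParseException
except Exception as e:
    print(e)        # short parser message, no JVM frames

# Debugging: append the full JVM stacktrace to the same exception.
spark.conf.set("spark.sql.pyspark.jvmStacktrace.enabled", "true")
try:
    spark.sql("a")
except Exception as e:
    print(e)        # message followed by "JVM stacktrace: ..."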