
[SPARK-48752][PYTHON][CONNECT][DOCS] Introduce pyspark.logger for improved structured logging for PySpark #47145

Closed
wants to merge 22 commits into from

Conversation

itholic
Contributor

@itholic itholic commented Jun 28, 2024

What changes were proposed in this pull request?

This PR introduces the pyspark.logger module to facilitate structured client-side logging for PySpark users.

This module includes a PySparkLogger class that provides several methods for logging messages at different levels in a structured JSON format:

  • PySparkLogger.info
  • PySparkLogger.warning
  • PySparkLogger.error

The logger can be easily configured to write logs to either the console or a specified file.
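
For example, writing to a file could look like the sketch below; this assumes the logger accepts standard `logging` handlers (it is described later in this thread as behaving like a standard Python logger), and the logger name and file name are made up:

```python
import logging

from pyspark.logger import PySparkLogger

logger = PySparkLogger.getLogger("MyAppLogger")

# Console output is the default; attaching a regular FileHandler (an assumption
# based on the standard-logger behavior) also writes the records to a file.
logger.addHandler(logging.FileHandler("pyspark_app.log"))

logger.info("job started", job_id="job_123")  # job_id is captured in the "context" field
```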

DataFrame error log improvement

This PR also improves the DataFrame API error logs by leveraging this new logging framework:

Before

We introduced structured logging in #45729, but the PySpark log is still hard to find in the current structured output, because it is hidden and mixed within a bunch of complex JVM stacktraces, and it is also not very Python-friendly:

{
  "ts": "2024-06-28T10:53:48.528Z",
  "level": "ERROR",
  "msg": "Exception in task 7.0 in stage 0.0 (TID 7)",
  "context": {
    "task_name": "task 7.0 in stage 0.0 (TID 7)"
  },
  "exception": {
    "class": "org.apache.spark.SparkArithmeticException",
    "msg": "[DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"__truediv__\" was called from\n/.../spark/python/test_error_context.py:17\n",
    "stacktrace": [
      {
        "class": "org.apache.spark.sql.errors.QueryExecutionErrors$",
        "method": "divideByZeroError",
        "file": "QueryExecutionErrors.scala",
        "line": 203
      },
      {
        "class": "org.apache.spark.sql.errors.QueryExecutionErrors",
        "method": "divideByZeroError",
        "file": "QueryExecutionErrors.scala",
        "line": -1
      },
      {
        "class": "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1",
        "method": "project_doConsume_0$",
        "file": null,
        "line": -1
      },
      {
        "class": "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1",
        "method": "processNext",
        "file": null,
        "line": -1
      },
      {
        "class": "org.apache.spark.sql.execution.BufferedRowIterator",
        "method": "hasNext",
        "file": "BufferedRowIterator.java",
        "line": 43
      },
      {
        "class": "org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1",
        "method": "hasNext",
        "file": "WholeStageCodegenEvaluatorFactory.scala",
        "line": 50
      },
      {
        "class": "org.apache.spark.sql.execution.SparkPlan",
        "method": "$anonfun$getByteArrayRdd$1",
        "file": "SparkPlan.scala",
        "line": 388
      },
      {
        "class": "org.apache.spark.rdd.RDD",
        "method": "$anonfun$mapPartitionsInternal$2",
        "file": "RDD.scala",
        "line": 896
      },
      {
        "class": "org.apache.spark.rdd.RDD",
        "method": "$anonfun$mapPartitionsInternal$2$adapted",
        "file": "RDD.scala",
        "line": 896
      },
      {
        "class": "org.apache.spark.rdd.MapPartitionsRDD",
        "method": "compute",
        "file": "MapPartitionsRDD.scala",
        "line": 52
      },
      {
        "class": "org.apache.spark.rdd.RDD",
        "method": "computeOrReadCheckpoint",
        "file": "RDD.scala",
        "line": 369
      },
      {
        "class": "org.apache.spark.rdd.RDD",
        "method": "iterator",
        "file": "RDD.scala",
        "line": 333
      },
      {
        "class": "org.apache.spark.scheduler.ResultTask",
        "method": "runTask",
        "file": "ResultTask.scala",
        "line": 93
      },
      {
        "class": "org.apache.spark.TaskContext",
        "method": "runTaskWithListeners",
        "file": "TaskContext.scala",
        "line": 171
      },
      {
        "class": "org.apache.spark.scheduler.Task",
        "method": "run",
        "file": "Task.scala",
        "line": 146
      },
      {
        "class": "org.apache.spark.executor.Executor$TaskRunner",
        "method": "$anonfun$run$5",
        "file": "Executor.scala",
        "line": 644
      },
      {
        "class": "org.apache.spark.util.SparkErrorUtils",
        "method": "tryWithSafeFinally",
        "file": "SparkErrorUtils.scala",
        "line": 64
      },
      {
        "class": "org.apache.spark.util.SparkErrorUtils",
        "method": "tryWithSafeFinally$",
        "file": "SparkErrorUtils.scala",
        "line": 61
      },
      {
        "class": "org.apache.spark.util.Utils$",
        "method": "tryWithSafeFinally",
        "file": "Utils.scala",
        "line": 99
      },
      {
        "class": "org.apache.spark.executor.Executor$TaskRunner",
        "method": "run",
        "file": "Executor.scala",
        "line": 647
      },
      {
        "class": "java.util.concurrent.ThreadPoolExecutor",
        "method": "runWorker",
        "file": "ThreadPoolExecutor.java",
        "line": 1136
      },
      {
        "class": "java.util.concurrent.ThreadPoolExecutor$Worker",
        "method": "run",
        "file": "ThreadPoolExecutor.java",
        "line": 635
      },
      {
        "class": "java.lang.Thread",
        "method": "run",
        "file": "Thread.java",
        "line": 840
      }
    ]
  },
  "logger": "Executor"
}

After

Now we get an improved, simplified, and Python-friendly error log for DataFrame errors:

{
  "ts": "2024-06-28 19:53:48,563",
  "level": "ERROR",
  "logger": "DataFrameQueryContextLogger",
  "msg": "[DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"__truediv__\" was called from\n/.../spark/python/test_error_context.py:17\n", 
  "context": {
    "file": "/.../spark/python/test_error_context.py",
    "line_no": "17",
    "fragment": "__truediv__"
    "error_class": "DIVIDE_BY_ZERO"
  },
  "exception": {
    "class": "Py4JJavaError",
    "msg": "An error occurred while calling o52.showString.\n: org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"__truediv__\" was called from\n/Users/haejoon.lee/Desktop/git_repos/spark/python/test_error_context.py:22\n\n\tat org.apache.spark.sql.errors.QueryExecutionErrors$.divideByZeroError(QueryExecutionErrors.scala:203)\n\tat org.apache.spark.sql.errors.QueryExecutionErrors.divideByZeroError(QueryExecutionErrors.scala)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)\n\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:896)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:896)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:369)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:333)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)\n\tat org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:146)\n\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:644)\n\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)\n\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)\n\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:647)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1007)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2479)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2498)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2523)\n\tat org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1052)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:412)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:1051)\n\tat org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)\n\tat org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4449)\n\tat org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3393)\n\tat org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4439)\n\tat 
org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:599)\n\tat org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4437)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:154)\n\tat org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:263)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:118)\n\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:923)\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:74)\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:218)\n\tat org.apache.spark.sql.Dataset.withAction(Dataset.scala:4437)\n\tat org.apache.spark.sql.Dataset.head(Dataset.scala:3393)\n\tat org.apache.spark.sql.Dataset.take(Dataset.scala:3626)\n\tat org.apache.spark.sql.Dataset.getRows(Dataset.scala:294)\n\tat org.apache.spark.sql.Dataset.showString(Dataset.scala:330)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:568)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)\n\tat py4j.ClientServerConnection.run(ClientServerConnection.java:106)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\n",
    "stacktrace": ["Traceback (most recent call last):", "  File \"/Users/haejoon.lee/Desktop/git_repos/spark/python/pyspark/errors/exceptions/captured.py\", line 272, in deco", "    return f(*a, **kw)", "  File \"/Users/haejoon.lee/anaconda3/envs/pyspark-dev-env/lib/python3.9/site-packages/py4j/protocol.py\", line 326, in get_return_value", "    raise Py4JJavaError(", "py4j.protocol.Py4JJavaError: An error occurred while calling o52.showString.", ": org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012", "== DataFrame ==", "\"__truediv__\" was called from", "/Users/haejoon.lee/Desktop/git_repos/spark/python/test_error_context.py:22", "", "\tat org.apache.spark.sql.errors.QueryExecutionErrors$.divideByZeroError(QueryExecutionErrors.scala:203)", "\tat org.apache.spark.sql.errors.QueryExecutionErrors.divideByZeroError(QueryExecutionErrors.scala)", "\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)", "\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)", "\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)", "\tat org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)", "\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)", "\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:896)", "\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:896)", "\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)", "\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:369)", "\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:333)", "\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)", "\tat org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)", "\tat org.apache.spark.scheduler.Task.run(Task.scala:146)", "\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:644)", "\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)", "\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)", "\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)", "\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:647)", "\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)", "\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)", "\tat java.base/java.lang.Thread.run(Thread.java:840)", "\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1007)", "\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)", "\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2479)", "\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2498)", "\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2523)", "\tat org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1052)", "\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)", "\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)", "\tat 
org.apache.spark.rdd.RDD.withScope(RDD.scala:412)", "\tat org.apache.spark.rdd.RDD.collect(RDD.scala:1051)", "\tat org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)", "\tat org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4449)", "\tat org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3393)", "\tat org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4439)", "\tat org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:599)", "\tat org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4437)", "\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:154)", "\tat org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:263)", "\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:118)", "\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:923)", "\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:74)", "\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:218)", "\tat org.apache.spark.sql.Dataset.withAction(Dataset.scala:4437)", "\tat org.apache.spark.sql.Dataset.head(Dataset.scala:3393)", "\tat org.apache.spark.sql.Dataset.take(Dataset.scala:3626)", "\tat org.apache.spark.sql.Dataset.getRows(Dataset.scala:294)", "\tat org.apache.spark.sql.Dataset.showString(Dataset.scala:330)", "\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)", "\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)", "\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)", "\tat java.base/java.lang.reflect.Method.invoke(Method.java:568)", "\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)", "\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)", "\tat py4j.Gateway.invoke(Gateway.java:282)", "\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)", "\tat py4j.commands.CallCommand.execute(CallCommand.java:79)", "\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)", "\tat py4j.ClientServerConnection.run(ClientServerConnection.java:106)", "\tat java.base/java.lang.Thread.run(Thread.java:840)"]
  }
}
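
For context, a log entry like the one above comes from a DataFrame division by zero under ANSI mode. The snippet below is a rough reconstruction inferred from the `__truediv__` fragment and file path in the log, not the actual test script:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Under ANSI mode, division by zero raises DIVIDE_BY_ZERO instead of returning NULL.
spark.conf.set("spark.sql.ansi.enabled", "true")

df = spark.range(10)
# Column.__truediv__ records the DataFrame call site, which is what shows up in the
# "context" field of the DataFrameQueryContextLogger entry once show() fails.
df.select((df.id / 0).alias("ratio")).show()
```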

Why are the changes needed?

Before

Currently we don't have a dedicated PySpark logging module, so we have to manually set up and customize the standard Python logging module, for example:

import logging

logging.basicConfig(level=logging.INFO)  # needed so INFO records are actually emitted
logger = logging.getLogger("TestLogger")
user = "test_user"
action = "test_action"
logger.info(f"User {user} takes an {action}")

This logs the information as a simple string:

INFO:TestLogger:User test_user takes an test_action

This is not very actionable, and it is hard to analyze since it is not well-structured.

Alternatively, we can use Log4j from the JVM, which results in excessively detailed logs as shown in the example above, and that approach cannot be applied to Spark Connect at all.

After

We can simply import and use PySparkLogger with minimal setup:

from pyspark.logger import PySparkLogger
logger = PySparkLogger.getLogger("TestLogger")
user = "test_user"
action = "test_action"
logger.info(f"User {user} takes an {action}", user=user, action=action)

This logs the information in the following JSON format:

{
  "ts": "2024-06-28 19:44:19,030",
  "level": "WARNING",
  "logger": "TestLogger",
  "msg": "User test_user takes an test_action",
  "context": {
    "user": "test_user",
    "action": "test_action"
  }
}

NOTE: we can add as many keyword arguments as we want to each logging method. These keyword arguments, such as user and action in the example, are included within the "context" field of the JSON log. This structure makes it easy to track and analyze the logs.
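
For instance, because each record is a single JSON object, the logs can be filtered by any context field without ad-hoc string parsing. A small sketch, assuming the records were written one JSON object per line to a hypothetical pyspark_app.log file:

```python
import json

# Load the structured log records (one JSON object per line is assumed).
with open("pyspark_app.log") as f:
    records = [json.loads(line) for line in f if line.strip()]

# Filter by a context field that was added via keyword arguments.
for record in records:
    if record.get("context", {}).get("user") == "test_user":
        print(record["ts"], record["level"], record["msg"])
```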

Does this PR introduce any user-facing change?

No API changes, but the PySpark client-side logging is improved.

Also added user-facing documentation "Logging in PySpark":

(screenshot: "Logging in PySpark" documentation page)

Also added API reference:

(screenshot: pyspark.logger API reference page)

How was this patch tested?

Added UTs.

Was this patch authored or co-authored using generative AI tooling?

No.

@itholic itholic changed the title [WIP][SPARK-48752][PYTHON] Introduce pyspark.logging for improved structured logging for PySpark [WIP][SPARK-48752][PYTHON][CONNECT] Introduce pyspark.logging for improved structured logging for PySpark Jun 28, 2024
@itholic
Contributor Author

itholic commented Jul 1, 2024

Hi! @HyukjinKwon @gengliangwang @ueshin
This is an initial prototype proposal for supporting a dedicated PySpark logging module in structured logging format.
I'm still working on some improvements such as documentation and CI, but the basic idea is laid out in the PR description, so it would be appreciated if you could review it when you find some time :-)

@itholic itholic changed the title [WIP][SPARK-48752][PYTHON][CONNECT] Introduce pyspark.logging for improved structured logging for PySpark [WIP][SPARK-48752][PYTHON][CONNECT] Introduce pyspark.logger for improved structured logging for PySpark Jul 2, 2024
@itholic itholic changed the title [WIP][SPARK-48752][PYTHON][CONNECT] Introduce pyspark.logger for improved structured logging for PySpark [WIP][SPARK-48752][PYTHON][CONNECT][DOCS] Introduce pyspark.logger for improved structured logging for PySpark Jul 2, 2024
@gengliangwang
Member

@itholic Thanks for working on this. LGTM overall.
I notice that there are some differences from the schema of the Scala structured logging: https://github.com/apache/spark/blob/master/common/utils/src/main/resources/org/apache/spark/SparkLayout.json#L2

  • ts => timestamp
  • msg => message
  • logger => name

Since the JSON Template Layout in Log4j 2.x (https://logging.apache.org/log4j/2.x/manual/json-template-layout.html) does not support multi-line output and Spark generates a significant number of INFO level logs, I opted to use the short names ts and msg for readability. Should we consider making the Python side consistent with this naming convention? Alternatively, if you think it’s more appropriate, we could modify the Scala side instead.

Contributor

@grundprinzip grundprinzip left a comment

Should we hook up the Spark Connect logger here as well?

python/pyspark/logger/logger.py
@itholic itholic changed the title [WIP][SPARK-48752][PYTHON][CONNECT][DOCS] Introduce pyspark.logger for improved structured logging for PySpark [SPARK-48752][PYTHON][CONNECT][DOCS] Introduce pyspark.logger for improved structured logging for PySpark Jul 8, 2024
@itholic itholic marked this pull request as ready for review July 8, 2024 00:57
@itholic
Contributor Author

itholic commented Jul 8, 2024

Should we consider making the Python side consistent with this naming convention?

Yeah, the suggested short names look reasonable to me. Let me address them. Thanks!

@itholic
Contributor Author

itholic commented Jul 15, 2024

The PySpark logger should effectively behave like a standard Python logger.

Agreed with this suggestion so I updated the behavior and related documents accordingly.
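
For example, under that assumption the usual `logging` knobs would apply; this is a rough sketch of the intended behavior, with a made-up logger name and fields, not the final API:

```python
import logging

from pyspark.logger import PySparkLogger

logger = PySparkLogger.getLogger("MyAppLogger")

# Standard logging semantics are assumed to carry over: level filtering, handlers, etc.
logger.setLevel(logging.WARNING)
logger.info("suppressed because the effective level is WARNING", step="prepare")
logger.warning("disk usage is high", usage_pct=91)  # kwargs still land in "context"
```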

Now, the final question is, shouldn't we add an instance of it to the SparkSession? If yes, is it configured properly to emit module names and functions?

Possibly we can integrate it with SparkSession, but I believe we need further design discussion, maybe in separate tickets, as PySparkLogger targets only the Python client.

My suggestion is that we focus on supporting the initial version here for Python client users first, concentrating on API structure and documentation, and then support advanced features through a couple of follow-ups?

@grundprinzip
Contributor

Now, the final question is, shouldn't we add an instance of it to the SparkSession? If yes, is it configured properly to emit module names and functions?

Possibly we can integrate it with SparkSession, but I believe we need further design discussion, maybe in separate tickets, as PySparkLogger targets only the Python client.

My suggestion is that we focus on supporting the initial version here for Python client users first, concentrating on API structure and documentation, and then support advanced features through a couple of follow-ups?

Actually, I meant only the client logging, simply to provide ease of use. But I agree, we can do this as a follow-up.

@itholic
Contributor Author

itholic commented Jul 15, 2024

Hi @gengliangwang, I applied your initial suggestions, so could you take a look again when you find some time?

Member

@gengliangwang gengliangwang left a comment

Thanks for the work!

self.assertTrue(err_msg in log_json["msg"])
self.assertTrue(err_msg in log_json["exception"]["msg"])
self.assertEqual(log_json["context"]["fragment"], "__truediv__")
self.assertEqual(log_json["context"]["error_class"], "DIVIDE_BY_ZERO")
Member

I think message_parameters and error_class have to be camel cased. It seems like a preexisting issue, so you won't have to deal with it here, but I think we should probably fix them.

Contributor Author

Got it. Let me fix them in a separate ticket after completing this PR.

Member

@HyukjinKwon HyukjinKwon left a comment

Seems fine otherwise

@itholic
Contributor Author

itholic commented Jul 18, 2024

Thanks @HyukjinKwon, just applied the comments.

@HyukjinKwon
Member

Merged to master.

jingz-db pushed a commit to jingz-db/spark that referenced this pull request Jul 22, 2024
itholic added a commit that referenced this pull request Jul 23, 2024
…er in log

### What changes were proposed in this pull request?

This PR follows up on #47145 to rename a log field.

### Why are the changes needed?

`line_no` is not very intuitive, so we'd better rename it to `line_number` explicitly.

### Does this PR introduce _any_ user-facing change?

No API change, but the user-facing log message will be improved.

### How was this patch tested?
The existing CI should pass

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #47437 from itholic/logger_followup.

Authored-by: Haejoon Lee <haejoon.lee@databricks.com>
Signed-off-by: Haejoon Lee <haejoon.lee@databricks.com>
ilicmarkodb pushed a commit to ilicmarkodb/spark that referenced this pull request Jul 29, 2024
fusheng-rd pushed a commit to fusheng-rd/spark that referenced this pull request Aug 6, 2024
attilapiros pushed a commit to attilapiros/spark that referenced this pull request Oct 4, 2024
…mproved structured logging for PySpark

### What changes were proposed in this pull request?

This PR introduces the `pyspark.logger` module to facilitate structured client-side logging for PySpark users.

This module includes a `PySparkLogger` class that provides several methods for logging messages at different levels in a structured JSON format:
- `PySparkLogger.info`
- `PySparkLogger.warning`
- `PySparkLogger.error`

The logger can be easily configured to write logs to either the console or a specified file.

## DataFrame error log improvement

This PR also improves the DataFrame API error logs by leveraging this new logging framework:

### **Before**

We introduced structured logging from apache#45729, but PySpark log is still hard to figure out in the current structured log, because it is hidden and mixed within bunch of complex JVM stacktraces and it's also not very Python-friendly:

```json
{
  "ts": "2024-06-28T10:53:48.528Z",
  "level": "ERROR",
  "msg": "Exception in task 7.0 in stage 0.0 (TID 7)",
  "context": {
    "task_name": "task 7.0 in stage 0.0 (TID 7)"
  },
  "exception": {
    "class": "org.apache.spark.SparkArithmeticException",
    "msg": "[DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"__truediv__\" was called from\n/.../spark/python/test_error_context.py:17\n",
    "stacktrace": [
      {
        "class": "org.apache.spark.sql.errors.QueryExecutionErrors$",
        "method": "divideByZeroError",
        "file": "QueryExecutionErrors.scala",
        "line": 203
      },
      {
        "class": "org.apache.spark.sql.errors.QueryExecutionErrors",
        "method": "divideByZeroError",
        "file": "QueryExecutionErrors.scala",
        "line": -1
      },
      {
        "class": "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1",
        "method": "project_doConsume_0$",
        "file": null,
        "line": -1
      },
      {
        "class": "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1",
        "method": "processNext",
        "file": null,
        "line": -1
      },
      {
        "class": "org.apache.spark.sql.execution.BufferedRowIterator",
        "method": "hasNext",
        "file": "BufferedRowIterator.java",
        "line": 43
      },
      {
        "class": "org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1",
        "method": "hasNext",
        "file": "WholeStageCodegenEvaluatorFactory.scala",
        "line": 50
      },
      {
        "class": "org.apache.spark.sql.execution.SparkPlan",
        "method": "$anonfun$getByteArrayRdd$1",
        "file": "SparkPlan.scala",
        "line": 388
      },
      {
        "class": "org.apache.spark.rdd.RDD",
        "method": "$anonfun$mapPartitionsInternal$2",
        "file": "RDD.scala",
        "line": 896
      },
      {
        "class": "org.apache.spark.rdd.RDD",
        "method": "$anonfun$mapPartitionsInternal$2$adapted",
        "file": "RDD.scala",
        "line": 896
      },
      {
        "class": "org.apache.spark.rdd.MapPartitionsRDD",
        "method": "compute",
        "file": "MapPartitionsRDD.scala",
        "line": 52
      },
      {
        "class": "org.apache.spark.rdd.RDD",
        "method": "computeOrReadCheckpoint",
        "file": "RDD.scala",
        "line": 369
      },
      {
        "class": "org.apache.spark.rdd.RDD",
        "method": "iterator",
        "file": "RDD.scala",
        "line": 333
      },
      {
        "class": "org.apache.spark.scheduler.ResultTask",
        "method": "runTask",
        "file": "ResultTask.scala",
        "line": 93
      },
      {
        "class": "org.apache.spark.TaskContext",
        "method": "runTaskWithListeners",
        "file": "TaskContext.scala",
        "line": 171
      },
      {
        "class": "org.apache.spark.scheduler.Task",
        "method": "run",
        "file": "Task.scala",
        "line": 146
      },
      {
        "class": "org.apache.spark.executor.Executor$TaskRunner",
        "method": "$anonfun$run$5",
        "file": "Executor.scala",
        "line": 644
      },
      {
        "class": "org.apache.spark.util.SparkErrorUtils",
        "method": "tryWithSafeFinally",
        "file": "SparkErrorUtils.scala",
        "line": 64
      },
      {
        "class": "org.apache.spark.util.SparkErrorUtils",
        "method": "tryWithSafeFinally$",
        "file": "SparkErrorUtils.scala",
        "line": 61
      },
      {
        "class": "org.apache.spark.util.Utils$",
        "method": "tryWithSafeFinally",
        "file": "Utils.scala",
        "line": 99
      },
      {
        "class": "org.apache.spark.executor.Executor$TaskRunner",
        "method": "run",
        "file": "Executor.scala",
        "line": 647
      },
      {
        "class": "java.util.concurrent.ThreadPoolExecutor",
        "method": "runWorker",
        "file": "ThreadPoolExecutor.java",
        "line": 1136
      },
      {
        "class": "java.util.concurrent.ThreadPoolExecutor$Worker",
        "method": "run",
        "file": "ThreadPoolExecutor.java",
        "line": 635
      },
      {
        "class": "java.lang.Thread",
        "method": "run",
        "file": "Thread.java",
        "line": 840
      }
    ]
  },
  "logger": "Executor"
}

```

### **After**

Now we get an improved, simplified, and Python-friendly error log for DataFrame errors:

```json
{
  "ts": "2024-06-28 19:53:48,563",
  "level": "ERROR",
  "logger": "DataFrameQueryContextLogger",
  "msg": "[DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"__truediv__\" was called from\n/.../spark/python/test_error_context.py:17\n",
  "context": {
    "file": "/.../spark/python/test_error_context.py",
    "line_no": "17",
    "fragment": "__truediv__"
    "error_class": "DIVIDE_BY_ZERO"
  },
  "exception": {
    "class": "Py4JJavaError",
    "msg": "An error occurred while calling o52.showString.\n: org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"__truediv__\" was called from\n/Users/haejoon.lee/Desktop/git_repos/spark/python/test_error_context.py:22\n\n\tat org.apache.spark.sql.errors.QueryExecutionErrors$.divideByZeroError(QueryExecutionErrors.scala:203)\n\tat org.apache.spark.sql.errors.QueryExecutionErrors.divideByZeroError(QueryExecutionErrors.scala)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)\n\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:896)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:896)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:369)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:333)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)\n\tat org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:146)\n\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:644)\n\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)\n\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)\n\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:647)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1007)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2479)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2498)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2523)\n\tat org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1052)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:412)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:1051)\n\tat org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)\n\tat org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4449)\n\tat org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3393)\n\tat org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4439)\n\tat 
org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:599)\n\tat org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4437)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:154)\n\tat org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:263)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:118)\n\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:923)\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:74)\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:218)\n\tat org.apache.spark.sql.Dataset.withAction(Dataset.scala:4437)\n\tat org.apache.spark.sql.Dataset.head(Dataset.scala:3393)\n\tat org.apache.spark.sql.Dataset.take(Dataset.scala:3626)\n\tat org.apache.spark.sql.Dataset.getRows(Dataset.scala:294)\n\tat org.apache.spark.sql.Dataset.showString(Dataset.scala:330)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:568)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)\n\tat py4j.ClientServerConnection.run(ClientServerConnection.java:106)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\n",
    "stacktrace": ["Traceback (most recent call last):", "  File \"/Users/haejoon.lee/Desktop/git_repos/spark/python/pyspark/errors/exceptions/captured.py\", line 272, in deco", "    return f(*a, **kw)", "  File \"/Users/haejoon.lee/anaconda3/envs/pyspark-dev-env/lib/python3.9/site-packages/py4j/protocol.py\", line 326, in get_return_value", "    raise Py4JJavaError(", "py4j.protocol.Py4JJavaError: An error occurred while calling o52.showString.", ": org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012", "== DataFrame ==", "\"__truediv__\" was called from", "/Users/haejoon.lee/Desktop/git_repos/spark/python/test_error_context.py:22", "", "\tat org.apache.spark.sql.errors.QueryExecutionErrors$.divideByZeroError(QueryExecutionErrors.scala:203)", "\tat org.apache.spark.sql.errors.QueryExecutionErrors.divideByZeroError(QueryExecutionErrors.scala)", "\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)", "\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)", "\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)", "\tat org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)", "\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)", "\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:896)", "\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:896)", "\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)", "\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:369)", "\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:333)", "\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)", "\tat org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)", "\tat org.apache.spark.scheduler.Task.run(Task.scala:146)", "\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:644)", "\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)", "\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)", "\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)", "\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:647)", "\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)", "\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)", "\tat java.base/java.lang.Thread.run(Thread.java:840)", "\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1007)", "\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)", "\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2479)", "\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2498)", "\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2523)", "\tat org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1052)", "\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)", "\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)", "\tat 
org.apache.spark.rdd.RDD.withScope(RDD.scala:412)", "\tat org.apache.spark.rdd.RDD.collect(RDD.scala:1051)", "\tat org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)", "\tat org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4449)", "\tat org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3393)", "\tat org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4439)", "\tat org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:599)", "\tat org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4437)", "\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:154)", "\tat org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:263)", "\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:118)", "\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:923)", "\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:74)", "\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:218)", "\tat org.apache.spark.sql.Dataset.withAction(Dataset.scala:4437)", "\tat org.apache.spark.sql.Dataset.head(Dataset.scala:3393)", "\tat org.apache.spark.sql.Dataset.take(Dataset.scala:3626)", "\tat org.apache.spark.sql.Dataset.getRows(Dataset.scala:294)", "\tat org.apache.spark.sql.Dataset.showString(Dataset.scala:330)", "\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)", "\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)", "\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)", "\tat java.base/java.lang.reflect.Method.invoke(Method.java:568)", "\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)", "\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)", "\tat py4j.Gateway.invoke(Gateway.java:282)", "\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)", "\tat py4j.commands.CallCommand.execute(CallCommand.java:79)", "\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)", "\tat py4j.ClientServerConnection.run(ClientServerConnection.java:106)", "\tat java.base/java.lang.Thread.run(Thread.java:840)"]
  }
}
```
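
For reference, here is a minimal sketch of the kind of operation that produces the log above. The original `test_error_context.py` is not part of this description, so the script below is hypothetical; ANSI mode is assumed so that the division actually raises `DIVIDE_BY_ZERO`:

```python
from pyspark.sql import SparkSession

# Hypothetical repro; DIVIDE_BY_ZERO is only raised when ANSI mode is enabled.
spark = (
    SparkSession.builder
    .config("spark.sql.ansi.enabled", "true")
    .getOrCreate()
)

df = spark.range(10)
# Column.__truediv__ is the "fragment" recorded in the DataFrame query context;
# the zero divisor makes the plan fail when .show() executes it.
df.withColumn("div", df.id / 0).show()
```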

### Why are the changes needed?

**Before**

Currently we don't have a PySpark-dedicated logging module, so we have to manually set up and customize the Python logging module, for example:

```python
import logging

# Basic config so the record is emitted; the default format prints "LEVEL:logger:message".
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("TestLogger")
user = "test_user"
action = "test_action"
logger.info(f"User {user} takes an {action}")
```

This logs the information as a plain, unstructured string:

```
INFO:TestLogger:User test_user takes an test_action
```

This is not very actionable, and it is hard to analyze because it is not well-structured.

Alternatively, we can use Log4j from the JVM, which results in the excessively detailed logs shown in the example above, and that approach cannot even be applied to Spark Connect.

**After**

We can simply import and use `PySparkLogger` with minimal setup:

```python
from pyspark.logger import PySparkLogger
logger = PySparkLogger.getLogger("TestLogger")
user = "test_user"
action = "test_action"
logger.info(f"User {user} takes an {action}", user=user, action=action)
```

This logs the information in the following JSON format:

```json
{
  "ts": "2024-06-28 19:44:19,030",
  "level": "WARNING",
  "logger": "TestLogger",
  "msg": "User test_user takes an test_action",
  "context": {
    "user": "test_user",
    "action": "test_action"
  }
}
```

**NOTE:** We can add as many keyword arguments as we want for each logging method. These keyword arguments, such as `user` and `action` in the example, are included within the `"context"` field of the JSON log. This structure makes it easy to track and analyze the logs, for example as sketched below.
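
As a rough illustration (not part of this PR): because every record is a self-describing JSON object, the logs can be post-processed with standard tooling. The file name below is hypothetical and assumes the logger was configured to write one JSON object per line:

```python
import json

# Hypothetical log file; assumes one JSON record per line.
with open("pyspark_client.log") as f:
    records = [json.loads(line) for line in f if line.strip()]

# Filter on the structured "context" fields populated from the keyword arguments.
user_actions = [
    r for r in records
    if r.get("context", {}).get("user") == "test_user"
]
for r in user_actions:
    print(r["ts"], r["level"], r["context"]["action"], r["msg"])
```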

### Does this PR introduce _any_ user-facing change?

No API changes, but the PySpark client-side logging is improved.

Also added user-facing documentation "Logging in PySpark":

<img width="1395" alt="Screenshot 2024-07-16 at 5 40 41 PM" src="https://github.com/user-attachments/assets/c77236aa-1c6f-4b5b-ad14-26ccdc474f59">

Also added API reference:

<img width="1417" alt="Screenshot 2024-07-16 at 5 40 58 PM" src="https://github.com/user-attachments/assets/6bb3fb23-6847-4086-8f4b-bcf9f4242724">

### How was this patch tested?

Added UTs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#47145 from itholic/pyspark_logger.

Authored-by: Haejoon Lee <haejoon.lee@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
attilapiros pushed a commit to attilapiros/spark that referenced this pull request Oct 4, 2024
…er in log

### What changes were proposed in this pull request?

This PR follows up apache#47145 to rename a field in the log.

### Why are the changes needed?

`line_no` is not very intuitive, so we rename it to the more explicit `line_number`.
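
For example, the `context` block of the DataFrame error log shown earlier in this description would now read (abbreviated):

```json
"context": {
  "file": "/.../spark/python/test_error_context.py",
  "line_number": "17",
  "fragment": "__truediv__",
  "error_class": "DIVIDE_BY_ZERO"
}
```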

### Does this PR introduce _any_ user-facing change?

No API change, but the user-facing log message will be improved.

### How was this patch tested?
The existing CI should pass

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#47437 from itholic/logger_followup.

Authored-by: Haejoon Lee <haejoon.lee@databricks.com>
Signed-off-by: Haejoon Lee <haejoon.lee@databricks.com>