[SPARK-48752][PYTHON][CONNECT][DOCS] Introduce pyspark.logger for improved structured logging for PySpark

### What changes were proposed in this pull request?

This PR introduces the `pyspark.logger` module to facilitate structured client-side logging for PySpark users.

This module includes a `PySparkLogger` class that provides several methods for logging messages at different levels in a structured JSON format:
- `PySparkLogger.info`
- `PySparkLogger.warning`
- `PySparkLogger.error`

The logger can be easily configured to write logs to either the console or a specified file.
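
For a quick sense of the intended usage, here is a minimal sketch that combines the pieces described above; the logger name, file name, and message fields are illustrative, not part of this PR:

```python
import logging

from pyspark.logger import PySparkLogger

# Structured JSON logging with the new module; extra keyword arguments
# become fields under "context" in each JSON record.
logger = PySparkLogger.getLogger("MyAppLogger")

# Optionally also write the JSON records to a file via a standard handler.
logger.addHandler(logging.FileHandler("application.log"))

logger.info("Job started", job="daily_ingest", attempt="1")
logger.warning("Input partition is empty", partition="2024-06-28")
logger.error("Write failed", table="events", reason="quota_exceeded")
```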

## DataFrame error log improvement

This PR also improves the DataFrame API error logs by leveraging this new logging framework:

### **Before**

We introduced structured logging in apache#45729, but PySpark information is still hard to pick out of the current structured logs: it is buried in a pile of complex JVM stack traces and is not very Python-friendly:

```json
{
  "ts": "2024-06-28T10:53:48.528Z",
  "level": "ERROR",
  "msg": "Exception in task 7.0 in stage 0.0 (TID 7)",
  "context": {
    "task_name": "task 7.0 in stage 0.0 (TID 7)"
  },
  "exception": {
    "class": "org.apache.spark.SparkArithmeticException",
    "msg": "[DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"__truediv__\" was called from\n/.../spark/python/test_error_context.py:17\n",
    "stacktrace": [
      {
        "class": "org.apache.spark.sql.errors.QueryExecutionErrors$",
        "method": "divideByZeroError",
        "file": "QueryExecutionErrors.scala",
        "line": 203
      },
      {
        "class": "org.apache.spark.sql.errors.QueryExecutionErrors",
        "method": "divideByZeroError",
        "file": "QueryExecutionErrors.scala",
        "line": -1
      },
      {
        "class": "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1",
        "method": "project_doConsume_0$",
        "file": null,
        "line": -1
      },
      {
        "class": "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1",
        "method": "processNext",
        "file": null,
        "line": -1
      },
      {
        "class": "org.apache.spark.sql.execution.BufferedRowIterator",
        "method": "hasNext",
        "file": "BufferedRowIterator.java",
        "line": 43
      },
      {
        "class": "org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1",
        "method": "hasNext",
        "file": "WholeStageCodegenEvaluatorFactory.scala",
        "line": 50
      },
      {
        "class": "org.apache.spark.sql.execution.SparkPlan",
        "method": "$anonfun$getByteArrayRdd$1",
        "file": "SparkPlan.scala",
        "line": 388
      },
      {
        "class": "org.apache.spark.rdd.RDD",
        "method": "$anonfun$mapPartitionsInternal$2",
        "file": "RDD.scala",
        "line": 896
      },
      {
        "class": "org.apache.spark.rdd.RDD",
        "method": "$anonfun$mapPartitionsInternal$2$adapted",
        "file": "RDD.scala",
        "line": 896
      },
      {
        "class": "org.apache.spark.rdd.MapPartitionsRDD",
        "method": "compute",
        "file": "MapPartitionsRDD.scala",
        "line": 52
      },
      {
        "class": "org.apache.spark.rdd.RDD",
        "method": "computeOrReadCheckpoint",
        "file": "RDD.scala",
        "line": 369
      },
      {
        "class": "org.apache.spark.rdd.RDD",
        "method": "iterator",
        "file": "RDD.scala",
        "line": 333
      },
      {
        "class": "org.apache.spark.scheduler.ResultTask",
        "method": "runTask",
        "file": "ResultTask.scala",
        "line": 93
      },
      {
        "class": "org.apache.spark.TaskContext",
        "method": "runTaskWithListeners",
        "file": "TaskContext.scala",
        "line": 171
      },
      {
        "class": "org.apache.spark.scheduler.Task",
        "method": "run",
        "file": "Task.scala",
        "line": 146
      },
      {
        "class": "org.apache.spark.executor.Executor$TaskRunner",
        "method": "$anonfun$run$5",
        "file": "Executor.scala",
        "line": 644
      },
      {
        "class": "org.apache.spark.util.SparkErrorUtils",
        "method": "tryWithSafeFinally",
        "file": "SparkErrorUtils.scala",
        "line": 64
      },
      {
        "class": "org.apache.spark.util.SparkErrorUtils",
        "method": "tryWithSafeFinally$",
        "file": "SparkErrorUtils.scala",
        "line": 61
      },
      {
        "class": "org.apache.spark.util.Utils$",
        "method": "tryWithSafeFinally",
        "file": "Utils.scala",
        "line": 99
      },
      {
        "class": "org.apache.spark.executor.Executor$TaskRunner",
        "method": "run",
        "file": "Executor.scala",
        "line": 647
      },
      {
        "class": "java.util.concurrent.ThreadPoolExecutor",
        "method": "runWorker",
        "file": "ThreadPoolExecutor.java",
        "line": 1136
      },
      {
        "class": "java.util.concurrent.ThreadPoolExecutor$Worker",
        "method": "run",
        "file": "ThreadPoolExecutor.java",
        "line": 635
      },
      {
        "class": "java.lang.Thread",
        "method": "run",
        "file": "Thread.java",
        "line": 840
      }
    ]
  },
  "logger": "Executor"
}

```

### **After**

Now we get an improved, simplified, and Python-friendly error log for DataFrame errors:

```json
{
  "ts": "2024-06-28 19:53:48,563",
  "level": "ERROR",
  "logger": "DataFrameQueryContextLogger",
  "msg": "[DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"__truediv__\" was called from\n/.../spark/python/test_error_context.py:17\n",
  "context": {
    "file": "/.../spark/python/test_error_context.py",
    "line_no": "17",
    "fragment": "__truediv__"
    "error_class": "DIVIDE_BY_ZERO"
  },
  "exception": {
    "class": "Py4JJavaError",
    "msg": "An error occurred while calling o52.showString.\n: org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"__truediv__\" was called from\n/Users/haejoon.lee/Desktop/git_repos/spark/python/test_error_context.py:22\n\n\tat org.apache.spark.sql.errors.QueryExecutionErrors$.divideByZeroError(QueryExecutionErrors.scala:203)\n\tat org.apache.spark.sql.errors.QueryExecutionErrors.divideByZeroError(QueryExecutionErrors.scala)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)\n\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:896)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:896)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:369)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:333)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)\n\tat org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:146)\n\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:644)\n\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)\n\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)\n\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:647)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1007)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2479)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2498)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2523)\n\tat org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1052)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:412)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:1051)\n\tat org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)\n\tat org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4449)\n\tat org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3393)\n\tat org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4439)\n\tat 
org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:599)\n\tat org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4437)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:154)\n\tat org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:263)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:118)\n\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:923)\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:74)\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:218)\n\tat org.apache.spark.sql.Dataset.withAction(Dataset.scala:4437)\n\tat org.apache.spark.sql.Dataset.head(Dataset.scala:3393)\n\tat org.apache.spark.sql.Dataset.take(Dataset.scala:3626)\n\tat org.apache.spark.sql.Dataset.getRows(Dataset.scala:294)\n\tat org.apache.spark.sql.Dataset.showString(Dataset.scala:330)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:568)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)\n\tat py4j.ClientServerConnection.run(ClientServerConnection.java:106)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\n",
    "stacktrace": ["Traceback (most recent call last):", "  File \"/Users/haejoon.lee/Desktop/git_repos/spark/python/pyspark/errors/exceptions/captured.py\", line 272, in deco", "    return f(*a, **kw)", "  File \"/Users/haejoon.lee/anaconda3/envs/pyspark-dev-env/lib/python3.9/site-packages/py4j/protocol.py\", line 326, in get_return_value", "    raise Py4JJavaError(", "py4j.protocol.Py4JJavaError: An error occurred while calling o52.showString.", ": org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012", "== DataFrame ==", "\"__truediv__\" was called from", "/Users/haejoon.lee/Desktop/git_repos/spark/python/test_error_context.py:22", "", "\tat org.apache.spark.sql.errors.QueryExecutionErrors$.divideByZeroError(QueryExecutionErrors.scala:203)", "\tat org.apache.spark.sql.errors.QueryExecutionErrors.divideByZeroError(QueryExecutionErrors.scala)", "\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)", "\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)", "\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)", "\tat org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)", "\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)", "\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:896)", "\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:896)", "\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)", "\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:369)", "\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:333)", "\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)", "\tat org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)", "\tat org.apache.spark.scheduler.Task.run(Task.scala:146)", "\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:644)", "\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)", "\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)", "\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)", "\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:647)", "\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)", "\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)", "\tat java.base/java.lang.Thread.run(Thread.java:840)", "\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1007)", "\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)", "\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2479)", "\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2498)", "\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2523)", "\tat org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1052)", "\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)", "\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)", "\tat 
org.apache.spark.rdd.RDD.withScope(RDD.scala:412)", "\tat org.apache.spark.rdd.RDD.collect(RDD.scala:1051)", "\tat org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)", "\tat org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4449)", "\tat org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3393)", "\tat org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4439)", "\tat org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:599)", "\tat org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4437)", "\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:154)", "\tat org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:263)", "\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:118)", "\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:923)", "\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:74)", "\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:218)", "\tat org.apache.spark.sql.Dataset.withAction(Dataset.scala:4437)", "\tat org.apache.spark.sql.Dataset.head(Dataset.scala:3393)", "\tat org.apache.spark.sql.Dataset.take(Dataset.scala:3626)", "\tat org.apache.spark.sql.Dataset.getRows(Dataset.scala:294)", "\tat org.apache.spark.sql.Dataset.showString(Dataset.scala:330)", "\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)", "\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)", "\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)", "\tat java.base/java.lang.reflect.Method.invoke(Method.java:568)", "\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)", "\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)", "\tat py4j.Gateway.invoke(Gateway.java:282)", "\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)", "\tat py4j.commands.CallCommand.execute(CallCommand.java:79)", "\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)", "\tat py4j.ClientServerConnection.run(ClientServerConnection.java:106)", "\tat java.base/java.lang.Thread.run(Thread.java:840)"]
  }
}
```
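
For reference, a minimal sketch of the kind of DataFrame code that produces a log like the one above; this is not the exact `test_error_context.py` script from the log, and it assumes ANSI mode is enabled so that the division raises `DIVIDE_BY_ZERO`:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.ansi.enabled", "true")  # DIVIDE_BY_ZERO is only raised under ANSI mode

# Column division goes through Column.__truediv__, which is recorded in the
# DataFrame query context ("fragment": "__truediv__" in the log above).
df = spark.range(1).select(lit(1) / lit(0))
df.show()  # raises SparkArithmeticException and emits the structured log entry
```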

### Why are the changes needed?

**Before**

Currently there is no PySpark-dedicated logging module, so we have to manually set up and customize the standard Python logging module, for example:

```python
import logging

logger = logging.getLogger("TestLogger")
user = "test_user"
action = "test_action"
logger.info(f"User {user} takes an {action}")
```

This logs the information only as a simple, unstructured string:

```
INFO:TestLogger:User test_user takes an test_action
```

This is not very actionable, and it is hard to analyze because it is not well structured.

Alternatively, we can use Log4j from the JVM, which produces excessively detailed logs as shown in the example above, and that approach cannot be applied to Spark Connect at all.
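
For comparison, getting structured JSON output from the standard library alone means hand-rolling a formatter along these lines; a rough sketch, not part of this PR:

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps(
            {
                "ts": self.formatTime(record),
                "level": record.levelname,
                "logger": record.name,
                "msg": record.getMessage(),
            }
        )


handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("TestLogger")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.info("User test_user takes an test_action")
```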

**After**

We can simply import and use `PySparkLogger` with minimal setup:

```python
from pyspark.logger import PySparkLogger
logger = PySparkLogger.getLogger("TestLogger")
user = "test_user"
action = "test_action"
logger.info(f"User {user} takes an {action}", user=user, action=action)
```

This logs the information in the following JSON format:

```json
{
  "ts": "2024-06-28 19:44:19,030",
  "level": "WARNING",
  "logger": "TestLogger",
  "msg": "User test_user takes an test_action",
  "context": {
    "user": "test_user",
    "action": "test_action"
  }
}
```

**NOTE:** we can pass as many keyword arguments as we want to each logging method. These keyword arguments, such as `user` and `action` in the example, are included in the `"context"` field of the JSON log. This structure makes the logs easy to track and analyze.
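
Because each record is a single JSON object, the logs are easy to post-process with standard tooling. A small sketch of filtering a log file written through a `FileHandler` (the file name is illustrative, and one JSON record per line is assumed):

```python
import json

# Collect every ERROR record emitted for a particular user from a JSON-lines log file.
with open("application.log") as f:
    errors = [
        record
        for record in map(json.loads, f)
        if record["level"] == "ERROR" and record.get("context", {}).get("user") == "test_user"
    ]

for record in errors:
    print(record["ts"], record["msg"])
```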

### Does this PR introduce _any_ user-facing change?

No API changes, but the PySpark client-side logging is improved.

Also added user-facing documentation "Logging in PySpark":

<img width="1395" alt="Screenshot 2024-07-16 at 5 40 41 PM" src="https://github.com/user-attachments/assets/c77236aa-1c6f-4b5b-ad14-26ccdc474f59">

Also added API reference:

<img width="1417" alt="Screenshot 2024-07-16 at 5 40 58 PM" src="https://github.com/user-attachments/assets/6bb3fb23-6847-4086-8f4b-bcf9f4242724">

### How was this patch tested?

Added UTs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#47145 from itholic/pyspark_logger.

Authored-by: Haejoon Lee <haejoon.lee@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
itholic authored and attilapiros committed Oct 4, 2024
1 parent 7c9f252 commit 119eac2
Showing 18 changed files with 676 additions and 0 deletions.
12 changes: 12 additions & 0 deletions dev/sparktestsupport/modules.py
@@ -1424,6 +1424,18 @@ def __hash__(self):
    ],
)

pyspark_logging = Module(
    name="pyspark-logger",
    dependencies=[],
    source_file_regexes=["python/pyspark/logger"],
    python_test_goals=[
        # unittests
        "pyspark.logger.tests.test_logger",
        "pyspark.logger.tests.connect.test_parity_logger",
    ],
)


sparkr = Module(
    name="sparkr",
    dependencies=[hive, mllib],
2 changes: 2 additions & 0 deletions dev/tox.ini
@@ -33,6 +33,8 @@ per-file-ignores =
examples/src/main/python/sql/datasource.py: F841,
# Exclude * imports in test files
python/pyspark/errors/tests/*.py: F403,
python/pyspark/logger/tests/*.py: F403,
python/pyspark/logger/tests/connect/*.py: F403,
python/pyspark/ml/tests/*.py: F403,
python/pyspark/mllib/tests/*.py: F403,
python/pyspark/pandas/tests/*.py: F401 F403,
1 change: 1 addition & 0 deletions python/docs/source/development/index.rst
@@ -26,5 +26,6 @@ Development
testing
debugging
setting_ide
logger
errors

162 changes: 162 additions & 0 deletions python/docs/source/development/logger.rst
@@ -0,0 +1,162 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
==================
Logging in PySpark
==================

.. currentmodule:: pyspark.logger

Introduction
============

The :ref:`pyspark.logger</reference/pyspark.logger.rst>` module facilitates structured client-side logging for PySpark users.

This module includes a :class:`PySparkLogger` class that provides several methods for logging messages at different levels in a structured JSON format:

- :meth:`PySparkLogger.info`
- :meth:`PySparkLogger.warning`
- :meth:`PySparkLogger.error`
- :meth:`PySparkLogger.exception`

The logger can be easily configured to write logs to either the console or a specified file.

Customizing Log Format
======================
The default log format is JSON, which includes the timestamp, log level, logger name, and the log message along with any additional context provided.

Example log entry:

.. code-block:: python

    {
      "ts": "2024-06-28 19:53:48,563",
      "level": "ERROR",
      "logger": "DataFrameQueryContextLogger",
      "msg": "[DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"divide\" was called from\n/.../spark/python/test_error_context.py:17\n",
      "context": {
        "file": "/path/to/file.py",
        "line_no": "17",
        "fragment": "divide",
        "error_class": "DIVIDE_BY_ZERO"
      },
      "exception": {
        "class": "Py4JJavaError",
        "msg": "An error occurred while calling o52.showString.\n: org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"divide\" was called from\n/path/to/file.py:17 ...",
        "stacktrace": ["Traceback (most recent call last):", "  File \".../spark/python/pyspark/errors/exceptions/captured.py\", line 247, in deco", "    return f(*a, **kw)", "  File \".../lib/python3.9/site-packages/py4j/protocol.py\", line 326, in get_return_value" ...]
      }
    }

Setting Up
==========

To start using the PySpark logging module, import :class:`PySparkLogger` from :ref:`pyspark.logger</reference/pyspark.logger.rst>`.

.. code-block:: python

    from pyspark.logger import PySparkLogger

Usage
=====

Creating a Logger
-----------------

You can create a logger instance by calling :meth:`PySparkLogger.getLogger`. By default, it creates a logger named "PySparkLogger" with an INFO log level.

.. code-block:: python

    logger = PySparkLogger.getLogger()

Logging Messages
----------------

The logger provides three main methods for logging messages: :meth:`PySparkLogger.info`, :meth:`PySparkLogger.warning` and :meth:`PySparkLogger.error`.

- **PySparkLogger.info**: Use this method to log informational messages.

  .. code-block:: python

      user = "test_user"
      action = "login"
      logger.info(f"User {user} performed {action}", user=user, action=action)

- **PySparkLogger.warning**: Use this method to log warning messages.

  .. code-block:: python

      user = "test_user"
      action = "access"
      logger.warning(f"User {user} attempted an unauthorized {action}", user=user, action=action)

- **PySparkLogger.error**: Use this method to log error messages.

  .. code-block:: python

      user = "test_user"
      action = "update_profile"
      logger.error(f"An error occurred for user {user} during {action}", user=user, action=action)

Logging to Console
------------------

.. code-block:: python

    from pyspark.logger import PySparkLogger
    # Create a logger that logs to console
    logger = PySparkLogger.getLogger("ConsoleLogger")
    user = "test_user"
    action = "test_action"
    logger.warning(f"User {user} takes an {action}", user=user, action=action)

This logs the information in the following JSON format:

.. code-block:: python

    {
      "ts": "2024-06-28 19:44:19,030",
      "level": "WARNING",
      "logger": "ConsoleLogger",
      "msg": "User test_user takes an test_action",
      "context": {
        "user": "test_user",
        "action": "test_action"
      }
    }

Logging to a File
-----------------

To log messages to a file, use :meth:`PySparkLogger.addHandler` to add a `FileHandler` from the standard Python logging module to your logger.

This approach aligns with standard Python logging practices.

.. code-block:: python

    from pyspark.logger import PySparkLogger
    import logging
    # Create a logger that logs to a file
    file_logger = PySparkLogger.getLogger("FileLogger")
    handler = logging.FileHandler("application.log")
    file_logger.addHandler(handler)
    user = "test_user"
    action = "test_action"
    file_logger.warning(f"User {user} takes an {action}", user=user, action=action)

The log messages will be saved in `application.log` in the same JSON format.
1 change: 1 addition & 0 deletions python/docs/source/reference/index.rst
@@ -39,4 +39,5 @@ This page lists an overview of all public PySpark modules, classes, functions an
pyspark
pyspark.resource
pyspark.errors
pyspark.logger
pyspark.testing
47 changes: 47 additions & 0 deletions python/docs/source/reference/pyspark.logger.rst
@@ -0,0 +1,47 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
======
Logger
======

Classes
-------

.. currentmodule:: pyspark.logger

.. autosummary::
    :toctree: api/

    PySparkLogger


Methods
-------

.. currentmodule:: pyspark.logger

.. autosummary::
    :toctree: api/

    PySparkLogger.getLogger
    PySparkLogger.addHandler
    PySparkLogger.info
    PySparkLogger.warning
    PySparkLogger.error
    PySparkLogger.exception
3 changes: 3 additions & 0 deletions python/mypy.ini
@@ -126,6 +126,9 @@ ignore_errors = True
[mypy-pyspark.errors.tests.*]
ignore_errors = True

[mypy-pyspark.logger.tests.*]
ignore_errors = True

; Allow non-strict optional for pyspark.pandas

[mypy-pyspark.pandas.*]
1 change: 1 addition & 0 deletions python/packaging/classic/setup.py
@@ -312,6 +312,7 @@ def run(self):
"pyspark.errors",
"pyspark.errors.exceptions",
"pyspark.examples.src.main.python",
"pyspark.logger",
],
include_package_data=True,
package_dir={
3 changes: 3 additions & 0 deletions python/packaging/connect/setup.py
@@ -105,6 +105,8 @@
"pyspark.pandas.tests.connect.reshape",
"pyspark.pandas.tests.connect.series",
"pyspark.pandas.tests.connect.window",
"pyspark.logger.tests",
"pyspark.logger.tests.connect",
]

try:
@@ -175,6 +177,7 @@
"pyspark.resource",
"pyspark.errors",
"pyspark.errors.exceptions",
"pyspark.logger",
]

setup(
23 changes: 23 additions & 0 deletions python/pyspark/errors/exceptions/base.py
@@ -19,6 +19,7 @@
from typing import Dict, Optional, cast, Iterable, TYPE_CHECKING, List

from pyspark.errors.utils import ErrorClassesReader
from pyspark.logger import PySparkLogger
from pickle import PicklingError

if TYPE_CHECKING:
Expand Down Expand Up @@ -129,6 +130,28 @@ def getQueryContext(self) -> List["QueryContext"]:
"""
return self._query_contexts

def _log_exception(self) -> None:
query_contexts = self.getQueryContext()
query_context = query_contexts[0] if len(query_contexts) != 0 else None
if query_context:
if query_context.contextType().name == "DataFrame":
logger = PySparkLogger.getLogger("DataFrameQueryContextLogger")
call_site = query_context.callSite().split(":")
line_no = call_site[1] if len(call_site) == 2 else ""
logger.exception(
self.getMessage(),
file=call_site[0],
line_no=line_no,
fragment=query_context.fragment(),
error_class=self.getErrorClass(),
)
else:
logger = PySparkLogger.getLogger("SQLQueryContextLogger")
logger.exception(
self.getMessage(),
error_class=self.getErrorClass(),
)

def __str__(self) -> str:
if self.getErrorClass() is not None:
return self.getMessage()
1 change: 1 addition & 0 deletions python/pyspark/errors/exceptions/captured.py
@@ -73,6 +73,7 @@ def __init__(
        if self._cause is None and origin is not None and origin.getCause() is not None:
            self._cause = convert_exception(origin.getCause())
        self._origin = origin
        self._log_exception()

def __str__(self) -> str:
from pyspark import SparkContext
1 change: 1 addition & 0 deletions python/pyspark/errors/exceptions/connect.py
@@ -325,6 +325,7 @@ def __init__(
        self._stacktrace: Optional[str] = server_stacktrace
        self._display_stacktrace: bool = display_server_stacktrace
        self._query_contexts: List[BaseQueryContext] = query_contexts
        self._log_exception()

def getSqlState(self) -> Optional[str]:
if self._sql_state is not None:
27 changes: 27 additions & 0 deletions python/pyspark/logger/__init__.py
@@ -0,0 +1,27 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
PySpark logging
"""
from pyspark.logger.logger import (  # noqa: F401
    PySparkLogger,
)

__all__ = [
    "PySparkLogger",
]