From 119eac2a92272fc14f9c77cdb90e672ae63cf82f Mon Sep 17 00:00:00 2001
From: Haejoon Lee
Date: Thu, 18 Jul 2024 14:04:11 +0900
Subject: [PATCH] [SPARK-48752][PYTHON][CONNECT][DOCS] Introduce `pyspark.logger` for improved structured logging for PySpark
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

### What changes were proposed in this pull request?

This PR introduces the `pyspark.logger` module to facilitate structured client-side logging for PySpark users.

This module includes a `PySparkLogger` class that provides several methods for logging messages at different levels in a structured JSON format:

- `PySparkLogger.info`
- `PySparkLogger.warning`
- `PySparkLogger.error`

The logger can be easily configured to write logs to either the console or a specified file.

## DataFrame error log improvement

This PR also improves the DataFrame API error logs by leveraging this new logging framework:

### **Before**

We introduced structured logging in https://github.com/apache/spark/pull/45729, but the PySpark log is still hard to pick out in the current structured logs because it is hidden and mixed in with a bunch of complex JVM stack traces, and it is also not very Python-friendly:

```json
{ "ts": "2024-06-28T10:53:48.528Z", "level": "ERROR", "msg": "Exception in task 7.0 in stage 0.0 (TID 7)", "context": { "task_name": "task 7.0 in stage 0.0 (TID 7)" }, "exception": { "class": "org.apache.spark.SparkArithmeticException", "msg": "[DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"__truediv__\" was called from\n/.../spark/python/test_error_context.py:17\n", "stacktrace": [
{ "class": "org.apache.spark.sql.errors.QueryExecutionErrors$", "method": "divideByZeroError", "file": "QueryExecutionErrors.scala", "line": 203 },
{ "class": "org.apache.spark.sql.errors.QueryExecutionErrors", "method": "divideByZeroError", "file": "QueryExecutionErrors.scala", "line": -1 },
{ "class": "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1", "method": "project_doConsume_0$", "file": null, "line": -1 },
{ "class": "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1", "method": "processNext", "file": null, "line": -1 },
{ "class": "org.apache.spark.sql.execution.BufferedRowIterator", "method": "hasNext", "file": "BufferedRowIterator.java", "line": 43 },
{ "class": "org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1", "method": "hasNext", "file": "WholeStageCodegenEvaluatorFactory.scala", "line": 50 },
{ "class": "org.apache.spark.sql.execution.SparkPlan", "method": "$anonfun$getByteArrayRdd$1", "file": "SparkPlan.scala", "line": 388 },
{ "class": "org.apache.spark.rdd.RDD", "method": "$anonfun$mapPartitionsInternal$2", "file": "RDD.scala", "line": 896 },
{ "class": "org.apache.spark.rdd.RDD", "method": "$anonfun$mapPartitionsInternal$2$adapted", "file": "RDD.scala", "line": 896 },
{ "class": "org.apache.spark.rdd.MapPartitionsRDD", "method": "compute", "file": "MapPartitionsRDD.scala", "line": 52 },
{ "class": "org.apache.spark.rdd.RDD", "method": "computeOrReadCheckpoint", "file": "RDD.scala", "line": 369 },
{ "class": "org.apache.spark.rdd.RDD", "method": "iterator", "file": "RDD.scala", "line": 333 },
{ "class": "org.apache.spark.scheduler.ResultTask", "method": "runTask",
"file": "ResultTask.scala", "line": 93 }, { "class": "org.apache.spark.TaskContext", "method": "runTaskWithListeners", "file": "TaskContext.scala", "line": 171 }, { "class": "org.apache.spark.scheduler.Task", "method": "run", "file": "Task.scala", "line": 146 }, { "class": "org.apache.spark.executor.Executor$TaskRunner", "method": "$anonfun$run$5", "file": "Executor.scala", "line": 644 }, { "class": "org.apache.spark.util.SparkErrorUtils", "method": "tryWithSafeFinally", "file": "SparkErrorUtils.scala", "line": 64 }, { "class": "org.apache.spark.util.SparkErrorUtils", "method": "tryWithSafeFinally$", "file": "SparkErrorUtils.scala", "line": 61 }, { "class": "org.apache.spark.util.Utils$", "method": "tryWithSafeFinally", "file": "Utils.scala", "line": 99 }, { "class": "org.apache.spark.executor.Executor$TaskRunner", "method": "run", "file": "Executor.scala", "line": 647 }, { "class": "java.util.concurrent.ThreadPoolExecutor", "method": "runWorker", "file": "ThreadPoolExecutor.java", "line": 1136 }, { "class": "java.util.concurrent.ThreadPoolExecutor$Worker", "method": "run", "file": "ThreadPoolExecutor.java", "line": 635 }, { "class": "java.lang.Thread", "method": "run", "file": "Thread.java", "line": 840 } ] }, "logger": "Executor" } ``` ### **After** Now we can get a improved, simplified and also Python-friendly error log for DataFrame errors: ```json { "ts": "2024-06-28 19:53:48,563", "level": "ERROR", "logger": "DataFrameQueryContextLogger", "msg": "[DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"__truediv__\" was called from\n/.../spark/python/test_error_context.py:17\n", "context": { "file": "/.../spark/python/test_error_context.py", "line_no": "17", "fragment": "__truediv__" "error_class": "DIVIDE_BY_ZERO" }, "exception": { "class": "Py4JJavaError", "msg": "An error occurred while calling o52.showString.\n: org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. 
SQLSTATE: 22012\n== DataFrame ==\n\"__truediv__\" was called from\n/Users/haejoon.lee/Desktop/git_repos/spark/python/test_error_context.py:22\n\n\tat org.apache.spark.sql.errors.QueryExecutionErrors$.divideByZeroError(QueryExecutionErrors.scala:203)\n\tat org.apache.spark.sql.errors.QueryExecutionErrors.divideByZeroError(QueryExecutionErrors.scala)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)\n\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:896)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:896)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:369)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:333)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)\n\tat org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:146)\n\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:644)\n\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)\n\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)\n\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:647)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1007)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2479)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2498)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2523)\n\tat org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1052)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:412)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:1051)\n\tat org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)\n\tat org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4449)\n\tat org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3393)\n\tat org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4439)\n\tat org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:599)\n\tat org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4437)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:154)\n\tat 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:263)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:118)\n\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:923)\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:74)\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:218)\n\tat org.apache.spark.sql.Dataset.withAction(Dataset.scala:4437)\n\tat org.apache.spark.sql.Dataset.head(Dataset.scala:3393)\n\tat org.apache.spark.sql.Dataset.take(Dataset.scala:3626)\n\tat org.apache.spark.sql.Dataset.getRows(Dataset.scala:294)\n\tat org.apache.spark.sql.Dataset.showString(Dataset.scala:330)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:568)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)\n\tat py4j.ClientServerConnection.run(ClientServerConnection.java:106)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\n", "stacktrace": ["Traceback (most recent call last):", " File \"/Users/haejoon.lee/Desktop/git_repos/spark/python/pyspark/errors/exceptions/captured.py\", line 272, in deco", " return f(*a, **kw)", " File \"/Users/haejoon.lee/anaconda3/envs/pyspark-dev-env/lib/python3.9/site-packages/py4j/protocol.py\", line 326, in get_return_value", " raise Py4JJavaError(", "py4j.protocol.Py4JJavaError: An error occurred while calling o52.showString.", ": org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. 
SQLSTATE: 22012", "== DataFrame ==", "\"__truediv__\" was called from", "/Users/haejoon.lee/Desktop/git_repos/spark/python/test_error_context.py:22", "", "\tat org.apache.spark.sql.errors.QueryExecutionErrors$.divideByZeroError(QueryExecutionErrors.scala:203)", "\tat org.apache.spark.sql.errors.QueryExecutionErrors.divideByZeroError(QueryExecutionErrors.scala)", "\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)", "\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)", "\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)", "\tat org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)", "\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)", "\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:896)", "\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:896)", "\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)", "\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:369)", "\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:333)", "\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)", "\tat org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)", "\tat org.apache.spark.scheduler.Task.run(Task.scala:146)", "\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:644)", "\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)", "\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)", "\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)", "\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:647)", "\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)", "\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)", "\tat java.base/java.lang.Thread.run(Thread.java:840)", "\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1007)", "\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)", "\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2479)", "\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2498)", "\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2523)", "\tat org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1052)", "\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)", "\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)", "\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:412)", "\tat org.apache.spark.rdd.RDD.collect(RDD.scala:1051)", "\tat org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)", "\tat org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4449)", "\tat org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3393)", "\tat org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4439)", "\tat org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:599)", "\tat org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4437)", "\tat 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:154)", "\tat org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:263)", "\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:118)", "\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:923)", "\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:74)", "\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:218)", "\tat org.apache.spark.sql.Dataset.withAction(Dataset.scala:4437)", "\tat org.apache.spark.sql.Dataset.head(Dataset.scala:3393)", "\tat org.apache.spark.sql.Dataset.take(Dataset.scala:3626)", "\tat org.apache.spark.sql.Dataset.getRows(Dataset.scala:294)", "\tat org.apache.spark.sql.Dataset.showString(Dataset.scala:330)", "\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)", "\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)", "\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)", "\tat java.base/java.lang.reflect.Method.invoke(Method.java:568)", "\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)", "\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)", "\tat py4j.Gateway.invoke(Gateway.java:282)", "\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)", "\tat py4j.commands.CallCommand.execute(CallCommand.java:79)", "\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)", "\tat py4j.ClientServerConnection.run(ClientServerConnection.java:106)", "\tat java.base/java.lang.Thread.run(Thread.java:840)"] }, }
```

### Why are the changes needed?

**Before**

Currently we don't have a PySpark-dedicated logging module, so we have to manually set up and customize the Python logging module, for example:

```python
import logging

logger = logging.getLogger("TestLogger")
user = "test_user"
action = "test_action"
logger.info(f"User {user} takes an {action}")
```

This logs the information as a simple string:

```
INFO:TestLogger:User test_user takes an test_action
```

This is not very actionable, and it is hard to analyze since it is not well-structured. Alternatively, we can use Log4j from the JVM, which results in the excessively detailed logs described in the example above, and that approach cannot even be applied to Spark Connect.

**After**

We can simply import and use `PySparkLogger` with minimal setup:

```python
from pyspark.logger import PySparkLogger

logger = PySparkLogger.getLogger("TestLogger")
user = "test_user"
action = "test_action"
logger.info(f"User {user} takes an {action}", user=user, action=action)
```

This logs the information in the following JSON format:

```json
{ "ts": "2024-06-28 19:44:19,030", "level": "INFO", "logger": "TestLogger", "msg": "User test_user takes an test_action", "context": { "user": "test_user", "action": "test_action" }, }
```

**NOTE:** we can add as many keyword arguments as we want to each logging method. These keyword arguments, such as `user` and `action` in the example, are included within the `"context"` field of the JSON log. This structure makes it easy to track and analyze the logs.

### Does this PR introduce _any_ user-facing change?

No API changes, but the PySpark client-side logging is improved.
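For illustration, here is a minimal sketch of the improved behavior from the user's side (it assumes an active `SparkSession` named `spark` and ANSI mode enabled so that the division below actually fails); no user code changes are required, and the `DataFrameQueryContextLogger` record shown above is emitted automatically when the error is captured:

```python
from pyspark.errors import ArithmeticException

spark.conf.set("spark.sql.ansi.enabled", True)  # make the division by zero below raise an error
df = spark.range(10)

try:
    # The failing "__truediv__" call is logged by "DataFrameQueryContextLogger" in the
    # structured JSON format shown above, including the DataFrame query context
    # ("file", "line_no", "fragment") and the "error_class".
    df.withColumn("div_zero", df.id / 0).collect()
except ArithmeticException:
    pass  # the structured log record has already been emitted
```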
Also added user-facing documentation "Logging in PySpark": Screenshot 2024-07-16 at 5 40 41 PM Also added API reference: Screenshot 2024-07-16 at 5 40 58 PM ### How was this patch tested? Added UTs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #47145 from itholic/pyspark_logger. Authored-by: Haejoon Lee Signed-off-by: Hyukjin Kwon --- dev/sparktestsupport/modules.py | 12 ++ dev/tox.ini | 2 + python/docs/source/development/index.rst | 1 + python/docs/source/development/logger.rst | 162 +++++++++++++++ python/docs/source/reference/index.rst | 1 + .../docs/source/reference/pyspark.logger.rst | 47 +++++ python/mypy.ini | 3 + python/packaging/classic/setup.py | 1 + python/packaging/connect/setup.py | 3 + python/pyspark/errors/exceptions/base.py | 23 ++ python/pyspark/errors/exceptions/captured.py | 1 + python/pyspark/errors/exceptions/connect.py | 1 + python/pyspark/logger/__init__.py | 27 +++ python/pyspark/logger/logger.py | 196 ++++++++++++++++++ python/pyspark/logger/tests/__init__.py | 16 ++ .../pyspark/logger/tests/connect/__init__.py | 16 ++ .../tests/connect/test_parity_logger.py | 36 ++++ python/pyspark/logger/tests/test_logger.py | 128 ++++++++++++ 18 files changed, 676 insertions(+) create mode 100644 python/docs/source/development/logger.rst create mode 100644 python/docs/source/reference/pyspark.logger.rst create mode 100644 python/pyspark/logger/__init__.py create mode 100644 python/pyspark/logger/logger.py create mode 100644 python/pyspark/logger/tests/__init__.py create mode 100644 python/pyspark/logger/tests/connect/__init__.py create mode 100644 python/pyspark/logger/tests/connect/test_parity_logger.py create mode 100644 python/pyspark/logger/tests/test_logger.py diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 8f4ed6014d77f..8e595707494cd 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -1424,6 +1424,18 @@ def __hash__(self): ], ) +pyspark_logging = Module( + name="pyspark-logger", + dependencies=[], + source_file_regexes=["python/pyspark/logger"], + python_test_goals=[ + # unittests + "pyspark.logger.tests.test_logger", + "pyspark.logger.tests.connect.test_parity_logger", + ], +) + + sparkr = Module( name="sparkr", dependencies=[hive, mllib], diff --git a/dev/tox.ini b/dev/tox.ini index c5905d12a80a1..47b1b4a9d7832 100644 --- a/dev/tox.ini +++ b/dev/tox.ini @@ -33,6 +33,8 @@ per-file-ignores = examples/src/main/python/sql/datasource.py: F841, # Exclude * imports in test files python/pyspark/errors/tests/*.py: F403, + python/pyspark/logger/tests/*.py: F403, + python/pyspark/logger/tests/connect/*.py: F403, python/pyspark/ml/tests/*.py: F403, python/pyspark/mllib/tests/*.py: F403, python/pyspark/pandas/tests/*.py: F401 F403, diff --git a/python/docs/source/development/index.rst b/python/docs/source/development/index.rst index 50150e096c1a8..f013894c444be 100644 --- a/python/docs/source/development/index.rst +++ b/python/docs/source/development/index.rst @@ -26,5 +26,6 @@ Development testing debugging setting_ide + logger errors diff --git a/python/docs/source/development/logger.rst b/python/docs/source/development/logger.rst new file mode 100644 index 0000000000000..d809dbf728508 --- /dev/null +++ b/python/docs/source/development/logger.rst @@ -0,0 +1,162 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+.. http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+==================
+Logging in PySpark
+==================
+
+.. currentmodule:: pyspark.logger
+
+Introduction
+============
+
+The :ref:`pyspark.logger` module facilitates structured client-side logging for PySpark users.
+
+This module includes a :class:`PySparkLogger` class that provides several methods for logging messages at different levels in a structured JSON format:
+
+- :meth:`PySparkLogger.info`
+- :meth:`PySparkLogger.warning`
+- :meth:`PySparkLogger.error`
+- :meth:`PySparkLogger.exception`
+
+The logger can be easily configured to write logs to either the console or a specified file.
+
+Customizing Log Format
+======================
+The default log format is JSON, which includes the timestamp, log level, logger name, and the log message along with any additional context provided.
+
+Example log entry:
+
+.. code-block:: python
+
+ {
+ "ts": "2024-06-28 19:53:48,563",
+ "level": "ERROR",
+ "logger": "DataFrameQueryContextLogger",
+ "msg": "[DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"divide\" was called from\n/.../spark/python/test_error_context.py:17\n",
+ "context": {
+ "file": "/path/to/file.py",
+ "line_no": "17",
+ "fragment": "divide",
+ "error_class": "DIVIDE_BY_ZERO"
+ },
+ "exception": {
+ "class": "Py4JJavaError",
+ "msg": "An error occurred while calling o52.showString.\n: org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"divide\" was called from\n/path/to/file.py:17 ...",
+ "stacktrace": ["Traceback (most recent call last):", " File \".../spark/python/pyspark/errors/exceptions/captured.py\", line 247, in deco", " return f(*a, **kw)", " File \".../lib/python3.9/site-packages/py4j/protocol.py\", line 326, in get_return_value" ...]
+ },
+ }
+
+Setting Up
+==========
+To start using the PySpark logging module, you need to import :class:`PySparkLogger` from :ref:`pyspark.logger`.
+
+.. code-block:: python
+
+ from pyspark.logger import PySparkLogger
+
+Usage
+=====
+Creating a Logger
+-----------------
+You can create a logger instance by calling :meth:`PySparkLogger.getLogger`. By default, it creates a logger named "PySparkLogger" with a WARNING log level.
+
+.. code-block:: python
+
+ logger = PySparkLogger.getLogger()
+
+Logging Messages
+----------------
+The logger provides three main methods for logging messages: :meth:`PySparkLogger.info`, :meth:`PySparkLogger.warning` and :meth:`PySparkLogger.error`.
+
+- **PySparkLogger.info**: Use this method to log informational messages.
+
+ .. code-block:: python
+
+ user = "test_user"
+ action = "login"
+ logger.info(f"User {user} performed {action}", user=user, action=action)
+
+- **PySparkLogger.warning**: Use this method to log warning messages.
+
+ .. code-block:: python
+
+ user = "test_user"
+ action = "access"
+ logger.warning(f"User {user} attempted an unauthorized {action}", user=user, action=action)
+
+- **PySparkLogger.error**: Use this method to log error messages.
+
+ .. code-block:: python
+
+ user = "test_user"
+ action = "update_profile"
+ logger.error(f"An error occurred for user {user} during {action}", user=user, action=action)
+
+Logging to Console
+------------------
+
+.. code-block:: python
+
+ from pyspark.logger import PySparkLogger
+
+ # Create a logger that logs to the console
+ logger = PySparkLogger.getLogger("ConsoleLogger")
+
+ user = "test_user"
+ action = "test_action"
+
+ logger.warning(f"User {user} takes an {action}", user=user, action=action)
+
+This logs the message in the following JSON format:
+
+.. code-block:: python
+
+ {
+ "ts": "2024-06-28 19:44:19,030",
+ "level": "WARNING",
+ "logger": "ConsoleLogger",
+ "msg": "User test_user takes an test_action",
+ "context": {
+ "user": "test_user",
+ "action": "test_action"
+ },
+ }
+
+Logging to a File
+-----------------
+
+To log messages to a file, use :meth:`PySparkLogger.addHandler` to add a `FileHandler` from the standard Python logging module to your logger.
+
+This approach aligns with standard Python logging practices.
+
+.. code-block:: python
+
+ from pyspark.logger import PySparkLogger
+ import logging
+
+ # Create a logger that logs to a file
+ file_logger = PySparkLogger.getLogger("FileLogger")
+ handler = logging.FileHandler("application.log")
+ file_logger.addHandler(handler)
+
+ user = "test_user"
+ action = "test_action"
+
+ file_logger.warning(f"User {user} takes an {action}", user=user, action=action)
+
+The log messages will be saved in `application.log` in the same JSON format.
diff --git a/python/docs/source/reference/index.rst b/python/docs/source/reference/index.rst
index 800a7a4b34d42..918c726157efb 100644
--- a/python/docs/source/reference/index.rst
+++ b/python/docs/source/reference/index.rst
@@ -39,4 +39,5 @@ This page lists an overview of all public PySpark modules, classes, functions an
pyspark
pyspark.resource
pyspark.errors
+ pyspark.logger
pyspark.testing
diff --git a/python/docs/source/reference/pyspark.logger.rst b/python/docs/source/reference/pyspark.logger.rst
new file mode 100644
index 0000000000000..9d5f6689de4aa
--- /dev/null
+++ b/python/docs/source/reference/pyspark.logger.rst
@@ -0,0 +1,47 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+.. http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+======
+Logger
+======
+
+Classes
+-------
+
+.. currentmodule:: pyspark.logger
+
+..
autosummary:: + :toctree: api/ + + PySparkLogger + + +Methods +------- + +.. currentmodule:: pyspark.logger + +.. autosummary:: + :toctree: api/ + + PySparkLogger.getLogger + PySparkLogger.addHandler + PySparkLogger.info + PySparkLogger.warning + PySparkLogger.error + PySparkLogger.exception diff --git a/python/mypy.ini b/python/mypy.ini index c7cf8df114147..4daa185933343 100644 --- a/python/mypy.ini +++ b/python/mypy.ini @@ -126,6 +126,9 @@ ignore_errors = True [mypy-pyspark.errors.tests.*] ignore_errors = True +[mypy-pyspark.logger.tests.*] +ignore_errors = True + ; Allow non-strict optional for pyspark.pandas [mypy-pyspark.pandas.*] diff --git a/python/packaging/classic/setup.py b/python/packaging/classic/setup.py index 5e94c2b653806..79b74483f00dd 100755 --- a/python/packaging/classic/setup.py +++ b/python/packaging/classic/setup.py @@ -312,6 +312,7 @@ def run(self): "pyspark.errors", "pyspark.errors.exceptions", "pyspark.examples.src.main.python", + "pyspark.logger", ], include_package_data=True, package_dir={ diff --git a/python/packaging/connect/setup.py b/python/packaging/connect/setup.py index bc1d4fd2868de..ab166c79747df 100755 --- a/python/packaging/connect/setup.py +++ b/python/packaging/connect/setup.py @@ -105,6 +105,8 @@ "pyspark.pandas.tests.connect.reshape", "pyspark.pandas.tests.connect.series", "pyspark.pandas.tests.connect.window", + "pyspark.logger.tests", + "pyspark.logger.tests.connect", ] try: @@ -175,6 +177,7 @@ "pyspark.resource", "pyspark.errors", "pyspark.errors.exceptions", + "pyspark.logger", ] setup( diff --git a/python/pyspark/errors/exceptions/base.py b/python/pyspark/errors/exceptions/base.py index dcfc6df77a77a..e33492fbe15ed 100644 --- a/python/pyspark/errors/exceptions/base.py +++ b/python/pyspark/errors/exceptions/base.py @@ -19,6 +19,7 @@ from typing import Dict, Optional, cast, Iterable, TYPE_CHECKING, List from pyspark.errors.utils import ErrorClassesReader +from pyspark.logger import PySparkLogger from pickle import PicklingError if TYPE_CHECKING: @@ -129,6 +130,28 @@ def getQueryContext(self) -> List["QueryContext"]: """ return self._query_contexts + def _log_exception(self) -> None: + query_contexts = self.getQueryContext() + query_context = query_contexts[0] if len(query_contexts) != 0 else None + if query_context: + if query_context.contextType().name == "DataFrame": + logger = PySparkLogger.getLogger("DataFrameQueryContextLogger") + call_site = query_context.callSite().split(":") + line_no = call_site[1] if len(call_site) == 2 else "" + logger.exception( + self.getMessage(), + file=call_site[0], + line_no=line_no, + fragment=query_context.fragment(), + error_class=self.getErrorClass(), + ) + else: + logger = PySparkLogger.getLogger("SQLQueryContextLogger") + logger.exception( + self.getMessage(), + error_class=self.getErrorClass(), + ) + def __str__(self) -> str: if self.getErrorClass() is not None: return self.getMessage() diff --git a/python/pyspark/errors/exceptions/captured.py b/python/pyspark/errors/exceptions/captured.py index b5bb742161c06..8a79f78e4c80e 100644 --- a/python/pyspark/errors/exceptions/captured.py +++ b/python/pyspark/errors/exceptions/captured.py @@ -73,6 +73,7 @@ def __init__( if self._cause is None and origin is not None and origin.getCause() is not None: self._cause = convert_exception(origin.getCause()) self._origin = origin + self._log_exception() def __str__(self) -> str: from pyspark import SparkContext diff --git a/python/pyspark/errors/exceptions/connect.py b/python/pyspark/errors/exceptions/connect.py index 
8a95358f26975..efa3fdcf1e56d 100644 --- a/python/pyspark/errors/exceptions/connect.py +++ b/python/pyspark/errors/exceptions/connect.py @@ -325,6 +325,7 @@ def __init__( self._stacktrace: Optional[str] = server_stacktrace self._display_stacktrace: bool = display_server_stacktrace self._query_contexts: List[BaseQueryContext] = query_contexts + self._log_exception() def getSqlState(self) -> Optional[str]: if self._sql_state is not None: diff --git a/python/pyspark/logger/__init__.py b/python/pyspark/logger/__init__.py new file mode 100644 index 0000000000000..d8fab8beca8d8 --- /dev/null +++ b/python/pyspark/logger/__init__.py @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +PySpark logging +""" +from pyspark.logger.logger import ( # noqa: F401 + PySparkLogger, +) + +__all__ = [ + "PySparkLogger", +] diff --git a/python/pyspark/logger/logger.py b/python/pyspark/logger/logger.py new file mode 100644 index 0000000000000..975441a9cb572 --- /dev/null +++ b/python/pyspark/logger/logger.py @@ -0,0 +1,196 @@ +# -*- encoding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +import json +from typing import cast, Optional + + +class JSONFormatter(logging.Formatter): + """ + Custom JSON formatter for logging records. + + This formatter converts the log record to a JSON object with the following fields: + - timestamp: The time the log record was created. + - level: The log level of the record. + - name: The name of the logger. + - message: The log message. + - kwargs: Any additional keyword arguments passed to the logger. + """ + + def format(self, record: logging.LogRecord) -> str: + """ + Format the specified record as a JSON string. + + Parameters + ---------- + record : logging.LogRecord + The log record to be formatted. + + Returns + ------- + str + The formatted log record as a JSON string. 
+ """ + log_entry = { + "ts": self.formatTime(record, self.datefmt), + "level": record.levelname, + "logger": record.name, + "msg": record.getMessage(), + "context": record.__dict__.get("kwargs", {}), + } + if record.exc_info: + exc_type, exc_value, exc_tb = record.exc_info + log_entry["exception"] = { + "class": exc_type.__name__ if exc_type else "UnknownException", + "msg": str(exc_value), + "stacktrace": self.formatException(record.exc_info).splitlines(), + } + return json.dumps(log_entry, ensure_ascii=False) + + +class PySparkLogger(logging.Logger): + """ + Custom logging.Logger wrapper for PySpark that logs messages in a structured JSON format. + + PySparkLogger extends the standard Python logging.Logger class, allowing seamless integration + with existing logging setups. It customizes the log output to JSON format, including additional + context information, making it more useful for PySpark applications. + + .. versionadded:: 4.0.0 + + Example + ------- + >>> import logging + >>> import json + >>> from io import StringIO + >>> from pyspark.logger import PySparkLogger + + >>> logger = PySparkLogger.getLogger("ExampleLogger") + >>> logger.setLevel(logging.INFO) + >>> stream = StringIO() + >>> handler = logging.StreamHandler(stream) + >>> logger.addHandler(handler) + + >>> logger.info( + ... "This is an informational message", + ... extra={"user": "test_user", "action": "test_action"} + ... ) + >>> log_output = stream.getvalue().strip().split('\\n')[0] + >>> log = json.loads(log_output) + >>> _ = log.pop("ts") # Remove the timestamp field for static testing + + >>> print(json.dumps(log, ensure_ascii=False, indent=2)) + { + "level": "INFO", + "logger": "ExampleLogger", + "msg": "This is an informational message", + "context": { + "extra": { + "user": "test_user", + "action": "test_action" + } + } + } + """ + + def __init__(self, name: str = "PySparkLogger"): + super().__init__(name, level=logging.WARN) + _handler = logging.StreamHandler() + self.addHandler(_handler) + + def addHandler(self, handler: logging.Handler) -> None: + """ + Add the specified handler to this logger in structured JSON format. + """ + handler.setFormatter(JSONFormatter()) + super().addHandler(handler) + + @staticmethod + def getLogger(name: Optional[str] = None) -> "PySparkLogger": + """ + Return a PySparkLogger with the specified name, creating it if necessary. + + If no name is specified, return the logging.RootLogger. + + Parameters + ---------- + name : str, optional + The name of the logger. + + Returns + ------- + PySparkLogger + A configured instance of PySparkLogger. + """ + existing_logger = logging.getLoggerClass() + if not isinstance(existing_logger, PySparkLogger): + logging.setLoggerClass(PySparkLogger) + + pyspark_logger = logging.getLogger(name) + # Reset to the existing logger + logging.setLoggerClass(existing_logger) + + return cast(PySparkLogger, pyspark_logger) + + def info(self, msg: object, *args: object, **kwargs: object) -> None: + """ + Log 'msg % args' with severity 'INFO' in structured JSON format. + + Parameters + ---------- + msg : str + The log message. + """ + super().info(msg, *args, extra={"kwargs": kwargs}) + + def warning(self, msg: object, *args: object, **kwargs: object) -> None: + """ + Log 'msg % args' with severity 'WARNING' in structured JSON format. + + Parameters + ---------- + msg : str + The log message. 
+ """ + super().warning(msg, *args, extra={"kwargs": kwargs}) + + def error(self, msg: object, *args: object, **kwargs: object) -> None: + """ + Log 'msg % args' with severity 'ERROR' in structured JSON format. + + Parameters + ---------- + msg : str + The log message. + """ + super().error(msg, *args, extra={"kwargs": kwargs}) + + def exception(self, msg: object, *args: object, **kwargs: object) -> None: + """ + Convenience method for logging an ERROR with exception information. + + Parameters + ---------- + msg : str + The log message. + exc_info : bool = True + If True, exception information is added to the logging message. + This includes the exception type, value, and traceback. Default is True. + """ + super().error(msg, *args, exc_info=True, extra={"kwargs": kwargs}) diff --git a/python/pyspark/logger/tests/__init__.py b/python/pyspark/logger/tests/__init__.py new file mode 100644 index 0000000000000..cce3acad34a49 --- /dev/null +++ b/python/pyspark/logger/tests/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/python/pyspark/logger/tests/connect/__init__.py b/python/pyspark/logger/tests/connect/__init__.py new file mode 100644 index 0000000000000..cce3acad34a49 --- /dev/null +++ b/python/pyspark/logger/tests/connect/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/python/pyspark/logger/tests/connect/test_parity_logger.py b/python/pyspark/logger/tests/connect/test_parity_logger.py new file mode 100644 index 0000000000000..2e59fc56d5c62 --- /dev/null +++ b/python/pyspark/logger/tests/connect/test_parity_logger.py @@ -0,0 +1,36 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from pyspark.logger.tests.test_logger import LoggerTestsMixin +from pyspark.testing.connectutils import ReusedConnectTestCase + + +class LoggerParityTests(LoggerTestsMixin, ReusedConnectTestCase): + pass + + +if __name__ == "__main__": + import unittest + from pyspark.logger.tests.connect.test_parity_logger import * # noqa: F401 + + try: + import xmlrunner # type: ignore[import] + + testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2) + except ImportError: + testRunner = None + unittest.main(testRunner=testRunner, verbosity=2) diff --git a/python/pyspark/logger/tests/test_logger.py b/python/pyspark/logger/tests/test_logger.py new file mode 100644 index 0000000000000..514b37dc8b6f7 --- /dev/null +++ b/python/pyspark/logger/tests/test_logger.py @@ -0,0 +1,128 @@ +# -*- encoding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +import logging +import unittest +import json +from io import StringIO +from pyspark.errors import ArithmeticException +from pyspark.logger.logger import PySparkLogger +from pyspark.testing.sqlutils import ReusedSQLTestCase + + +class LoggerTestsMixin: + def setUp(self): + self.handler = logging.StreamHandler(StringIO()) + + self.logger = PySparkLogger.getLogger("TestLogger") + self.logger.setLevel(logging.INFO) + self.logger.addHandler(self.handler) + + dataframe_query_context_logger = PySparkLogger.getLogger("DataFrameQueryContextLogger") + dataframe_query_context_logger.setLevel(logging.INFO) + dataframe_query_context_logger.addHandler(self.handler) + + def test_log_structure(self): + self.logger.info("Test logging structure") + log_json = json.loads(self.handler.stream.getvalue().strip()) + keys = ["ts", "level", "logger", "msg", "context"] + for key in keys: + self.assertTrue(key in log_json) + + def test_log_info(self): + self.logger.info("This is an info log", user="test_user_info", action="test_action_info") + log_json = json.loads(self.handler.stream.getvalue().strip()) + + self.assertEqual(log_json["msg"], "This is an info log") + self.assertEqual( + log_json["context"], {"action": "test_action_info", "user": "test_user_info"} + ) + self.assertTrue("exception" not in log_json) + + def test_log_warn(self): + self.logger.warn("This is an warn log", user="test_user_warn", action="test_action_warn") + log_json = json.loads(self.handler.stream.getvalue().strip()) + + self.assertEqual(log_json["msg"], "This is an warn log") + self.assertEqual( + log_json["context"], {"action": "test_action_warn", "user": "test_user_warn"} + ) + self.assertTrue("exception" not in log_json) + + def test_log_error(self): + self.logger.error( + "This is an error log", user="test_user_error", action="test_action_error" + ) + log_json = json.loads(self.handler.stream.getvalue().strip()) + + self.assertEqual(log_json["msg"], "This is an error log") + self.assertEqual( + log_json["context"], {"action": "test_action_error", "user": "test_user_error"} + ) + self.assertTrue("exception" not in log_json) + + def test_log_exception(self): + self.logger.exception( + "This is an exception log", user="test_user_exception", action="test_action_exception" + ) + log_json = json.loads(self.handler.stream.getvalue().strip()) + + self.assertEqual(log_json["msg"], "This is an exception log") + self.assertEqual( + log_json["context"], {"action": "test_action_exception", "user": "test_user_exception"} + ) + self.assertTrue("exception" in log_json) + self.assertTrue("class" in log_json["exception"]) + self.assertTrue("msg" in log_json["exception"]) + self.assertTrue("stacktrace" in log_json["exception"]) + + def test_dataframe_query_context_logger(self): + with self.sql_conf({"spark.sql.ansi.enabled": True}): + df = self.spark.range(10) + + with self.assertRaises(ArithmeticException): + df.withColumn("div_zero", df.id / 0).collect() + log_json = json.loads(self.handler.stream.getvalue().strip()) + + err_msg = "[DIVIDE_BY_ZERO] Division by zero." + self.assertTrue(err_msg in log_json["msg"]) + self.assertTrue(err_msg in log_json["exception"]["msg"]) + self.assertEqual(log_json["context"]["fragment"], "__truediv__") + self.assertEqual(log_json["context"]["error_class"], "DIVIDE_BY_ZERO") + # Only the class name is different between classic and connect. 
+ # Py4JJavaError for classic, _MultiThreadedRendezvous for connect + self.assertTrue( + log_json["exception"]["class"] in ("Py4JJavaError", "_MultiThreadedRendezvous") + ) + self.assertTrue("Traceback" in log_json["exception"]["stacktrace"][0]) + + +class LoggerTests(LoggerTestsMixin, ReusedSQLTestCase): + pass + + +if __name__ == "__main__": + import unittest + from pyspark.logger.tests.test_logger import * # noqa: F401 + + try: + import xmlrunner + + testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2) + except ImportError: + testRunner = None + unittest.main(testRunner=testRunner, verbosity=2)