diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 8f4ed6014d77f..8e595707494cd 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -1424,6 +1424,18 @@ def __hash__(self):
     ],
 )
 
+pyspark_logger = Module(
+    name="pyspark-logger",
+    dependencies=[],
+    source_file_regexes=["python/pyspark/logger"],
+    python_test_goals=[
+        # unittests
+        "pyspark.logger.tests.test_logger",
+        "pyspark.logger.tests.connect.test_parity_logger",
+    ],
+)
+
+
 sparkr = Module(
     name="sparkr",
     dependencies=[hive, mllib],
diff --git a/dev/tox.ini b/dev/tox.ini
index c5905d12a80a1..47b1b4a9d7832 100644
--- a/dev/tox.ini
+++ b/dev/tox.ini
@@ -33,6 +33,8 @@ per-file-ignores =
     examples/src/main/python/sql/datasource.py: F841,
     # Exclude * imports in test files
     python/pyspark/errors/tests/*.py: F403,
+    python/pyspark/logger/tests/*.py: F403,
+    python/pyspark/logger/tests/connect/*.py: F403,
     python/pyspark/ml/tests/*.py: F403,
     python/pyspark/mllib/tests/*.py: F403,
     python/pyspark/pandas/tests/*.py: F401 F403,
diff --git a/python/docs/source/development/index.rst b/python/docs/source/development/index.rst
index 50150e096c1a8..f013894c444be 100644
--- a/python/docs/source/development/index.rst
+++ b/python/docs/source/development/index.rst
@@ -26,5 +26,6 @@ Development
    testing
    debugging
    setting_ide
+   logger
    errors
diff --git a/python/docs/source/development/logger.rst b/python/docs/source/development/logger.rst
new file mode 100644
index 0000000000000..d809dbf728508
--- /dev/null
+++ b/python/docs/source/development/logger.rst
@@ -0,0 +1,162 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+..    http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+==================
+Logging in PySpark
+==================
+
+.. currentmodule:: pyspark.logger
+
+Introduction
+============
+
+The :mod:`pyspark.logger` module facilitates structured client-side logging for PySpark users.
+
+This module includes a :class:`PySparkLogger` class that provides several methods for logging messages at different levels in a structured JSON format:
+
+- :meth:`PySparkLogger.info`
+- :meth:`PySparkLogger.warning`
+- :meth:`PySparkLogger.error`
+- :meth:`PySparkLogger.exception`
+
+The logger can be easily configured to write logs to either the console or a specified file.
+
+Customizing Log Format
+======================
+
+The default log format is JSON, which includes the timestamp, log level, logger name, and the log message along with any additional context provided.
+
+Example log entry:
+
+.. code-block:: python
+
+    {
+      "ts": "2024-06-28 19:53:48,563",
+      "level": "ERROR",
+      "logger": "DataFrameQueryContextLogger",
+      "msg": "[DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"divide\" was called from\n/.../spark/python/test_error_context.py:17\n",
+      "context": {
+        "file": "/path/to/file.py",
+        "line_no": "17",
+        "fragment": "divide",
+        "error_class": "DIVIDE_BY_ZERO"
+      },
+      "exception": {
+        "class": "Py4JJavaError",
+        "msg": "An error occurred while calling o52.showString.\n: org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"divide\" was called from\n/path/to/file.py:17 ...",
+        "stacktrace": ["Traceback (most recent call last):", "  File \".../spark/python/pyspark/errors/exceptions/captured.py\", line 247, in deco", "    return f(*a, **kw)", "  File \".../lib/python3.9/site-packages/py4j/protocol.py\", line 326, in get_return_value" ...]
+      },
+    }
+
+Setting Up
+==========
+
+To start using the PySpark logging module, import the :class:`PySparkLogger` class from the :mod:`pyspark.logger` module.
+
+.. code-block:: python
+
+    from pyspark.logger import PySparkLogger
+
+Usage
+=====
+Creating a Logger
+-----------------
+You can create a logger instance by calling :meth:`PySparkLogger.getLogger`. By default, the logger emits records at the WARNING level; call ``logger.setLevel(logging.INFO)`` if informational messages should also be emitted.
+
+.. code-block:: python
+
+    logger = PySparkLogger.getLogger()
+
+Logging Messages
+----------------
+The logger provides three main methods for logging messages: :meth:`PySparkLogger.info`, :meth:`PySparkLogger.warning` and :meth:`PySparkLogger.error`; :meth:`PySparkLogger.exception` additionally captures exception information (see the last example below).
+
+- **PySparkLogger.info**: Use this method to log informational messages.
+
+  .. code-block:: python
+
+      user = "test_user"
+      action = "login"
+      logger.info(f"User {user} performed {action}", user=user, action=action)
+
+- **PySparkLogger.warning**: Use this method to log warning messages.
+
+  .. code-block:: python
+
+      user = "test_user"
+      action = "access"
+      logger.warning(f"User {user} attempted an unauthorized {action}", user=user, action=action)
+
+- **PySparkLogger.error**: Use this method to log error messages.
+
+  .. code-block:: python
+
+      user = "test_user"
+      action = "update_profile"
+      logger.error(f"An error occurred for user {user} during {action}", user=user, action=action)
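+
+- **PySparkLogger.exception**: Use this method inside an ``except`` block to log an error together with the active exception; the resulting record carries an additional ``exception`` field with the exception class, message and stacktrace. A minimal example:
+
+  .. code-block:: python
+
+      user = "test_user"
+      try:
+          1 / 0
+      except ZeroDivisionError:
+          # The exception info is captured automatically via exc_info
+          logger.exception(f"Division failed for user {user}", user=user, action="divide")
+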
+Logging to Console
+------------------
+
+.. code-block:: python
+
+    from pyspark.logger import PySparkLogger
+
+    # Create a logger that logs to console
+    logger = PySparkLogger.getLogger("ConsoleLogger")
+
+    user = "test_user"
+    action = "test_action"
+
+    logger.warning(f"User {user} performs {action}", user=user, action=action)
+
+This logs the warning in the following JSON format:
+
+.. code-block:: python
+
+    {
+      "ts": "2024-06-28 19:44:19,030",
+      "level": "WARNING",
+      "logger": "ConsoleLogger",
+      "msg": "User test_user performs test_action",
+      "context": {
+        "user": "test_user",
+        "action": "test_action"
+      },
+    }
+
+Logging to a File
+-----------------
+
+To log messages to a file, use :meth:`PySparkLogger.addHandler` to add a ``FileHandler`` from the standard Python logging module to your logger.
+
+This approach aligns with standard Python logging practices.
+
+.. code-block:: python
+
+    from pyspark.logger import PySparkLogger
+    import logging
+
+    # Create a logger that logs to a file
+    file_logger = PySparkLogger.getLogger("FileLogger")
+    handler = logging.FileHandler("application.log")
+    file_logger.addHandler(handler)
+
+    user = "test_user"
+    action = "test_action"
+
+    file_logger.warning(f"User {user} performs {action}", user=user, action=action)
+
+The log messages will be saved in ``application.log`` in the same JSON format.
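+
+Since each record is a single JSON object on its own line, the file can be post-processed with standard tools. For example, the records can be read back with the standard ``json`` module (an illustrative sketch, assuming the ``application.log`` file produced above):
+
+.. code-block:: python
+
+    import json
+
+    with open("application.log") as log_file:
+        for line in log_file:
+            record = json.loads(line)
+            # "context" holds the keyword arguments passed to the logger
+            if record["context"].get("user") == "test_user":
+                print(record["level"], record["msg"])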
diff --git a/python/docs/source/reference/index.rst b/python/docs/source/reference/index.rst
index 800a7a4b34d42..918c726157efb 100644
--- a/python/docs/source/reference/index.rst
+++ b/python/docs/source/reference/index.rst
@@ -39,4 +39,5 @@ This page lists an overview of all public PySpark modules, classes, functions an
    pyspark
    pyspark.resource
    pyspark.errors
+   pyspark.logger
    pyspark.testing
diff --git a/python/docs/source/reference/pyspark.logger.rst b/python/docs/source/reference/pyspark.logger.rst
new file mode 100644
index 0000000000000..9d5f6689de4aa
--- /dev/null
+++ b/python/docs/source/reference/pyspark.logger.rst
@@ -0,0 +1,47 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+..    http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+
+======
+Logger
+======
+
+Classes
+-------
+
+.. currentmodule:: pyspark.logger
+
+.. autosummary::
+    :toctree: api/
+
+    PySparkLogger
+
+
+Methods
+-------
+
+.. currentmodule:: pyspark.logger
+
+.. autosummary::
+    :toctree: api/
+
+    PySparkLogger.getLogger
+    PySparkLogger.addHandler
+    PySparkLogger.info
+    PySparkLogger.warning
+    PySparkLogger.error
+    PySparkLogger.exception
diff --git a/python/mypy.ini b/python/mypy.ini
index c7cf8df114147..4daa185933343 100644
--- a/python/mypy.ini
+++ b/python/mypy.ini
@@ -126,6 +126,9 @@ ignore_errors = True
 [mypy-pyspark.errors.tests.*]
 ignore_errors = True
 
+[mypy-pyspark.logger.tests.*]
+ignore_errors = True
+
 ; Allow non-strict optional for pyspark.pandas
 
 [mypy-pyspark.pandas.*]
diff --git a/python/packaging/classic/setup.py b/python/packaging/classic/setup.py
index 5e94c2b653806..79b74483f00dd 100755
--- a/python/packaging/classic/setup.py
+++ b/python/packaging/classic/setup.py
@@ -312,6 +312,7 @@ def run(self):
             "pyspark.errors",
             "pyspark.errors.exceptions",
             "pyspark.examples.src.main.python",
+            "pyspark.logger",
         ],
         include_package_data=True,
         package_dir={
diff --git a/python/packaging/connect/setup.py b/python/packaging/connect/setup.py
index bc1d4fd2868de..ab166c79747df 100755
--- a/python/packaging/connect/setup.py
+++ b/python/packaging/connect/setup.py
@@ -105,6 +105,8 @@
         "pyspark.pandas.tests.connect.reshape",
         "pyspark.pandas.tests.connect.series",
         "pyspark.pandas.tests.connect.window",
+        "pyspark.logger.tests",
+        "pyspark.logger.tests.connect",
     ]
 
 try:
@@ -175,6 +177,7 @@
         "pyspark.resource",
         "pyspark.errors",
         "pyspark.errors.exceptions",
+        "pyspark.logger",
     ]
 
 setup(
diff --git a/python/pyspark/errors/exceptions/base.py b/python/pyspark/errors/exceptions/base.py
index dcfc6df77a77a..e33492fbe15ed 100644
--- a/python/pyspark/errors/exceptions/base.py
+++ b/python/pyspark/errors/exceptions/base.py
@@ -19,6 +19,7 @@
 from typing import Dict, Optional, cast, Iterable, TYPE_CHECKING, List
 
 from pyspark.errors.utils import ErrorClassesReader
+from pyspark.logger import PySparkLogger
 from pickle import PicklingError
 
 if TYPE_CHECKING:
@@ -129,6 +130,28 @@ def getQueryContext(self) -> List["QueryContext"]:
         """
         return self._query_contexts
 
+    def _log_exception(self) -> None:
+        query_contexts = self.getQueryContext()
+        query_context = query_contexts[0] if len(query_contexts) != 0 else None
+        if query_context:
+            if query_context.contextType().name == "DataFrame":
+                logger = PySparkLogger.getLogger("DataFrameQueryContextLogger")
+                call_site = query_context.callSite().split(":")
+                line_no = call_site[1] if len(call_site) == 2 else ""
+                logger.exception(
+                    self.getMessage(),
+                    file=call_site[0],
+                    line_no=line_no,
+                    fragment=query_context.fragment(),
+                    error_class=self.getErrorClass(),
+                )
+            else:
+                logger = PySparkLogger.getLogger("SQLQueryContextLogger")
+                logger.exception(
+                    self.getMessage(),
+                    error_class=self.getErrorClass(),
+                )
+
     def __str__(self) -> str:
         if self.getErrorClass() is not None:
             return self.getMessage()
diff --git a/python/pyspark/errors/exceptions/captured.py b/python/pyspark/errors/exceptions/captured.py
index b5bb742161c06..8a79f78e4c80e 100644
--- a/python/pyspark/errors/exceptions/captured.py
+++ b/python/pyspark/errors/exceptions/captured.py
@@ -73,6 +73,7 @@ def __init__(
         if self._cause is None and origin is not None and origin.getCause() is not None:
             self._cause = convert_exception(origin.getCause())
         self._origin = origin
+        self._log_exception()
 
     def __str__(self) -> str:
         from pyspark import SparkContext
diff --git a/python/pyspark/errors/exceptions/connect.py b/python/pyspark/errors/exceptions/connect.py
index 8a95358f26975..efa3fdcf1e56d 100644
--- a/python/pyspark/errors/exceptions/connect.py
+++ b/python/pyspark/errors/exceptions/connect.py
@@ -325,6 +325,7 @@ def __init__(
         self._stacktrace: Optional[str] = server_stacktrace
         self._display_stacktrace: bool = display_server_stacktrace
         self._query_contexts: List[BaseQueryContext] = query_contexts
+        self._log_exception()
 
     def getSqlState(self) -> Optional[str]:
         if self._sql_state is not None:
diff --git a/python/pyspark/logger/__init__.py b/python/pyspark/logger/__init__.py
new file mode 100644
index 0000000000000..d8fab8beca8d8
--- /dev/null
+++ b/python/pyspark/logger/__init__.py
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+PySpark logging
+"""
+from pyspark.logger.logger import (  # noqa: F401
+    PySparkLogger,
+)
+
+__all__ = [
+    "PySparkLogger",
+]
diff --git a/python/pyspark/logger/logger.py b/python/pyspark/logger/logger.py
new file mode 100644
index 0000000000000..975441a9cb572
--- /dev/null
+++ b/python/pyspark/logger/logger.py
@@ -0,0 +1,196 @@
+# -*- encoding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import json
+from typing import cast, Optional
+
+
+class JSONFormatter(logging.Formatter):
+    """
+    Custom JSON formatter for logging records.
+
+    This formatter converts the log record to a JSON object with the following fields:
+
+    - ts: The time the log record was created.
+    - level: The log level of the record.
+    - logger: The name of the logger.
+    - msg: The log message.
+    - context: Any additional keyword arguments passed to the logger.
+    """
+
+    def format(self, record: logging.LogRecord) -> str:
+        """
+        Format the specified record as a JSON string.
+
+        Parameters
+        ----------
+        record : logging.LogRecord
+            The log record to be formatted.
+
+        Returns
+        -------
+        str
+            The formatted log record as a JSON string.
+ """ + log_entry = { + "ts": self.formatTime(record, self.datefmt), + "level": record.levelname, + "logger": record.name, + "msg": record.getMessage(), + "context": record.__dict__.get("kwargs", {}), + } + if record.exc_info: + exc_type, exc_value, exc_tb = record.exc_info + log_entry["exception"] = { + "class": exc_type.__name__ if exc_type else "UnknownException", + "msg": str(exc_value), + "stacktrace": self.formatException(record.exc_info).splitlines(), + } + return json.dumps(log_entry, ensure_ascii=False) + + +class PySparkLogger(logging.Logger): + """ + Custom logging.Logger wrapper for PySpark that logs messages in a structured JSON format. + + PySparkLogger extends the standard Python logging.Logger class, allowing seamless integration + with existing logging setups. It customizes the log output to JSON format, including additional + context information, making it more useful for PySpark applications. + + .. versionadded:: 4.0.0 + + Example + ------- + >>> import logging + >>> import json + >>> from io import StringIO + >>> from pyspark.logger import PySparkLogger + + >>> logger = PySparkLogger.getLogger("ExampleLogger") + >>> logger.setLevel(logging.INFO) + >>> stream = StringIO() + >>> handler = logging.StreamHandler(stream) + >>> logger.addHandler(handler) + + >>> logger.info( + ... "This is an informational message", + ... extra={"user": "test_user", "action": "test_action"} + ... ) + >>> log_output = stream.getvalue().strip().split('\\n')[0] + >>> log = json.loads(log_output) + >>> _ = log.pop("ts") # Remove the timestamp field for static testing + + >>> print(json.dumps(log, ensure_ascii=False, indent=2)) + { + "level": "INFO", + "logger": "ExampleLogger", + "msg": "This is an informational message", + "context": { + "extra": { + "user": "test_user", + "action": "test_action" + } + } + } + """ + + def __init__(self, name: str = "PySparkLogger"): + super().__init__(name, level=logging.WARN) + _handler = logging.StreamHandler() + self.addHandler(_handler) + + def addHandler(self, handler: logging.Handler) -> None: + """ + Add the specified handler to this logger in structured JSON format. + """ + handler.setFormatter(JSONFormatter()) + super().addHandler(handler) + + @staticmethod + def getLogger(name: Optional[str] = None) -> "PySparkLogger": + """ + Return a PySparkLogger with the specified name, creating it if necessary. + + If no name is specified, return the logging.RootLogger. + + Parameters + ---------- + name : str, optional + The name of the logger. + + Returns + ------- + PySparkLogger + A configured instance of PySparkLogger. + """ + existing_logger = logging.getLoggerClass() + if not isinstance(existing_logger, PySparkLogger): + logging.setLoggerClass(PySparkLogger) + + pyspark_logger = logging.getLogger(name) + # Reset to the existing logger + logging.setLoggerClass(existing_logger) + + return cast(PySparkLogger, pyspark_logger) + + def info(self, msg: object, *args: object, **kwargs: object) -> None: + """ + Log 'msg % args' with severity 'INFO' in structured JSON format. + + Parameters + ---------- + msg : str + The log message. + """ + super().info(msg, *args, extra={"kwargs": kwargs}) + + def warning(self, msg: object, *args: object, **kwargs: object) -> None: + """ + Log 'msg % args' with severity 'WARNING' in structured JSON format. + + Parameters + ---------- + msg : str + The log message. 
+ """ + super().warning(msg, *args, extra={"kwargs": kwargs}) + + def error(self, msg: object, *args: object, **kwargs: object) -> None: + """ + Log 'msg % args' with severity 'ERROR' in structured JSON format. + + Parameters + ---------- + msg : str + The log message. + """ + super().error(msg, *args, extra={"kwargs": kwargs}) + + def exception(self, msg: object, *args: object, **kwargs: object) -> None: + """ + Convenience method for logging an ERROR with exception information. + + Parameters + ---------- + msg : str + The log message. + exc_info : bool = True + If True, exception information is added to the logging message. + This includes the exception type, value, and traceback. Default is True. + """ + super().error(msg, *args, exc_info=True, extra={"kwargs": kwargs}) diff --git a/python/pyspark/logger/tests/__init__.py b/python/pyspark/logger/tests/__init__.py new file mode 100644 index 0000000000000..cce3acad34a49 --- /dev/null +++ b/python/pyspark/logger/tests/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/python/pyspark/logger/tests/connect/__init__.py b/python/pyspark/logger/tests/connect/__init__.py new file mode 100644 index 0000000000000..cce3acad34a49 --- /dev/null +++ b/python/pyspark/logger/tests/connect/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/python/pyspark/logger/tests/connect/test_parity_logger.py b/python/pyspark/logger/tests/connect/test_parity_logger.py new file mode 100644 index 0000000000000..2e59fc56d5c62 --- /dev/null +++ b/python/pyspark/logger/tests/connect/test_parity_logger.py @@ -0,0 +1,36 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyspark.logger.tests.test_logger import LoggerTestsMixin
+from pyspark.testing.connectutils import ReusedConnectTestCase
+
+
+class LoggerParityTests(LoggerTestsMixin, ReusedConnectTestCase):
+    pass
+
+
+if __name__ == "__main__":
+    import unittest
+    from pyspark.logger.tests.connect.test_parity_logger import *  # noqa: F401
+
+    try:
+        import xmlrunner  # type: ignore[import]
+
+        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/python/pyspark/logger/tests/test_logger.py b/python/pyspark/logger/tests/test_logger.py
new file mode 100644
index 0000000000000..514b37dc8b6f7
--- /dev/null
+++ b/python/pyspark/logger/tests/test_logger.py
@@ -0,0 +1,128 @@
+# -*- encoding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+import unittest
+import json
+from io import StringIO
+
+from pyspark.errors import ArithmeticException
+from pyspark.logger.logger import PySparkLogger
+from pyspark.testing.sqlutils import ReusedSQLTestCase
+
+
+class LoggerTestsMixin:
+    def setUp(self):
+        self.handler = logging.StreamHandler(StringIO())
+
+        self.logger = PySparkLogger.getLogger("TestLogger")
+        self.logger.setLevel(logging.INFO)
+        self.logger.addHandler(self.handler)
+
+        dataframe_query_context_logger = PySparkLogger.getLogger("DataFrameQueryContextLogger")
+        dataframe_query_context_logger.setLevel(logging.INFO)
+        dataframe_query_context_logger.addHandler(self.handler)
+
+    def test_log_structure(self):
+        self.logger.info("Test logging structure")
+        log_json = json.loads(self.handler.stream.getvalue().strip())
+        keys = ["ts", "level", "logger", "msg", "context"]
+        for key in keys:
+            self.assertTrue(key in log_json)
+
+    def test_log_info(self):
+        self.logger.info("This is an info log", user="test_user_info", action="test_action_info")
+        log_json = json.loads(self.handler.stream.getvalue().strip())
+
+        self.assertEqual(log_json["msg"], "This is an info log")
+        self.assertEqual(
+            log_json["context"], {"action": "test_action_info", "user": "test_user_info"}
+        )
+        self.assertTrue("exception" not in log_json)
+
+    def test_log_warn(self):
+        self.logger.warning("This is a warn log", user="test_user_warn", action="test_action_warn")
+        log_json = json.loads(self.handler.stream.getvalue().strip())
+
+        self.assertEqual(log_json["msg"], "This is a warn log")
+        self.assertEqual(
+            log_json["context"], {"action": "test_action_warn", "user": "test_user_warn"}
+        )
+        self.assertTrue("exception" not in log_json)
+
+    def test_log_error(self):
+        self.logger.error(
+            "This is an error log", user="test_user_error", action="test_action_error"
+        )
+        log_json = json.loads(self.handler.stream.getvalue().strip())
+
+        self.assertEqual(log_json["msg"], "This is an error log")
+        self.assertEqual(
+            log_json["context"], {"action": "test_action_error", "user": "test_user_error"}
+        )
+        self.assertTrue("exception" not in log_json)
+
+    def test_log_exception(self):
+        # Raise a real exception so that exc_info is populated on the record
+        try:
+            raise ValueError("Test exception")
+        except ValueError:
+            self.logger.exception(
+                "This is an exception log",
+                user="test_user_exception",
+                action="test_action_exception",
+            )
+        log_json = json.loads(self.handler.stream.getvalue().strip())
+
+        self.assertEqual(log_json["msg"], "This is an exception log")
+        self.assertEqual(
+            log_json["context"], {"action": "test_action_exception", "user": "test_user_exception"}
+        )
+        self.assertTrue("exception" in log_json)
+        self.assertTrue("class" in log_json["exception"])
+        self.assertTrue("msg" in log_json["exception"])
+        self.assertTrue("stacktrace" in log_json["exception"])
+
+    def test_dataframe_query_context_logger(self):
+        with self.sql_conf({"spark.sql.ansi.enabled": True}):
+            df = self.spark.range(10)
+
+            with self.assertRaises(ArithmeticException):
+                df.withColumn("div_zero", df.id / 0).collect()
+            log_json = json.loads(self.handler.stream.getvalue().strip())
+
+            err_msg = "[DIVIDE_BY_ZERO] Division by zero."
+            self.assertTrue(err_msg in log_json["msg"])
+            self.assertTrue(err_msg in log_json["exception"]["msg"])
+            self.assertEqual(log_json["context"]["fragment"], "__truediv__")
+            self.assertEqual(log_json["context"]["error_class"], "DIVIDE_BY_ZERO")
+            # Only the class name is different between classic and connect:
+            # Py4JJavaError for classic, _MultiThreadedRendezvous for connect
+            self.assertTrue(
+                log_json["exception"]["class"] in ("Py4JJavaError", "_MultiThreadedRendezvous")
+            )
+            self.assertTrue("Traceback" in log_json["exception"]["stacktrace"][0])
+
+
+class LoggerTests(LoggerTestsMixin, ReusedSQLTestCase):
+    pass
+
+
+if __name__ == "__main__":
+    import unittest
+    from pyspark.logger.tests.test_logger import *  # noqa: F401
+
+    try:
+        import xmlrunner
+
+        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)