From 0d3eaf7dd23bdd04afbca65b8050185a88988957 Mon Sep 17 00:00:00 2001 From: Ashish Patel Date: Sun, 10 Dec 2023 02:01:35 +0530 Subject: [PATCH 01/11] spark-submit - changes added to spark-submit hook --- .../providers/apache/spark/hooks/spark_submit.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/airflow/providers/apache/spark/hooks/spark_submit.py b/airflow/providers/apache/spark/hooks/spark_submit.py index b64f3a0c22f9b..b495cafeb6088 100644 --- a/airflow/providers/apache/spark/hooks/spark_submit.py +++ b/airflow/providers/apache/spark/hooks/spark_submit.py @@ -78,8 +78,13 @@ class SparkSubmitHook(BaseHook, LoggingMixin): :param verbose: Whether to pass the verbose flag to spark-submit process for debugging :param spark_binary: The command to use for spark submit. Some distros may use spark2-submit or spark3-submit. + (will overwrite any spark_binary defined in the connection's extra JSON) :param properties_file: Path to a file from which to load extra properties. If not specified, this will look for conf/spark-defaults.conf. + :param queue: The name of the YARN queue to which the application is submitted. + (will overwrite any yarn queue defined in the connection's extra JSON) + :param deploy_mode: Whether to deploy your driver on the worker nodes (cluster) or locally as a client. 
+ (will overwrite any deployment mode defined in the connection's extra JSON) :param use_krb5ccache: if True, configure spark to use ticket cache instead of relying on keytab for Kerberos login """ @@ -125,6 +130,8 @@ def __init__( verbose: bool = False, spark_binary: str | None = None, properties_file: str | None = None, + queue: str | None = None, + deploy_mode: str | None = None, *, use_krb5ccache: bool = False, ) -> None: @@ -159,6 +166,8 @@ def __init__( self._kubernetes_driver_pod: str | None = None self.spark_binary = spark_binary self._properties_file = properties_file + self._queue = queue + self._deploy_mode = deploy_mode self._connection = self._resolve_connection() self._is_yarn = "yarn" in self._connection["master"] self._is_kubernetes = "k8s" in self._connection["master"] @@ -204,8 +213,8 @@ def _resolve_connection(self) -> dict[str, Any]: # Determine optional yarn queue from the extra field extra = conn.extra_dejson - conn_data["queue"] = extra.get("queue") - conn_data["deploy_mode"] = extra.get("deploy-mode") + conn_data["queue"] = self._queue if self._queue else extra.get("queue") + conn_data["deploy_mode"] = self._deploy_mode if self._deploy_mode else extra.get("deploy-mode") if not self.spark_binary: self.spark_binary = extra.get("spark-binary", "spark-submit") if self.spark_binary is not None and self.spark_binary not in ALLOWED_SPARK_BINARIES: From 2b478beefbe2973b16e67274a6cc7f264d7563a4 Mon Sep 17 00:00:00 2001 From: Ashish Patel Date: Sun, 10 Dec 2023 02:05:53 +0530 Subject: [PATCH 02/11] spark-submit - changes added to spark-submit operator --- .../providers/apache/spark/operators/spark_submit.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/airflow/providers/apache/spark/operators/spark_submit.py b/airflow/providers/apache/spark/operators/spark_submit.py index 3f4c539536786..5efc9c07e8ed1 100644 --- a/airflow/providers/apache/spark/operators/spark_submit.py +++ b/airflow/providers/apache/spark/operators/spark_submit.py @@ 
-71,6 +71,10 @@ class SparkSubmitOperator(BaseOperator): Some distros may use spark2-submit or spark3-submit. :param properties_file: Path to a file from which to load extra properties. If not specified, this will look for conf/spark-defaults.conf. + :param queue: The name of the YARN queue to which the application is submitted. + (will overwrite any yarn queue defined in the connection's extra JSON) + :param deploy_mode: Whether to deploy your driver on the worker nodes (cluster) or locally as an client. + (will overwrite any deployment mode defined in the connection's extra JSON) :param use_krb5ccache: if True, configure spark to use ticket cache instead of relying on keytab for Kerberos login """ @@ -124,6 +128,8 @@ def __init__( verbose: bool = False, spark_binary: str | None = None, properties_file: str | None = None, + queue: str | None = None, + deploy_mode: str | None = None, use_krb5ccache: bool = False, **kwargs: Any, ) -> None: @@ -154,6 +160,8 @@ def __init__( self._verbose = verbose self._spark_binary = spark_binary self._properties_file = properties_file + self._queue = queue + self._deploy_mode = deploy_mode self._hook: SparkSubmitHook | None = None self._conn_id = conn_id self._use_krb5ccache = use_krb5ccache @@ -197,5 +205,7 @@ def _get_hook(self) -> SparkSubmitHook: verbose=self._verbose, spark_binary=self._spark_binary, properties_file=self._properties_file, + queue= self._queue , + deploy_mode=self._deploy_mode , use_krb5ccache=self._use_krb5ccache, ) From ccbbd738b8ff15fce78912a78a8b74cb09f444b5 Mon Sep 17 00:00:00 2001 From: Ashish Patel Date: Sun, 10 Dec 2023 02:21:07 +0530 Subject: [PATCH 03/11] spark-submit - tests added --- airflow/providers/apache/spark/operators/spark_submit.py | 2 +- tests/providers/apache/spark/operators/test_spark_submit.py | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/airflow/providers/apache/spark/operators/spark_submit.py b/airflow/providers/apache/spark/operators/spark_submit.py index 
5efc9c07e8ed1..89079791d4026 100644 --- a/airflow/providers/apache/spark/operators/spark_submit.py +++ b/airflow/providers/apache/spark/operators/spark_submit.py @@ -73,7 +73,7 @@ class SparkSubmitOperator(BaseOperator): specified, this will look for conf/spark-defaults.conf. :param queue: The name of the YARN queue to which the application is submitted. (will overwrite any yarn queue defined in the connection's extra JSON) - :param deploy_mode: Whether to deploy your driver on the worker nodes (cluster) or locally as an client. + :param deploy_mode: Whether to deploy your driver on the worker nodes (cluster) or locally as a client. (will overwrite any deployment mode defined in the connection's extra JSON) :param use_krb5ccache: if True, configure spark to use ticket cache instead of relying on keytab for Kerberos login diff --git a/tests/providers/apache/spark/operators/test_spark_submit.py b/tests/providers/apache/spark/operators/test_spark_submit.py index 884036467f1f5..5e3aded5cec18 100644 --- a/tests/providers/apache/spark/operators/test_spark_submit.py +++ b/tests/providers/apache/spark/operators/test_spark_submit.py @@ -67,6 +67,8 @@ class TestSparkSubmitOperator: "args should keep embedded spaces", ], "use_krb5ccache": True, + "queue": "yarn_dev_queue2", + "deploy_mode": "client2" } def setup_method(self): @@ -120,6 +122,8 @@ def test_execute(self): "args should keep embedded spaces", ], "spark_binary": "sparky", + "queue": "yarn_dev_queue2", + "deploy_mode": "client2", "use_krb5ccache": True, "properties_file": "conf/spark-custom.conf", } @@ -149,6 +153,8 @@ def test_execute(self): assert expected_dict["driver_memory"] == operator._driver_memory assert expected_dict["application_args"] == operator._application_args assert expected_dict["spark_binary"] == operator._spark_binary + assert expected_dict["queue"] == operator._queue + assert expected_dict["deploy_mode"] == operator._deploy_mode assert expected_dict["properties_file"] == 
operator._properties_file assert expected_dict["use_krb5ccache"] == operator._use_krb5ccache From f020a577d1dc0f1aa37e1f4822eb99e185e47f1b Mon Sep 17 00:00:00 2001 From: Ashish Patel Date: Sun, 10 Dec 2023 02:46:41 +0530 Subject: [PATCH 04/11] spark-submit - tests added --- .../apache/spark/operators/test_spark_submit.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/tests/providers/apache/spark/operators/test_spark_submit.py b/tests/providers/apache/spark/operators/test_spark_submit.py index 5e3aded5cec18..749c09ab100c6 100644 --- a/tests/providers/apache/spark/operators/test_spark_submit.py +++ b/tests/providers/apache/spark/operators/test_spark_submit.py @@ -158,6 +158,23 @@ def test_execute(self): assert expected_dict["properties_file"] == operator._properties_file assert expected_dict["use_krb5ccache"] == operator._use_krb5ccache + def test_spark_submit_cmd_connection_overrides(self): + config = self._config + # have to add this otherwise wee can't run + # _build_spark_submit_command + config["use_krb5ccache"] = False + operator = SparkSubmitOperator( + task_id="spark_submit_job", + spark_binary="sparky", + dag=self.dag, + **config + ) + + cmd = operator._get_hook()._build_spark_submit_command("test")[0] + assert "--queue yarn_dev_queue2" in cmd + assert "--deploy-mode client2" in cmd + assert "sparky" in cmd + @pytest.mark.db_test def test_render_template(self): # Given From 8bb0b38dfe556cc6863c5c582c4c2ad995d86c88 Mon Sep 17 00:00:00 2001 From: Ashish Patel Date: Sun, 10 Dec 2023 12:05:15 +0530 Subject: [PATCH 05/11] spark-submit - precommit fixes --- .../providers/apache/spark/operators/test_spark_submit.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/tests/providers/apache/spark/operators/test_spark_submit.py b/tests/providers/apache/spark/operators/test_spark_submit.py index 749c09ab100c6..4b53ee218e532 100644 --- a/tests/providers/apache/spark/operators/test_spark_submit.py +++ 
b/tests/providers/apache/spark/operators/test_spark_submit.py @@ -68,7 +68,7 @@ class TestSparkSubmitOperator: ], "use_krb5ccache": True, "queue": "yarn_dev_queue2", - "deploy_mode": "client2" + "deploy_mode": "client2", } def setup_method(self): @@ -164,10 +164,7 @@ def test_spark_submit_cmd_connection_overrides(self): # _build_spark_submit_command config["use_krb5ccache"] = False operator = SparkSubmitOperator( - task_id="spark_submit_job", - spark_binary="sparky", - dag=self.dag, - **config + task_id="spark_submit_job", spark_binary="sparky", dag=self.dag, **config ) cmd = operator._get_hook()._build_spark_submit_command("test")[0] From 53505f054d232af8a562ce8a2e7716466d79cf77 Mon Sep 17 00:00:00 2001 From: Ashish Patel Date: Sun, 10 Dec 2023 12:26:56 +0530 Subject: [PATCH 06/11] spark-submit - test fixes --- .../apache/spark/operators/test_spark_submit.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/tests/providers/apache/spark/operators/test_spark_submit.py b/tests/providers/apache/spark/operators/test_spark_submit.py index 4b53ee218e532..b6e6e09b68513 100644 --- a/tests/providers/apache/spark/operators/test_spark_submit.py +++ b/tests/providers/apache/spark/operators/test_spark_submit.py @@ -166,12 +166,20 @@ def test_spark_submit_cmd_connection_overrides(self): operator = SparkSubmitOperator( task_id="spark_submit_job", spark_binary="sparky", dag=self.dag, **config ) - - cmd = operator._get_hook()._build_spark_submit_command("test")[0] + cmd = " ".join(operator._get_hook()._build_spark_submit_command("test")) assert "--queue yarn_dev_queue2" in cmd assert "--deploy-mode client2" in cmd assert "sparky" in cmd + # if we don't pass any overrides in arguments + config["queue"] = None + config["deploy_mode"] = None + operator2 = SparkSubmitOperator(task_id="spark_submit_job2", dag=self.dag, **config) + cmd2 = " ".join(operator2._get_hook()._build_spark_submit_command("test")) + assert "--queue root.default" in cmd2 + assert 
"--deploy-mode client2" not in cmd2 + assert "spark-submit" in cmd2 + @pytest.mark.db_test def test_render_template(self): # Given From f3d86235adc257040bee04c0d4245e6a09cb1efd Mon Sep 17 00:00:00 2001 From: Ashish Patel Date: Mon, 11 Dec 2023 00:36:18 +0530 Subject: [PATCH 07/11] spark-submit - test fixes --- tests/providers/apache/spark/operators/test_spark_submit.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/providers/apache/spark/operators/test_spark_submit.py b/tests/providers/apache/spark/operators/test_spark_submit.py index b6e6e09b68513..3330ca88ddf64 100644 --- a/tests/providers/apache/spark/operators/test_spark_submit.py +++ b/tests/providers/apache/spark/operators/test_spark_submit.py @@ -158,9 +158,10 @@ def test_execute(self): assert expected_dict["properties_file"] == operator._properties_file assert expected_dict["use_krb5ccache"] == operator._use_krb5ccache + @pytest.mark.db_test def test_spark_submit_cmd_connection_overrides(self): config = self._config - # have to add this otherwise wee can't run + # have to add this otherwise we can't run # _build_spark_submit_command config["use_krb5ccache"] = False operator = SparkSubmitOperator( From f16b84aa72717f6a088e98c721d936ed76282f45 Mon Sep 17 00:00:00 2001 From: Ashish Patel Date: Mon, 11 Dec 2023 01:33:43 +0530 Subject: [PATCH 08/11] spark-submit - precommit fixes --- .../fab/auth_manager/cli_commands/user_command.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/airflow/providers/fab/auth_manager/cli_commands/user_command.py b/airflow/providers/fab/auth_manager/cli_commands/user_command.py index a53425ca5da9c..fc86707e85d6d 100644 --- a/airflow/providers/fab/auth_manager/cli_commands/user_command.py +++ b/airflow/providers/fab/auth_manager/cli_commands/user_command.py @@ -194,10 +194,10 @@ def users_import(args): users_created, users_updated = _import_users(users_list) if users_created: - print("Created the following 
users:\n\t{}".format("\n\t".join(users_created))) + print(f"Created the following users:\n\t{'\\n\\t'.join(users_created)}") if users_updated: - print("Updated the following users:\n\t{}".format("\n\t".join(users_updated))) + print(f"Updated the following users:\n\t{'\\n\\t'.join(users_updated)}") def _import_users(users_list: list[dict[str, Any]]): @@ -213,9 +213,7 @@ def _import_users(users_list: list[dict[str, Any]]): msg.append(f"[Item {row_num}]") for key, value in failure.items(): msg.append(f"\t{key}: {value}") - raise SystemExit( - "Error: Input file didn't pass validation. See below:\n{}".format("\n".join(msg)) - ) + raise SystemExit(f"Error: Input file didn't pass validation. See below:\n{'\\n'.join(msg)}") for user in users_list: roles = [] From 78c972bb485e453838357bd26207609d82c92c44 Mon Sep 17 00:00:00 2001 From: Ashish Patel Date: Tue, 12 Dec 2023 00:10:50 +0530 Subject: [PATCH 09/11] Revert "spark-submit - precommit fixes" This reverts commit 8c8d7bf27575df69b03747688b23a1a292206c5a. 
--- .../fab/auth_manager/cli_commands/user_command.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/airflow/providers/fab/auth_manager/cli_commands/user_command.py b/airflow/providers/fab/auth_manager/cli_commands/user_command.py index fc86707e85d6d..a53425ca5da9c 100644 --- a/airflow/providers/fab/auth_manager/cli_commands/user_command.py +++ b/airflow/providers/fab/auth_manager/cli_commands/user_command.py @@ -194,10 +194,10 @@ def users_import(args): users_created, users_updated = _import_users(users_list) if users_created: - print(f"Created the following users:\n\t{'\\n\\t'.join(users_created)}") + print("Created the following users:\n\t{}".format("\n\t".join(users_created))) if users_updated: - print(f"Updated the following users:\n\t{'\\n\\t'.join(users_updated)}") + print("Updated the following users:\n\t{}".format("\n\t".join(users_updated))) def _import_users(users_list: list[dict[str, Any]]): @@ -213,7 +213,9 @@ def _import_users(users_list: list[dict[str, Any]]): msg.append(f"[Item {row_num}]") for key, value in failure.items(): msg.append(f"\t{key}: {value}") - raise SystemExit(f"Error: Input file didn't pass validation. See below:\n{'\\n'.join(msg)}") + raise SystemExit( + "Error: Input file didn't pass validation. 
See below:\n{}".format("\n".join(msg)) + ) for user in users_list: roles = [] From 79470f10856047bbd9307833151e115c761252cd Mon Sep 17 00:00:00 2001 From: Ashish Patel Date: Tue, 12 Dec 2023 00:14:48 +0530 Subject: [PATCH 10/11] spark-submit - precommit fixes --- airflow/providers/apache/spark/operators/spark_submit.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/providers/apache/spark/operators/spark_submit.py b/airflow/providers/apache/spark/operators/spark_submit.py index 89079791d4026..5b5f38429433a 100644 --- a/airflow/providers/apache/spark/operators/spark_submit.py +++ b/airflow/providers/apache/spark/operators/spark_submit.py @@ -205,7 +205,7 @@ def _get_hook(self) -> SparkSubmitHook: verbose=self._verbose, spark_binary=self._spark_binary, properties_file=self._properties_file, - queue= self._queue , - deploy_mode=self._deploy_mode , + queue=self._queue, + deploy_mode=self._deploy_mode, use_krb5ccache=self._use_krb5ccache, ) From 820b2faba26a7266b86dbc800e5beedf03a6e211 Mon Sep 17 00:00:00 2001 From: Ashish Patel Date: Tue, 12 Dec 2023 00:17:02 +0530 Subject: [PATCH 11/11] spark-submit - adding docs --- airflow/providers/apache/spark/operators/spark_submit.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/providers/apache/spark/operators/spark_submit.py b/airflow/providers/apache/spark/operators/spark_submit.py index 5b5f38429433a..be2f2d0ac5a40 100644 --- a/airflow/providers/apache/spark/operators/spark_submit.py +++ b/airflow/providers/apache/spark/operators/spark_submit.py @@ -69,6 +69,7 @@ class SparkSubmitOperator(BaseOperator): :param verbose: Whether to pass the verbose flag to spark-submit process for debugging :param spark_binary: The command to use for spark submit. Some distros may use spark2-submit or spark3-submit. + (will overwrite any spark_binary defined in the connection's extra JSON) :param properties_file: Path to a file from which to load extra properties. 
If not specified, this will look for conf/spark-defaults.conf. :param queue: The name of the YARN queue to which the application is submitted.