diff --git a/airflow/providers/apache/hive/CHANGELOG.rst b/airflow/providers/apache/hive/CHANGELOG.rst index 3ab9928de0f26..66836e68876cd 100644 --- a/airflow/providers/apache/hive/CHANGELOG.rst +++ b/airflow/providers/apache/hive/CHANGELOG.rst @@ -27,6 +27,16 @@ Changelog --------- +7.0.0 +..... + + +Breaking changes +~~~~~~~~~~~~~~~~ + +* Remove the ability of specify a proxy user as an ``owner`` or ``login`` or ``as_param`` in the connection. Now, setting the user in ``Proxy User`` connection parameter or passing ``proxy_user`` to HiveHook will do the job. + + 6.4.2 ..... diff --git a/airflow/providers/apache/hive/hooks/hive.py b/airflow/providers/apache/hive/hooks/hive.py index 5bdaf8084cb86..d0dfa10c62258 100644 --- a/airflow/providers/apache/hive/hooks/hive.py +++ b/airflow/providers/apache/hive/hooks/hive.py @@ -91,7 +91,6 @@ class HiveCliHook(BaseHook): def __init__( self, hive_cli_conn_id: str = default_conn_name, - run_as: str | None = None, mapred_queue: str | None = None, mapred_queue_priority: str | None = None, mapred_job_name: str | None = None, @@ -105,7 +104,6 @@ def __init__( self.use_beeline: bool = conn.extra_dejson.get("use_beeline", False) self.auth = auth self.conn = conn - self.run_as = run_as self.sub_process: Any = None if mapred_queue_priority: mapred_queue_priority = mapred_queue_priority.upper() @@ -119,20 +117,38 @@ def __init__( self.mapred_job_name = mapred_job_name self.proxy_user = proxy_user + @classmethod + def get_connection_form_widgets(cls) -> dict[str, Any]: + """Returns connection widgets to add to connection form.""" + from flask_appbuilder.fieldwidgets import BS3TextFieldWidget + from flask_babel import lazy_gettext + from wtforms import BooleanField, StringField + + return { + "use_beeline": BooleanField(lazy_gettext("Use Beeline"), default=False), + "proxy_user": StringField(lazy_gettext("Proxy User"), widget=BS3TextFieldWidget(), default=""), + "principal": StringField( + lazy_gettext("Principal"), widget=BS3TextFieldWidget(), default="hive/_HOST@EXAMPLE.COM" + ), + } + + @classmethod + def get_ui_field_behaviour(cls) -> dict[str, Any]: + """Returns custom field behaviour.""" + return { + "hidden_fields": ["extra"], + "relabeling": {}, + } + def _get_proxy_user(self) -> str: """Set the proper proxy_user value in case the user overwrite the default.""" conn = self.conn - - proxy_user_value: str = conn.extra_dejson.get("proxy_user", "") - if proxy_user_value == "login" and conn.login: - return f"hive.server2.proxy.user={conn.login}" - if proxy_user_value == "owner" and self.run_as: - return f"hive.server2.proxy.user={self.run_as}" - if proxy_user_value == "as_param" and self.proxy_user: + if self.proxy_user is not None: return f"hive.server2.proxy.user={self.proxy_user}" - if proxy_user_value != "": # There is a custom proxy user + proxy_user_value: str = conn.extra_dejson.get("proxy_user", "") + if proxy_user_value != "": return f"hive.server2.proxy.user={proxy_user_value}" - return proxy_user_value # The default proxy user (undefined) + return "" def _prepare_cli_cmd(self) -> list[Any]: """Create the command list from available information.""" diff --git a/airflow/providers/apache/hive/operators/hive.py b/airflow/providers/apache/hive/operators/hive.py index 8c7a7cf1e1d2b..398cadce0d829 100644 --- a/airflow/providers/apache/hive/operators/hive.py +++ b/airflow/providers/apache/hive/operators/hive.py @@ -54,7 +54,6 @@ class HiveOperator(BaseOperator): object documentation for more details. :param script_begin_tag: If defined, the operator will get rid of the part of the script before the first occurrence of `script_begin_tag` - :param run_as_owner: Run HQL code as a DAG's owner. :param mapred_queue: queue used by the Hadoop CapacityScheduler. (templated) :param mapred_queue_priority: priority within CapacityScheduler queue. Possible settings include: VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW @@ -91,7 +90,6 @@ def __init__( hiveconfs: dict[Any, Any] | None = None, hiveconf_jinja_translate: bool = False, script_begin_tag: str | None = None, - run_as_owner: bool = False, mapred_queue: str | None = None, mapred_queue_priority: str | None = None, mapred_job_name: str | None = None, @@ -107,9 +105,6 @@ def __init__( self.hiveconfs = hiveconfs or {} self.hiveconf_jinja_translate = hiveconf_jinja_translate self.script_begin_tag = script_begin_tag - self.run_as = None - if run_as_owner: - self.run_as = self.dag.owner self.mapred_queue = mapred_queue self.mapred_queue_priority = mapred_queue_priority self.mapred_job_name = mapred_job_name @@ -128,7 +123,6 @@ def hook(self) -> HiveCliHook: """Get Hive cli hook.""" return HiveCliHook( hive_cli_conn_id=self.hive_cli_conn_id, - run_as=self.run_as, mapred_queue=self.mapred_queue, mapred_queue_priority=self.mapred_queue_priority, mapred_job_name=self.mapred_job_name, diff --git a/airflow/providers/apache/hive/provider.yaml b/airflow/providers/apache/hive/provider.yaml index 12dcf4f3edecd..ea24c2b37027c 100644 --- a/airflow/providers/apache/hive/provider.yaml +++ b/airflow/providers/apache/hive/provider.yaml @@ -24,6 +24,7 @@ description: | state: ready source-date-epoch: 1705911912 versions: + - 7.0.0 - 6.4.2 - 6.4.1 - 6.4.0 diff --git a/docs/apache-airflow-providers-apache-hive/connections/hive_cli.rst b/docs/apache-airflow-providers-apache-hive/connections/hive_cli.rst index 2e23ec63d5ad0..cc52f1db92be2 100644 --- a/docs/apache-airflow-providers-apache-hive/connections/hive_cli.rst +++ b/docs/apache-airflow-providers-apache-hive/connections/hive_cli.rst @@ -64,19 +64,14 @@ Schema (optional) Specify your JDBC Hive database that you want to connect to with Beeline or specify a schema for an HQL statement to run with the Hive CLI. -Extra (optional) - Specify the extra parameters (as json dictionary) that can be used in Hive CLI connection. - The following parameters are all optional: - - * ``use_beeline`` - Specify as ``True`` if using the Beeline CLI. Default is ``False``. - * ``proxy_user`` - Specify a proxy user as an ``owner`` or ``login`` or ``as_param`` keep blank if using a - custom proxy user. - When using ``owner`` you will want to pass the operator ``run_as_owner=True`` if you don't you will run the hql as user="owner" - When using ``as_param`` you will want to pass the operator ``proxy_user=`` if you don't you will run the hql as user="as_param" - * ``principal`` - Specify the JDBC Hive principal to be used with Hive Beeline. +Use Beeline (optional) + Specify as ``True`` if using the Beeline CLI. Default is ``False``. + +Proxy User (optional) + Specify a proxy user to run HQL code as this user. + +Principal (optional) + Specify the JDBC Hive principal to be used with Hive Beeline. When specifying the connection in environment variable you should specify diff --git a/tests/providers/apache/hive/hooks/test_hive.py b/tests/providers/apache/hive/hooks/test_hive.py index 461b101641aa8..a137364767ffa 100644 --- a/tests/providers/apache/hive/hooks/test_hive.py +++ b/tests/providers/apache/hive/hooks/test_hive.py @@ -880,20 +880,12 @@ def setup_method(self): self.nondefault_schema = "nondefault" @pytest.mark.parametrize( - "extra_dejson, correct_proxy_user, run_as, proxy_user", + "extra_dejson, correct_proxy_user, proxy_user", [ - ({"proxy_user": "a_user_proxy"}, "hive.server2.proxy.user=a_user_proxy", None, None), - ({"proxy_user": "owner"}, "hive.server2.proxy.user=dummy_dag_owner", "dummy_dag_owner", None), - ({"proxy_user": "login"}, "hive.server2.proxy.user=admin", None, None), - ( - {"proxy_user": "as_param"}, - "hive.server2.proxy.user=param_proxy_user", - None, - "param_proxy_user", - ), + ({"proxy_user": "a_user_proxy"}, "hive.server2.proxy.user=a_user_proxy", None), ], ) - def test_get_proxy_user_value(self, extra_dejson, correct_proxy_user, run_as, proxy_user): + def test_get_proxy_user_value(self, extra_dejson, correct_proxy_user, proxy_user): hook = MockHiveCliHook() returner = mock.MagicMock() returner.extra_dejson = extra_dejson @@ -901,7 +893,6 @@ def test_get_proxy_user_value(self, extra_dejson, correct_proxy_user, run_as, pr hook.use_beeline = True hook.conn = returner hook.proxy_user = proxy_user - hook.run_as = run_as # Run result = hook._prepare_cli_cmd()