Skip to content

Commit

Permalink
Change httpx to requests in file_task_handler (#39799)
Browse files Browse the repository at this point in the history
* Change httpx to requests in file_task_handler

- httpx does not support CIDRs in NO_PROXY
- simply, convert httpx to requests, issues done
- related issue: #39794

* Add cidr no_proxy test test_log_handlers.py

* Apply monkeypatch fixture

---------

Co-authored-by: scott-py <scott.py@kakaocorp.com>
  • Loading branch information
softyoungha and scott-py authored Jun 15, 2024
1 parent 8eebe2b commit 1ddadf5
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 4 deletions.
8 changes: 4 additions & 4 deletions airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def _set_task_deferred_context_var():

def _fetch_logs_from_service(url, log_relative_path):
# Import occurs in function scope for perf. Ref: https://github.com/apache/airflow/pull/21438
import httpx
import requests

from airflow.utils.jwt_signer import JWTSigner

Expand All @@ -96,7 +96,7 @@ def _fetch_logs_from_service(url, log_relative_path):
expiration_time_in_seconds=conf.getint("webserver", "log_request_clock_grace", fallback=30),
audience="task-instance-logs",
)
response = httpx.get(
response = requests.get(
url,
timeout=timeout,
headers={"Authorization": signer.generate_signed_token({"filename": log_relative_path})},
Expand Down Expand Up @@ -574,9 +574,9 @@ def _read_from_logs_server(self, ti, worker_log_rel_path) -> tuple[list[str], li
messages.append(f"Found logs served from host {url}")
logs.append(response.text)
except Exception as e:
from httpx import UnsupportedProtocol
from requests.exceptions import InvalidSchema

if isinstance(e, UnsupportedProtocol) and ti.task.inherits_from_empty_operator is True:
if isinstance(e, InvalidSchema) and ti.task.inherits_from_empty_operator is True:
messages.append(self.inherits_from_empty_operator_log_message)
else:
messages.append(f"Could not read served logs: {e}")
Expand Down
46 changes: 46 additions & 0 deletions tests/utils/test_log_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import logging.config
import os
import re
from http import HTTPStatus
from importlib import reload
from pathlib import Path
from unittest import mock
Expand All @@ -29,6 +30,7 @@
import pendulum
import pytest
from kubernetes.client import models as k8s
from requests.adapters import Response

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
from airflow.exceptions import RemovedInAirflow3Warning
Expand All @@ -43,6 +45,7 @@
from airflow.utils.log.file_task_handler import (
FileTaskHandler,
LogType,
_fetch_logs_from_service,
_interleave_logs,
_parse_timestamps_in_log_file,
)
Expand Down Expand Up @@ -779,3 +782,46 @@ def test_permissions_for_new_directories(tmp_path):
assert base_dir.stat().st_mode % 0o1000 == default_permissions
finally:
os.umask(old_umask)


worker_url = "http://10.240.5.168:8793"
log_location = "dag_id=sample/run_id=manual__2024-05-23T07:18:59.298882+00:00/task_id=sourcing/attempt=1.log"
log_url = f"{worker_url}/log/{log_location}"


@mock.patch("requests.adapters.HTTPAdapter.send")
def test_fetch_logs_from_service_with_not_matched_no_proxy(mock_send, monkeypatch):
monkeypatch.setenv("http_proxy", "http://proxy.example.com")
monkeypatch.setenv("no_proxy", "localhost")

response = Response()
response.status_code = HTTPStatus.OK
mock_send.return_value = response

_fetch_logs_from_service(log_url, log_location)

mock_send.assert_called()
_, kwargs = mock_send.call_args
assert "proxies" in kwargs
proxies = kwargs["proxies"]
assert "http" in proxies.keys()
assert "no" in proxies.keys()


@mock.patch("requests.adapters.HTTPAdapter.send")
def test_fetch_logs_from_service_with_cidr_no_proxy(mock_send, monkeypatch):
monkeypatch.setenv("http_proxy", "http://proxy.example.com")
monkeypatch.setenv("no_proxy", "10.0.0.0/8")

response = Response()
response.status_code = HTTPStatus.OK
mock_send.return_value = response

_fetch_logs_from_service(log_url, log_location)

mock_send.assert_called()
_, kwargs = mock_send.call_args
assert "proxies" in kwargs
proxies = kwargs["proxies"]
assert "http" not in proxies.keys()
assert "no" not in proxies.keys()

0 comments on commit 1ddadf5

Please sign in to comment.