diff --git a/requests/__version__.py b/requests/__version__.py index dc33eef651..60847245ff 100644 --- a/requests/__version__.py +++ b/requests/__version__.py @@ -5,7 +5,7 @@ __title__ = 'requests' __description__ = 'Python HTTP for Humans.' __url__ = 'http://python-requests.org' -__version__ = '2.18.4' +__version__ = '2.18.51' __build__ = 0x021804 __author__ = 'Kenneth Reitz' __author_email__ = 'me@kennethreitz.org' diff --git a/requests/api.py b/requests/api.py index bc2115c150..415066402a 100644 --- a/requests/api.py +++ b/requests/api.py @@ -11,6 +11,7 @@ """ from . import sessions +from .circuit_breaker import default_circuit_breaker def request(method, url, **kwargs): @@ -54,6 +55,15 @@ def request(method, url, **kwargs): # By using the 'with' statement we are sure the session is closed, thus we # avoid leaving sockets open which can trigger a ResourceWarning in some # cases, and look like a memory leak in others. + + executed, response = default_circuit_breaker.execute_with_circuit_breaker(basic_request, method, url, **kwargs) + if executed: + return response + + return basic_request(method, url, **kwargs) + + +def basic_request(method, url, **kwargs): with sessions.Session() as session: return session.request(method=method, url=url, **kwargs) diff --git a/requests/circuit_breaker.py b/requests/circuit_breaker.py new file mode 100644 index 0000000000..b75d47e93f --- /dev/null +++ b/requests/circuit_breaker.py @@ -0,0 +1,179 @@ +import pybreaker +import os +import json +import logging +from urllib3.util.url import get_host +from .exceptions import ApiCircuitBreakerError, CustomHttpCircuitBreakerError +import newrelic.agent +import socket + + +class MonitorListener(pybreaker.CircuitBreakerListener): + + def __init__(self): + self.event_name = "circuit_breaker_event_espresso_{}".format(os.getenv('ENV')) + self.ip = None + self.app_name = None + try: + self.app_name = newrelic.core.config.global_settings().app_name + self.ip = socket.gethostbyname(socket.gethostname()) + except: + logging.exception("error init MonitorListener event_name: {}".format(self.event_name)) + pass + + def failure(self, cb, exc): + self.send_updates(cb, 0, 1) + + def success(self, cb): + self.send_updates(cb, 1, 0) + + def state_change(self, cb, old_state, new_state): + self.send_updates(cb, 0, 0) + + def send_updates(self, cb, success_count, fail_count): + try: + newrelic.agent.record_custom_event(self.event_name, { + + "name": cb.name, + "service_name": self.app_name, + "instance_ip": self.ip, + "circuit_state": cb.current_state, + "success": success_count, + "errors": fail_count, + "fallback_success": 0, + "fallback_failure": 0, + }, newrelic.agent.application()) + except: + logging.exception("error send_updates for event_name: {}".format(self.event_name)) + pass + + +class CircuitBreakerConfig(object): + + def __init__(self, fail_max_to_open, sleep_time_to_half_open, http_failed_status_code_list, + http_method_keyword_params): + self.fail_max_to_open = fail_max_to_open + self.sleep_time_to_half_open = sleep_time_to_half_open + self.http_failed_status_code_list = http_failed_status_code_list or [] + self.http_method_keyword_params = http_method_keyword_params or [] + + @staticmethod + def from_json(json_data): + configs = {} + try: + for config in json_data: + try: + if config["domain_name"] in configs: + logging.error( + "Config already present once overriding :" + config["domain_name"]) + http_method_keyword_params = config.get("http_method_keyword_params") or [] + http_method_keyword_params = list(filter(lambda x: (x.get('keyword') and x.get('method')), + http_method_keyword_params)) + configs[config["domain_name"]] = CircuitBreakerConfig(config["fail_max_to_open"], + config["sleep_time_to_half_open"], + config["http_failed_status_code_list"], + http_method_keyword_params) + except: + logging.exception("JSON File has wrong format circuit breaker functionality wont be used :" + config) + except: + logging.exception("JSON File has wrong format circuit breaker functionality wont be used : JSON_PARSE_ERROR") + return configs + + +class CircuitBreaker(object): + + def __init__(self): + self.metric_collector = MonitorListener() + self.__circuit_breaker_factory_per_domain = {} + self.__circuit_breaker_config_per_domain = {} + self.__load_from_json_file() + self.__register_circuit_breaker() + + def __load_from_json_file(self): + + json_file_path = os.environ.get("CB_JSON_FILE_PATH") or None + if not json_file_path: + logging.exception("JSON File path not found circuit breaker functionality wont be used : JSON_FILE_PATH") + try: + with open(json_file_path, "r") as f: + data = json.load(f) + self.__circuit_breaker_config_per_domain = CircuitBreakerConfig.from_json(data) + except: + logging.exception("JSON File has wrong format circuit breaker functionality wont be used : JSON_FILE_PATH") + + def __update_cb_factory(self, key, config): + self.__circuit_breaker_factory_per_domain[key] = pybreaker.CircuitBreaker( + fail_max=config.fail_max_to_open, + reset_timeout=config.sleep_time_to_half_open, + state_storage=pybreaker.CircuitMemoryStorage(pybreaker.STATE_CLOSED), name=key, + listeners=[self.metric_collector] + ) + + def __register_circuit_breaker(self): + + try: + for key, config in self.__circuit_breaker_config_per_domain.iteritems(): + + if not config.http_method_keyword_params: + self.__update_cb_factory(key, config) + else: + for param in config.http_method_keyword_params: + k = self.__get_domain_key(key, param) + self.__update_cb_factory(k, config) + except: + logging.exception("error registering circuit breaker") + pass + + @staticmethod + def __get_domain_key(domain_name, param): + return "{}_{}_{}".format(domain_name, param["keyword"], param["method"]) + + def __get_circuit_breaker_by_url(self, url, method): + try: + _, domain_name, port = get_host(url) + if port not in [80, 443, None]: + domain_name = "{}:{}".format(domain_name, port) + cfg = self.__circuit_breaker_config_per_domain.get(domain_name) + + if cfg is None: + return None, None + if not cfg.http_method_keyword_params: + return self.__circuit_breaker_factory_per_domain.get(domain_name), cfg.http_failed_status_code_list + + for param in cfg.http_method_keyword_params: + if param["keyword"] in url and param["method"] == method: + cb = self.__circuit_breaker_config_per_domain.get(CircuitBreaker.__get_domain_key(domain_name, param)) + if cb: + return cb, cfg.http_failed_status_code_list + + except Exception as e: + logging.exception("error while getting url: {}".format(e.message)) + pass + + return None, None + + def execute_with_circuit_breaker(self, func, method, url, **kwargs): + + cb, status_code_list = self.__get_circuit_breaker_by_url(url, method) + if not cb: + # logging.exception("error execute_with_circuit_breaker cb not found: {}".format(url)) + return False, None + + try: + return True, cb.call(CircuitBreaker.basic_request_cb, status_code_list, func, method, url, **kwargs) + except pybreaker.CircuitBreakerError: + raise ApiCircuitBreakerError( + "Requests are closed because of too many failures".format(url) + ) + except CustomHttpCircuitBreakerError as e: + return True, e.http_response + + @staticmethod + def basic_request_cb(status_code_list, func, method, url, **kwargs): + response = func(method, url, **kwargs) + if response.status_code in status_code_list: + raise CustomHttpCircuitBreakerError(response) + return response + + +default_circuit_breaker = CircuitBreaker() diff --git a/requests/exceptions.py b/requests/exceptions.py index be7eaed6b9..7753f6beea 100644 --- a/requests/exceptions.py +++ b/requests/exceptions.py @@ -104,6 +104,17 @@ class RetryError(RequestException): class UnrewindableBodyError(RequestException): """Requests encountered an error when trying to rewind a body""" + +class ApiCircuitBreakerError(RequestException): + """Requests encountered as circuit was open """ + + +class CustomHttpCircuitBreakerError(Exception): + """Failure in http status codes """ + def __init__(self, http_response): + super(CustomHttpCircuitBreakerError, self).__init__() + self.http_response = http_response + # Warnings diff --git a/requirements.txt b/requirements.txt index 8d79283fd9..c218499e5c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,3 +14,5 @@ docutils flake8 tox detox +pybreaker==0.6.0 +newrelic==4.2.0.100 diff --git a/setup.py b/setup.py index ed4892d41f..ce176533c4 100755 --- a/setup.py +++ b/setup.py @@ -45,7 +45,9 @@ def run_tests(self): 'chardet>=3.0.2,<3.1.0', 'idna>=2.5,<2.7', 'urllib3>=1.21.1,<1.23', - 'certifi>=2017.4.17' + 'certifi>=2017.4.17', + 'pybreaker==0.6.0', + 'newrelic==4.2.0.100' ] test_requirements = ['pytest-httpbin==0.0.7', 'pytest-cov', 'pytest-mock', 'pytest-xdist', 'PySocks>=1.5.6, !=1.5.7', 'pytest>=2.8.0']