Skip to content

Commit

Permalink
Merge pull request #2 from dunzoit/circuit_breaker
Browse files Browse the repository at this point in the history
Circuit breaker
  • Loading branch information
avinashdunzo authored Jun 2, 2021
2 parents a3d7cf3 + 2350002 commit a9331a8
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 2 deletions.
2 changes: 1 addition & 1 deletion requests/__version__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
__title__ = 'requests'
__description__ = 'Python HTTP for Humans.'
__url__ = 'http://python-requests.org'
__version__ = '2.18.4'
__version__ = '2.18.51'
__build__ = 0x021804
__author__ = 'Kenneth Reitz'
__author_email__ = 'me@kennethreitz.org'
Expand Down
10 changes: 10 additions & 0 deletions requests/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"""

from . import sessions
from .circuit_breaker import default_circuit_breaker


def request(method, url, **kwargs):
Expand Down Expand Up @@ -54,6 +55,15 @@ def request(method, url, **kwargs):
# By using the 'with' statement we are sure the session is closed, thus we
# avoid leaving sockets open which can trigger a ResourceWarning in some
# cases, and look like a memory leak in others.

executed, response = default_circuit_breaker.execute_with_circuit_breaker(basic_request, method, url, **kwargs)
if executed:
return response

return basic_request(method, url, **kwargs)


def basic_request(method, url, **kwargs):
with sessions.Session() as session:
return session.request(method=method, url=url, **kwargs)

Expand Down
179 changes: 179 additions & 0 deletions requests/circuit_breaker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
import pybreaker
import os
import json
import logging
from urllib3.util.url import get_host
from .exceptions import ApiCircuitBreakerError, CustomHttpCircuitBreakerError
import newrelic.agent
import socket


class MonitorListener(pybreaker.CircuitBreakerListener):

def __init__(self):
self.event_name = "circuit_breaker_event_espresso_{}".format(os.getenv('ENV'))
self.ip = None
self.app_name = None
try:
self.app_name = newrelic.core.config.global_settings().app_name
self.ip = socket.gethostbyname(socket.gethostname())
except:
logging.exception("error init MonitorListener event_name: {}".format(self.event_name))
pass

def failure(self, cb, exc):
self.send_updates(cb, 0, 1)

def success(self, cb):
self.send_updates(cb, 1, 0)

def state_change(self, cb, old_state, new_state):
self.send_updates(cb, 0, 0)

def send_updates(self, cb, success_count, fail_count):
try:
newrelic.agent.record_custom_event(self.event_name, {

"name": cb.name,
"service_name": self.app_name,
"instance_ip": self.ip,
"circuit_state": cb.current_state,
"success": success_count,
"errors": fail_count,
"fallback_success": 0,
"fallback_failure": 0,
}, newrelic.agent.application())
except:
logging.exception("error send_updates for event_name: {}".format(self.event_name))
pass


class CircuitBreakerConfig(object):

def __init__(self, fail_max_to_open, sleep_time_to_half_open, http_failed_status_code_list,
http_method_keyword_params):
self.fail_max_to_open = fail_max_to_open
self.sleep_time_to_half_open = sleep_time_to_half_open
self.http_failed_status_code_list = http_failed_status_code_list or []
self.http_method_keyword_params = http_method_keyword_params or []

@staticmethod
def from_json(json_data):
configs = {}
try:
for config in json_data:
try:
if config["domain_name"] in configs:
logging.error(
"Config already present once overriding :" + config["domain_name"])
http_method_keyword_params = config.get("http_method_keyword_params") or []
http_method_keyword_params = list(filter(lambda x: (x.get('keyword') and x.get('method')),
http_method_keyword_params))
configs[config["domain_name"]] = CircuitBreakerConfig(config["fail_max_to_open"],
config["sleep_time_to_half_open"],
config["http_failed_status_code_list"],
http_method_keyword_params)
except:
logging.exception("JSON File has wrong format circuit breaker functionality wont be used :" + config)
except:
logging.exception("JSON File has wrong format circuit breaker functionality wont be used : JSON_PARSE_ERROR")
return configs


class CircuitBreaker(object):

def __init__(self):
self.metric_collector = MonitorListener()
self.__circuit_breaker_factory_per_domain = {}
self.__circuit_breaker_config_per_domain = {}
self.__load_from_json_file()
self.__register_circuit_breaker()

def __load_from_json_file(self):

json_file_path = os.environ.get("CB_JSON_FILE_PATH") or None
if not json_file_path:
logging.exception("JSON File path not found circuit breaker functionality wont be used : JSON_FILE_PATH")
try:
with open(json_file_path, "r") as f:
data = json.load(f)
self.__circuit_breaker_config_per_domain = CircuitBreakerConfig.from_json(data)
except:
logging.exception("JSON File has wrong format circuit breaker functionality wont be used : JSON_FILE_PATH")

def __update_cb_factory(self, key, config):
self.__circuit_breaker_factory_per_domain[key] = pybreaker.CircuitBreaker(
fail_max=config.fail_max_to_open,
reset_timeout=config.sleep_time_to_half_open,
state_storage=pybreaker.CircuitMemoryStorage(pybreaker.STATE_CLOSED), name=key,
listeners=[self.metric_collector]
)

def __register_circuit_breaker(self):

try:
for key, config in self.__circuit_breaker_config_per_domain.iteritems():

if not config.http_method_keyword_params:
self.__update_cb_factory(key, config)
else:
for param in config.http_method_keyword_params:
k = self.__get_domain_key(key, param)
self.__update_cb_factory(k, config)
except:
logging.exception("error registering circuit breaker")
pass

@staticmethod
def __get_domain_key(domain_name, param):
return "{}_{}_{}".format(domain_name, param["keyword"], param["method"])

def __get_circuit_breaker_by_url(self, url, method):
try:
_, domain_name, port = get_host(url)
if port not in [80, 443, None]:
domain_name = "{}:{}".format(domain_name, port)
cfg = self.__circuit_breaker_config_per_domain.get(domain_name)

if cfg is None:
return None, None
if not cfg.http_method_keyword_params:
return self.__circuit_breaker_factory_per_domain.get(domain_name), cfg.http_failed_status_code_list

for param in cfg.http_method_keyword_params:
if param["keyword"] in url and param["method"] == method:
cb = self.__circuit_breaker_config_per_domain.get(CircuitBreaker.__get_domain_key(domain_name, param))
if cb:
return cb, cfg.http_failed_status_code_list

except Exception as e:
logging.exception("error while getting url: {}".format(e.message))
pass

return None, None

def execute_with_circuit_breaker(self, func, method, url, **kwargs):

cb, status_code_list = self.__get_circuit_breaker_by_url(url, method)
if not cb:
# logging.exception("error execute_with_circuit_breaker cb not found: {}".format(url))
return False, None

try:
return True, cb.call(CircuitBreaker.basic_request_cb, status_code_list, func, method, url, **kwargs)
except pybreaker.CircuitBreakerError:
raise ApiCircuitBreakerError(
"Requests are closed because of too many failures".format(url)
)
except CustomHttpCircuitBreakerError as e:
return True, e.http_response

@staticmethod
def basic_request_cb(status_code_list, func, method, url, **kwargs):
response = func(method, url, **kwargs)
if response.status_code in status_code_list:
raise CustomHttpCircuitBreakerError(response)
return response


default_circuit_breaker = CircuitBreaker()
11 changes: 11 additions & 0 deletions requests/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,17 @@ class RetryError(RequestException):
class UnrewindableBodyError(RequestException):
"""Requests encountered an error when trying to rewind a body"""


class ApiCircuitBreakerError(RequestException):
"""Requests encountered as circuit was open """


class CustomHttpCircuitBreakerError(Exception):
"""Failure in http status codes """
def __init__(self, http_response):
super(CustomHttpCircuitBreakerError, self).__init__()
self.http_response = http_response

# Warnings


Expand Down
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ docutils
flake8
tox
detox
pybreaker==0.6.0
newrelic==4.2.0.100
4 changes: 3 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ def run_tests(self):
'chardet>=3.0.2,<3.1.0',
'idna>=2.5,<2.7',
'urllib3>=1.21.1,<1.23',
'certifi>=2017.4.17'
'certifi>=2017.4.17',
'pybreaker==0.6.0',
'newrelic==4.2.0.100'

]
test_requirements = ['pytest-httpbin==0.0.7', 'pytest-cov', 'pytest-mock', 'pytest-xdist', 'PySocks>=1.5.6, !=1.5.7', 'pytest>=2.8.0']
Expand Down

0 comments on commit a9331a8

Please sign in to comment.