Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/check response #51

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 30 additions & 4 deletions src/sxapi/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,34 @@

import requests

from sxapi.errors import (
SxapiAuthorizationError,
SxapiUnprocessableContentError,
)


class ApiTypes(Enum):
PUBLIC = 1
INTEGRATION = 2


def check_response(func):
"""Decorator to handle response status codes."""

def wrapper(*args, **kwargs):
response = func(*args, **kwargs)
if response.status_code in [401, 403]:
raise SxapiAuthorizationError(status_code=response.status_code)
elif response.status_code == 422:
raise SxapiUnprocessableContentError()
else:
response.raise_for_status()

return response.json()

return wrapper


class BaseAPI(object):
def __init__(
self,
Expand Down Expand Up @@ -69,22 +91,26 @@ def _login(self):
raise requests.HTTPError(response.status_code, response.reason)
self._session.headers.update({"Authorization": f"Bearer {self.api_token}"})

@check_response
def get(self, path, *args, **kwargs):
url = self.to_url(path)
r = self.session.get(url, *args, **kwargs)
return r.json()
return r

@check_response
def post(self, path, *args, **kwargs):
url = self.to_url(path)
r = self.session.post(url, *args, allow_redirects=False, **kwargs)
return r.json()
return r

@check_response
def put(self, path, *args, **kwargs):
url = self.to_url(path)
r = self.session.put(url, *args, allow_redirects=False, **kwargs)
return r.json()
return r

@check_response
def delete(self, path, *args, **kwargs):
url = self.to_url(path)
r = self.session.delete(url, *args, **kwargs)
return r.json()
return r
70 changes: 49 additions & 21 deletions src/sxapi/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,45 @@
expanduser,
)

from requests.exceptions import HTTPError
from setuptools import setup

from sxapi.cli import cli_user
from sxapi.cli.subparser.token import create_token_parser
from sxapi.errors import (
SxapiAuthorizationError,
SxapiConfigurationFileError,
SxapiUnprocessableContentError,
)


def handle_cli_return_values(func):
def wrapper(*args, **kwargs):

try:
return func(*args, **kwargs)
except SxapiAuthorizationError as e:
error_msg = e
exit_code = 1
except SxapiUnprocessableContentError as e:
error_msg = e
exit_code = 2
except SxapiConfigurationFileError as e:
error_msg = e
exit_code = 3
except HTTPError as e:
error_msg = e
exit_code = 98
except Exception as e:
error_msg = e
exit_code = 99

if error_msg:
print(error_msg)

exit(exit_code)

return wrapper


class Cli:
Expand Down Expand Up @@ -49,10 +84,13 @@ def read_config_from_file(self, config_file_path):
config_file = abspath(config_file)
parsable_files.append(config_file)

try:
config = configparser.ConfigParser(interpolation=None)
config.read(parsable_files)
config = configparser.ConfigParser(interpolation=None)

# if no configfile was read return empty config_dict
if len(config.read(parsable_files)) == 0:
return config_dict

try:
config_dict["user"] = config.get("SXAPI", "USER")
config_dict["pwd"] = config.get("SXAPI", "PASSWORD")
config_dict["orga"] = config.get("SXAPI", "ORGA")
Expand All @@ -62,15 +100,8 @@ def read_config_from_file(self, config_file_path):
config_dict["api_integration_v2_path"] = config.get(
"SXAPI", "API_INTEGRATION_V2_PATH"
)
except (
KeyError,
configparser.NoSectionError,
configparser.MissingSectionHeaderError,
) as e:
if config_file_path:
print(f"Error while reading config file: {e}")
return
# we should raise custom exception here
except configparser.Error as e:
raise SxapiConfigurationFileError(e)

return config_dict

Expand All @@ -87,16 +118,9 @@ def api_status():
pub_resp = cli_user.public_v2_api.get("/service/status")
int_resp = cli_user.integration_v2_api.get("/service/status")

exit_code = 0

if not (pub_resp["result"] == "ok" and int_resp["result"] == "ok"):
exit_code = 1

print(f"PublicV2 status: {pub_resp['result']}")
print(f"IntegrationV2 status: {int_resp['result']}")

exit(exit_code)

@staticmethod
def version_info():
"""Print version info."""
Expand Down Expand Up @@ -162,11 +186,12 @@ def parse_args(args):

return main_parser.parse_args(args)

@handle_cli_return_values
def run(self):
"""Call sxapi functions based on passed arguments."""
args = self.parse_args(sys.argv[1:])
if not args:
return 0
return

if args.print_configfile:
with open("./src/sxapi/cli/example-config.conf", "r") as f:
Expand All @@ -186,7 +211,10 @@ def run(self):
self.version_info()

if args.use_keyring and args.access_token:
print("Choose either -k (keyring), -t (argument) or no flag (environment)!")
print(
"Choose either -k (keyring), -t (argument) or"
" no flag (environment/config)!"
)
return

# run set_defaults for subparser
Expand Down
2 changes: 2 additions & 0 deletions src/sxapi/cli/cli_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ def init_user(self, config_dict, args_token, args_keyring):
self.api_access_token = args_token
elif args_keyring:
self.api_access_token = self.get_token_keyring()
if self.api_access_token is None:
print("No token found in keyring. Use values from config file.\n")
else:
self.api_access_token = self.get_token_environment()

Expand Down
4 changes: 2 additions & 2 deletions src/sxapi/cli/example-config.conf
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ PASSWORD = "userSecret"
ORGA = smaxtec_organisation_id

# API url for PublicV2
API_PUBLIC_V2_PATH = https://api.smaxtec.com/v2/public
API_PUBLIC_V2_PATH = https://api.smaxtec.com/api/v2

# API url for IntegrationV2
API_INTEGRATION_V2_PATH = https://api.smaxtec.com/v2/integration
API_INTEGRATION_V2_PATH = https://api.smaxtec.com/integration/v2
51 changes: 51 additions & 0 deletions src/sxapi/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
class SxapiAuthorizationError(Exception):
"""Raised when authorization fails 401."""

AUTHORIZATION_ERROR_MSG = """
Requested object could not be accessed.
Maybe the organisation_id or the _id of the requested object is wrong.
"""

def __init__(self, message=AUTHORIZATION_ERROR_MSG, status_code=None):
self.message = message
self.status_code = status_code

def __str__(self):
text = self.message
if self.status_code:
text = text + "status code: " + str(self.status_code)

return text


class SxapiUnprocessableContentError(Exception):
"""Raised when content is unprocessable 422."""

UNPROCESSABLE_CONTENT_ERROR_MSG = """
The request was well-formed but was unable to be followed due to semantic errors.
"""

def __init__(self, message=UNPROCESSABLE_CONTENT_ERROR_MSG, status_code=None):
self.message = message
self.status_code = status_code

def __str__(self):
text = self.message
if self.status_code:
text = text + "status code: " + str(self.status_code)

return text


class SxapiConfigurationFileError(Exception):
"""Raised when configuration file is not valid."""

CONFIGURATION_FILE_ERROR_MSG = "Configuration File Error"

def __init__(self, parent, info=CONFIGURATION_FILE_ERROR_MSG):
self.parent_name = parent.__class__.__name__
self.message = parent.message
self.info = info

def __str__(self):
return self.info + " -> " + self.parent_name + " -> " + self.message
7 changes: 5 additions & 2 deletions tests/cli_tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
from sxapi.cli import cli_user
from sxapi.cli.cli import Cli

obj = mock.MagicMock

cli = Cli()
cli.config_file_paths = []


@mock.patch("sxapi.cli.cli.Cli.version_info")
Expand All @@ -23,8 +26,8 @@ def test_func(get_token_mock, keyring_mock, print_mock, version_mock):
cli.run()
call_args = print_mock.call_args_list[0]
assert (
call_args.args[0]
== "Choose either -k (keyring), -t (argument) or no flag (environment)!"
call_args.args[0] == "Choose either -k (keyring), -t (argument)"
" or no flag (environment/config)!"
)
print_mock.reset_mock()

Expand Down