Skip to content

Commit

Permalink
add signature and error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
maxkahan committed Mar 16, 2024
1 parent 38843b4 commit 7d6eafd
Show file tree
Hide file tree
Showing 6 changed files with 324 additions and 107 deletions.
54 changes: 51 additions & 3 deletions http_client/src/vonage_http_client/auth.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from base64 import b64encode
from typing import Optional
from typing import Literal, Optional
import hashlib
import hmac
from time import time

from pydantic import validate_call
from vonage_jwt.jwt import JwtClient
Expand All @@ -10,11 +13,17 @@
class Auth:
"""Deals with Vonage API authentication.
Some Vonage APIs require an API key and secret for authentication. Others require an application ID and JWT.
It is also possible to use a message signature with the SMS API.
Args:
- api_key (str): The API key for authentication.
- api_secret (str): The API secret for authentication.
- application_id (str): The application ID for JWT authentication.
- private_key (str): The private key for JWT authentication.
- signature_secret (str): The signature secret for authentication.
- signature_method (str): The signature method for authentication.
This should be one of `md5`, `sha1`, `sha256`, or `sha512` if using HMAC digests. If you want to use a simple MD5 hash, leave this as `None`.
Note:
To use JWT authentication, provide values for both `application_id` and `private_key`.
Expand All @@ -27,8 +36,8 @@ def __init__(
api_secret: Optional[str] = None,
application_id: Optional[str] = None,
private_key: Optional[str] = None,
signature: Optional[str] = None,
signature_method: Optional[str] = None,
signature_secret: Optional[str] = None,
signature_method: Optional[Literal['md5', 'sha1', 'sha256', 'sha512']] = None,
) -> None:
self._validate_input_combinations(
api_key, api_secret, application_id, private_key
Expand All @@ -40,6 +49,9 @@ def __init__(
if application_id is not None and private_key is not None:
self._jwt_client = JwtClient(application_id, private_key)

self._signature_secret = signature_secret
self._signature_method = getattr(hashlib, signature_method)

@property
def api_key(self):
return self._api_key
Expand All @@ -65,6 +77,42 @@ def create_basic_auth_string(self):
)
return f'Basic {hash}'

def sign_params(self, params: dict) -> dict:
"""
Signs the provided message parameters using the signature secret provided to the `Auth` class.
If no signature secret is provided, the message parameters are signed using a simple MD5 hash.
Args:
params (dict): The message parameters to be signed.
Returns:
dict: The signed message parameters.
"""

if self._signature_method:
hasher = hmac.new(
self._signature_secret.encode(),
digestmod=self._signature_method,
)
else:
hasher = hashlib.md5()

if not params.get("timestamp"):
params["timestamp"] = int(time())

for key in sorted(params):
value = params[key]

if isinstance(value, str):
value = value.replace("&", "_").replace("=", "_")

hasher.update(f"&{key}={value}".encode("utf-8"))

if self._signature_method is None:
hasher.update(self._signature_secret.encode())

return hasher.hexdigest()

def _validate_input_combinations(
self, api_key, api_secret, application_id, private_key
):
Expand Down
16 changes: 13 additions & 3 deletions http_client/src/vonage_http_client/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def post(
request_path: str = '',
params: dict = None,
auth_type: str = 'jwt',
):
) -> Union[dict, None]:
return self.make_request('POST', host, request_path, params, auth_type)

def get(
Expand All @@ -114,7 +114,7 @@ def get(
request_path: str = '',
params: dict = None,
auth_type: str = 'jwt',
):
) -> Union[dict, None]:
return self.make_request('GET', host, request_path, params, auth_type)

@validate_call
Expand All @@ -124,7 +124,7 @@ def make_request(
host: str,
request_path: str = '',
params: Optional[dict] = None,
auth_type: Literal['jwt', 'basic'] = 'jwt',
auth_type: Literal['jwt', 'basic', 'signature'] = 'jwt',
):
url = f'https://{host}{request_path}'
logger.debug(
Expand All @@ -134,6 +134,16 @@ def make_request(
self._headers['Authorization'] = self._auth.create_jwt_auth_string()
elif auth_type == 'basic':
self._headers['Authorization'] = self._auth.create_basic_auth_string()
elif auth_type == 'signature':
params = self._auth.sign_params(params)
with self._session.request(
request_type,
url,
params=params,
headers=self._headers,
timeout=self._timeout,
) as response:
return self._parse_response(response)

with self._session.request(
request_type,
Expand Down
54 changes: 54 additions & 0 deletions http_client/tests/test_auth.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from os.path import dirname, join
from unittest.mock import patch
import hashlib


from pydantic import ValidationError
from pytest import raises
Expand Down Expand Up @@ -102,3 +104,55 @@ def test_create_jwt_error_no_application_id_or_private_key():
def test_create_basic_auth_string():
auth = Auth(api_key=api_key, api_secret=api_secret)
assert auth.create_basic_auth_string() == 'Basic cXdlcmFzZGY6MTIzNHF3ZXJhc2Rmenhjdg=='


def test_auth_init_with_valid_combinations():
api_key = 'qwerasdf'
api_secret = '1234qwerasdfzxcv'
application_id = 'asdfzxcv'
private_key = 'dummy_private_key'
signature_secret = 'signature_secret'
signature_method = 'sha256'

auth = Auth(
api_key=api_key,
api_secret=api_secret,
application_id=application_id,
private_key=private_key,
signature_secret=signature_secret,
signature_method=signature_method,
)

assert auth._api_key == api_key
assert auth._api_secret == api_secret
assert auth._jwt_client.application_id == application_id
assert auth._jwt_client.private_key == private_key
assert auth._signature_secret == signature_secret
assert auth._signature_method == hashlib.sha256


def test_auth_init_with_invalid_combinations():
api_key = 'qwerasdf'
api_secret = '1234qwerasdfzxcv'
application_id = 'asdfzxcv'
private_key = 'dummy_private_key'
signature_secret = 'signature_secret'
signature_method = 'invalid_method'

with patch('vonage_http_client.auth.hashlib') as mock_hashlib:
mock_hashlib.sha256.side_effect = AttributeError

auth = Auth(
api_key=api_key,
api_secret=api_secret,
application_id=application_id,
private_key=private_key,
signature_secret=signature_secret,
signature_method=signature_method,
)

assert auth._api_key == api_key
assert auth._api_secret == api_secret
assert auth._jwt_client is None
assert auth._signature_secret == signature_secret
assert auth._signature_method is None
12 changes: 12 additions & 0 deletions sms/src/vonage_sms/errors.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
from requests import Response
from vonage_utils.errors import VonageError


class SmsError(VonageError):
"""Indicates an error with the Vonage SMS Package."""


class PartialFailureError(SmsError):
"""Indicates that a request was partially successful."""

def __init__(self, response: Response):
self.message = (
'Sms.send_message method partially failed. Not all of the message(s) sent successfully.',
)
super().__init__(self.message)
self.response = response
95 changes: 74 additions & 21 deletions sms/src/vonage_sms/sms.py
Original file line number Diff line number Diff line change
@@ -1,51 +1,104 @@
from copy import deepcopy
from dataclasses import dataclass
from typing import List, Literal, Optional, Union
from typing import List, Literal, Optional

from pydantic import BaseModel, Field, field_validator, validate_call
from vonage_http_client.http_client import HttpClient
from vonage_sms.errors import SmsError

from .errors import SmsError, PartialFailureError


class SmsMessage(BaseModel):
to: str
from_: str = Field(..., alias="from")
text: str
type: Optional[str] = None
sig: Optional[str] = Field(None, min_length=16, max_length=60)
status_report_req: Optional[int] = Field(
None,
alias="status-report-req",
description="Set to 1 to receive a Delivery Receipt",
)
client_ref: Optional[str] = Field(
None, alias="client-ref", description="Your own reference. Up to 40 characters."
)
network_code: Optional[str] = Field(
None,
alias="network-code",
description="A 4-5 digit number that represents the mobile carrier network code",
)
client_ref: Optional[str] = Field(None, alias="client-ref", max_length=100)
type: Optional[Literal['text', 'binary', 'unicode']] = None
ttl: Optional[int] = Field(None, ge=20000, le=604800000)
status_report_req: Optional[bool] = Field(None, alias='status-report-req')
callback: Optional[str] = Field(None, max_length=100)
message_class: Optional[int] = Field(None, alias='message-class', ge=0, le=3)
body: Optional[str] = None
udh: Optional[str] = None
protocol_id: Optional[int] = Field(None, alias='protocol-id', ge=0, le=255)
account_ref: Optional[str] = Field(None, alias='account-ref')
entity_id: Optional[str] = Field(None, alias='entity-id')
content_id: Optional[str] = Field(None, alias='content-id')

@field_validator('body', 'udh')
@classmethod
def validate_body(cls, value, values):
if 'type' not in values or not values['type'] == 'binary':
raise ValueError(
'This parameter can only be set when the "type" parameter is set to "binary".'
)
if values['type'] == 'binary' and not value:
raise ValueError('This parameter is required for binary messages.')


@dataclass
class MessageResponse:
to: str
message_id: str
status: str
remaining_balance: str
message_price: str
network: str
client_ref: Optional[str] = None
account_ref: Optional[str] = None


@dataclass
class SmsResponse:
id: str
message_count: str
messages: List[MessageResponse]


class Sms:
"""Calls Vonage's SMS API."""

def __init__(self, http_client: HttpClient) -> None:
self._http_client = deepcopy(http_client)
self._auth_type = 'basic'
if self._http_client._auth._signature_secret:
self._auth_type = 'signature'
else:
self._auth_type = 'basic'

@validate_call
def send(self, message: SmsMessage) -> SmsResponse:
"""Send an SMS message."""
response = self._http_client.post(
self._http_client.api_host,
'/v2/ni',
message.model_dump(),
self._http_client.rest_host,
'/sms/json',
message.model_dump(by_alias=True),
self._auth_type,
)

if int(response['message-count']) > 1:
self.check_for_partial_failure(response)
else:
self.check_for_error(response)

messages = []
for message in response['messages']:
messages.append(MessageResponse(**message))

return SmsResponse(message_count=response['message-count'], messages=messages)

def check_for_partial_failure(self, response_data):
successful_messages = 0
total_messages = int(response_data['message-count'])

for message in response_data['messages']:
if message['status'] == '0':
successful_messages += 1
if successful_messages < total_messages:
raise PartialFailureError(response_data)

def check_for_error(self, response_data):
message = response_data['messages'][0]
if int(message['status']) != 0:
raise SmsError(
f'Sms.send_message method failed with error code {message["status"]}: {message["error-text"]}'
)
Loading

0 comments on commit 7d6eafd

Please sign in to comment.