From f7651bbb38d1e6a561a5b48511dc9bc2f89067af Mon Sep 17 00:00:00 2001 From: maxkahan Date: Tue, 14 May 2024 18:32:38 +0100 Subject: [PATCH] add sim swap api --- CHANGES.md | 3 ++ README.md | 17 ++++++++ src/vonage/camara_auth.py | 47 +++++++++++++++++++++ src/vonage/client.py | 10 ++++- src/vonage/errors.py | 8 ---- src/vonage/number_verification.py | 36 ---------------- src/vonage/sim_swap.py | 27 ++++++++---- tests/conftest.py | 14 +++++++ tests/data/camara_auth/oidc_request.json | 5 +++ tests/data/camara_auth/token_request.json | 5 +++ tests/data/sim_swap/check_sim_swap.json | 3 ++ tests/data/sim_swap/get_swap_date.json | 3 ++ tests/test_camara_auth.py | 43 +++++++++++++++++++ tests/test_sim_swap.py | 50 +++++++++++++++++++++++ 14 files changed, 218 insertions(+), 53 deletions(-) create mode 100644 src/vonage/camara_auth.py delete mode 100644 src/vonage/number_verification.py create mode 100644 tests/data/camara_auth/oidc_request.json create mode 100644 tests/data/camara_auth/token_request.json create mode 100644 tests/data/sim_swap/check_sim_swap.json create mode 100644 tests/data/sim_swap/get_swap_date.json create mode 100644 tests/test_camara_auth.py create mode 100644 tests/test_sim_swap.py diff --git a/CHANGES.md b/CHANGES.md index 8c2884f9..ee051383 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,6 @@ +# 3.15.0 +- Add support for the [Vonage Sim Swap API](https://developer.vonage.com/en/sim-swap/overview) + # 3.14.0 - Add publisher-only as a valid Video API client token role diff --git a/README.md b/README.md index 9c5d8684..33ef2776 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,7 @@ need a Vonage account. Sign up [for free at vonage.com][signup]. - [Managing Secrets](#managing-secrets) - [Application API](#application-api) - [Users API](#users-api) +- [Sim Swap API](#sim-swap-api) - [Validating Webhook Signatures](#validate-webhook-signatures) - [JWT Parameters](#jwt-parameters) - [Overriding API Attributes](#overriding-api-attributes) @@ -1137,6 +1138,22 @@ client.users.update_user('USER_ID', params={...}) client.users.delete_user('USER_ID') ``` +## Sim Swap API + +This can be used to check the sim swap status of a device. You must register a business account with Vonage and create a network profile in order to use this API. [More information on authentication can be found in the Vonage Developer documentation]('https://developer.vonage.com/en/getting-started-network/authentication'). + +### Check the Sim Swap Status of a Number + +```python +client.sim_swap.check('447700900000', max_age=24) +``` + +### Retrieve the Last Sim Swap Date for a Number + +```python +client.sim_swap.get_last_swap_date('447700900000') +``` + ## Validate webhook signatures ```python diff --git a/src/vonage/camara_auth.py b/src/vonage/camara_auth.py new file mode 100644 index 00000000..2c7bfc57 --- /dev/null +++ b/src/vonage/camara_auth.py @@ -0,0 +1,47 @@ +from __future__ import annotations +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from vonage import Client + + +class CamaraAuth: + """Class containing methods for authenticating APIs following Camara standards.""" + + def __init__(self, client: Client): + self._client = client + self._auth_type = 'jwt' + + def make_oidc_request(self, number: str, scope: str): + """Make an OIDC request to authenticate a user. + + Returns a code that can be used to request a Camara token.""" + + login_hint = f'tel:+{number}' + params = {'login_hint': login_hint, 'scope': scope} + + return self._client.post( + 'api-eu.vonage.com', + '/oauth2/bc-authorize', + params=params, + auth_type=self._auth_type, + body_is_json=False, + ) + + def request_camara_token( + self, oidc_response: dict, grant_type: str = 'urn:openid:params:grant-type:ciba' + ): + """Request a Camara token using an authentication request ID given as a + response to the OIDC request. + """ + params = { + 'grant_type': grant_type, + 'auth_req_id': oidc_response['auth_req_id'], + } + token_response = self._client.post( + 'api-eu.vonage.com', + '/oauth2/token', + params=params, + auth_type=self._auth_type, + ) + return token_response['access_token'] diff --git a/src/vonage/client.py b/src/vonage/client.py index 992b0d22..1ddc88ec 100644 --- a/src/vonage/client.py +++ b/src/vonage/client.py @@ -8,7 +8,6 @@ from .messages import Messages from .number_insight import NumberInsight from .number_management import Numbers -from .number_verification import NumberVerification from .proactive_connect import ProactiveConnect from .redact import Redact from .sim_swap import SimSwap @@ -135,6 +134,7 @@ def __init__( self.numbers = Numbers(self) self.proactive_connect = ProactiveConnect(self) self.short_codes = ShortCodes(self) + self.sim_swap = SimSwap(self) self.sms = Sms(self) self.subaccounts = Subaccounts(self) self.users = Users(self) @@ -260,6 +260,7 @@ def post( auth_type=None, body_is_json=True, supports_signature_auth=False, + oauth_token=None, ): """ Low-level method to make a post request to an API server. @@ -285,9 +286,11 @@ def post( ) elif auth_type == 'header': self._request_headers['Authorization'] = self._create_header_auth_string() + elif auth_type == 'oauth2': + self._request_headers['Authorization'] = self._create_oauth2_auth_string(oauth_token) else: raise InvalidAuthenticationTypeError( - f'Invalid authentication type. Must be one of "jwt", "header" or "params".' + f'Invalid authentication type. Must be one of "jwt", "header", "params" or "oauth2".' ) logger.debug( @@ -463,3 +466,6 @@ def _generate_application_jwt(self): def _create_header_auth_string(self): hash = base64.b64encode(f"{self.api_key}:{self.api_secret}".encode("utf-8")).decode("ascii") return f"Basic {hash}" + + def _create_oauth2_auth_string(self, token: str): + return f'Bearer {token}' diff --git a/src/vonage/errors.py b/src/vonage/errors.py index ae809508..27e4f8dd 100644 --- a/src/vonage/errors.py +++ b/src/vonage/errors.py @@ -70,11 +70,3 @@ class TokenExpiryError(ClientError): class SipError(ClientError): """Error related to usage of SIP calls.""" - - -class SimSwapError(ClientError): - """Error related to SIM swap requests.""" - - -class NumberVerificationError(ClientError): - """Error related to number verification requests.""" diff --git a/src/vonage/number_verification.py b/src/vonage/number_verification.py deleted file mode 100644 index 5f7cb7f7..00000000 --- a/src/vonage/number_verification.py +++ /dev/null @@ -1,36 +0,0 @@ -from __future__ import annotations -from typing import TYPE_CHECKING - -from .errors import NumberVerificationError - -if TYPE_CHECKING: - from vonage import Client - - -class NumberVerification: - """Class containing methods for working with the Vonage SIM Swap API.""" - - def __init__(self, client: Client): - self._client = client - self._auth_type = '' - - def verify_number(self, number: str): - """Verifies if the specified phone number (plain text or hashed format) matches the one that the user is currently using. - - Args: - number: The phone number to verify, either in plain text or as a SDK-256 hash of the phone number in E.164 format. - - Returns: - The response from the API as a dict. - """ - if len(number) == 64: - params = {'hashedPhoneNumber': number} - else: - params = {'phoneNumber': number} - - return self._client.post( - 'api-eu.vonage.com', - '/camara/number-verification/v040/verify', - params=params, - auth_type=self._auth_type, - ) diff --git a/src/vonage/sim_swap.py b/src/vonage/sim_swap.py index 2fb55b4b..23b67623 100644 --- a/src/vonage/sim_swap.py +++ b/src/vonage/sim_swap.py @@ -1,7 +1,7 @@ from __future__ import annotations from typing import TYPE_CHECKING -from .errors import SimSwapError +from .camara_auth import CamaraAuth if TYPE_CHECKING: from vonage import Client @@ -12,37 +12,50 @@ class SimSwap: def __init__(self, client: Client): self._client = client - self._auth_type = '' + self._auth_type = 'oauth2' + self._camara_auth = CamaraAuth(client) - def check(self, phone_number: str, max_age: int): + def check(self, phone_number: str, max_age: int = None): """Check if a SIM swap has been performed in a given time frame. Args: - phone_number: The phone number to check. - max_age: The maximum age of the check in hours. + phone_number (str): The phone number to check. Use the E.164 format without a leading +. + max_age (int, optional): Period in hours to be checked for SIM swap. Returns: The response from the API as a dict. """ + oicd_response = self._camara_auth.make_oidc_request( + number=phone_number, scope='dpv:FraudPreventionAndDetection#check-sim-swap' + ) + token = self._camara_auth.request_camara_token(oicd_response) + return self._client.post( 'api-eu.vonage.com', '/camara/sim-swap/v040/check', params={'phoneNumber': phone_number, 'maxAge': max_age}, auth_type=self._auth_type, + oauth_token=token, ) def get_last_swap_date(self, phone_number: str): """Get the last SIM swap date for a phone number. Args: - phone_number: The phone number to check. + phone_number (str): The phone number to check. Use the E.164 format without a leading +. Returns: The response from the API as a dict. """ - return self._client.get( + oicd_response = self._camara_auth.make_oidc_request( + number=phone_number, + scope='dpv:FraudPreventionAndDetection#retrieve-sim-swap-date', + ) + token = self._camara_auth.request_camara_token(oicd_response) + return self._client.post( 'api-eu.vonage.com', '/camara/sim-swap/v040/retrieve-date', params={'phoneNumber': phone_number}, auth_type=self._auth_type, + oauth_token=token, ) diff --git a/tests/conftest.py b/tests/conftest.py index 0290bc72..4cbd032a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -139,3 +139,17 @@ def proc(client): import vonage return vonage.ProactiveConnect(client) + + +@pytest.fixture +def camara_auth(client): + from vonage.camara_auth import CamaraAuth + + return CamaraAuth(client) + + +@pytest.fixture +def sim_swap(client): + import vonage + + return vonage.SimSwap(client) diff --git a/tests/data/camara_auth/oidc_request.json b/tests/data/camara_auth/oidc_request.json new file mode 100644 index 00000000..af44b50d --- /dev/null +++ b/tests/data/camara_auth/oidc_request.json @@ -0,0 +1,5 @@ +{ + "auth_req_id": "0dadaeb4-7c79-4d39-b4b0-5a6cc08bf537", + "expires_in": "120", + "interval": "2" +} \ No newline at end of file diff --git a/tests/data/camara_auth/token_request.json b/tests/data/camara_auth/token_request.json new file mode 100644 index 00000000..46adda84 --- /dev/null +++ b/tests/data/camara_auth/token_request.json @@ -0,0 +1,5 @@ +{ + "access_token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJkZWZhdWx0IiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.3wWzV6t8bFJUZ6k0WJ9kY3J2kNw9v5zXJ8x1J5g1v2k", + "token_type": "A-VALID-TOKEN-TYPE", + "refresh_token": "A-VALID-REFRESH-TOKEN" +} \ No newline at end of file diff --git a/tests/data/sim_swap/check_sim_swap.json b/tests/data/sim_swap/check_sim_swap.json new file mode 100644 index 00000000..8d90e1b6 --- /dev/null +++ b/tests/data/sim_swap/check_sim_swap.json @@ -0,0 +1,3 @@ +{ + "swapped": true +} \ No newline at end of file diff --git a/tests/data/sim_swap/get_swap_date.json b/tests/data/sim_swap/get_swap_date.json new file mode 100644 index 00000000..30efaa41 --- /dev/null +++ b/tests/data/sim_swap/get_swap_date.json @@ -0,0 +1,3 @@ +{ + "latestSimChange": "2019-08-24T14:15:22Z" +} \ No newline at end of file diff --git a/tests/test_camara_auth.py b/tests/test_camara_auth.py new file mode 100644 index 00000000..43dc7d2b --- /dev/null +++ b/tests/test_camara_auth.py @@ -0,0 +1,43 @@ +from vonage.camara_auth import CamaraAuth +from util import * + +import responses + + +@responses.activate +def test_oidc_request(camara_auth: CamaraAuth): + stub( + responses.POST, + 'https://api-eu.vonage.com/oauth2/bc-authorize', + fixture_path='camara_auth/oidc_request.json', + ) + + response = camara_auth.make_oidc_request( + number='447700900000', + scope='dpv:FraudPreventionAndDetection#check-sim-swap', + ) + + assert response['auth_req_id'] == '0dadaeb4-7c79-4d39-b4b0-5a6cc08bf537' + assert response['expires_in'] == '120' + assert response['interval'] == '2' + + +@responses.activate +def test_request_camara_access_token(camara_auth: CamaraAuth): + stub( + responses.POST, + 'https://api-eu.vonage.com/oauth2/token', + fixture_path='camara_auth/token_request.json', + ) + + oidc_response = { + 'auth_req_id': '0dadaeb4-7c79-4d39-b4b0-5a6cc08bf537', + 'expires_in': '120', + 'interval': '2', + } + response = camara_auth.request_camara_token(oidc_response) + + assert ( + response + == 'eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJkZWZhdWx0IiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.3wWzV6t8bFJUZ6k0WJ9kY3J2kNw9v5zXJ8x1J5g1v2k' + ) diff --git a/tests/test_sim_swap.py b/tests/test_sim_swap.py new file mode 100644 index 00000000..a0c6fc93 --- /dev/null +++ b/tests/test_sim_swap.py @@ -0,0 +1,50 @@ +from vonage.sim_swap import SimSwap +from util import * + +import responses + + +@responses.activate +def test_check_sim_swap(sim_swap: SimSwap): + stub( + responses.POST, + 'https://api-eu.vonage.com/oauth2/bc-authorize', + fixture_path='camara_auth/oidc_request.json', + ) + stub( + responses.POST, + 'https://api-eu.vonage.com/oauth2/token', + fixture_path='camara_auth/token_request.json', + ) + stub( + responses.POST, + 'https://api-eu.vonage.com/camara/sim-swap/v040/check', + fixture_path='sim_swap/check_sim_swap.json', + ) + + response = sim_swap.check('447700900000', max_age=24) + + assert response['swapped'] == True + + +@responses.activate +def test_get_last_swap_date(sim_swap: SimSwap): + stub( + responses.POST, + 'https://api-eu.vonage.com/oauth2/bc-authorize', + fixture_path='camara_auth/oidc_request.json', + ) + stub( + responses.POST, + 'https://api-eu.vonage.com/oauth2/token', + fixture_path='camara_auth/token_request.json', + ) + stub( + responses.POST, + 'https://api-eu.vonage.com/camara/sim-swap/v040/retrieve-date', + fixture_path='sim_swap/get_swap_date.json', + ) + + response = sim_swap.get_last_swap_date('447700900000') + + assert response['latestSimChange'] == '2019-08-24T14:15:22Z'