diff --git a/airbyte-integrations/connectors/source-okta/setup.py b/airbyte-integrations/connectors/source-okta/setup.py index a6b22cfd8f86..b0f80cf962aa 100644 --- a/airbyte-integrations/connectors/source-okta/setup.py +++ b/airbyte-integrations/connectors/source-okta/setup.py @@ -13,6 +13,8 @@ TEST_REQUIREMENTS = [ "pytest~=6.1", "source-acceptance-test", + "pytest-mock~=3.6.1", + "requests-mock", ] setup( diff --git a/airbyte-integrations/connectors/source-okta/source_okta/source.py b/airbyte-integrations/connectors/source-okta/source_okta/source.py index bf3602dad259..5f912e887972 100644 --- a/airbyte-integrations/connectors/source-okta/source_okta/source.py +++ b/airbyte-integrations/connectors/source-okta/source_okta/source.py @@ -261,11 +261,11 @@ def initialize_authenticator(self, config: Mapping[str, Any]): creds = config.get("credentials") if not creds: - raise "Config validation error. `credentials` not specified." + raise Exception("Config validation error. `credentials` not specified.") auth_type = creds.get("auth_type") if not auth_type: - raise "Config validation error. `auth_type` not specified." + raise Exception("Config validation error. `auth_type` not specified.") if auth_type == "api_token": return TokenAuthenticator(creds["api_token"], auth_method="SSWS") diff --git a/airbyte-integrations/connectors/source-okta/unit_tests/conftest.py b/airbyte-integrations/connectors/source-okta/unit_tests/conftest.py new file mode 100644 index 000000000000..7484ed4c4f4a --- /dev/null +++ b/airbyte-integrations/connectors/source-okta/unit_tests/conftest.py @@ -0,0 +1,393 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import pytest + + +@pytest.fixture() +def url_base(): + """ + URL base for test + """ + return "https://test_domain.okta.com" + + +@pytest.fixture() +def api_url(url_base): + """ + Just return API url based on url_base + """ + return f"{url_base}" + + +@pytest.fixture() +def oauth_config(): + """ + Credentials for oauth2.0 authorization + """ + return { + "credentials": { + "auth_type": "oauth2.0", + "client_secret": "test_client_secret", + "client_id": "test_client_id", + "refresh_token": "test_refresh_token", + }, + "domain": "test_domain", + } + + +@pytest.fixture() +def wrong_oauth_config_bad_credentials_record(): + """ + Malformed Credentials for oauth2.0 authorization + credentials -> credential + """ + return { + "credential": { + "auth_type": "oauth2.0", + "client_secret": "test_client_secret", + "client_id": "test_client_id", + "refresh_token": "test_refresh_token", + }, + "domain": "test_domain", + } + + +@pytest.fixture() +def wrong_oauth_config_bad_auth_type(): + """ + Wrong Credentials format for oauth2.0 authorization + absent "auth_type" field + """ + return { + "credentials": { + "client_secret": "test_client_secret", + "client_id": "test_client_id", + "refresh_token": "test_refresh_token", + }, + "domain": "test_domain", + } + + +@pytest.fixture() +def token_config(): + """ + Just test 'tolen' + """ + return {"token": "test_token"} + + +@pytest.fixture() +def auth_token_config(): + """ + Credentials for Token Authorization connect + """ + return {"credentials": {"auth_type": "api_token", "api_token": "test_token"}} + + +@pytest.fixture() +def users_instance(api_url): + """ + Users instance object response + """ + return { + "id": "test_user_id", + "status": "ACTIVE", + "created": "2021-04-21T21:04:03.000Z", + "activated": None, + "statusChanged": "2021-04-21T21:41:18.000Z", + "lastLogin": "2022-07-18T07:57:05.000Z", + "lastUpdated": "2021-11-03T13:45:55.000Z", + "passwordChanged": "2021-04-21T21:41:18.000Z", + "type": {"id": "test_user_type"}, + "profile": { + "firstName": "TestUser", + "lastName": "Test", + "mobilePhone": "+1 2342 2342424", + "secondEmail": None, + "login": "test@airbyte.io", + "email": "test@airbyte.io", + }, + "credentials": { + "password": {}, + "emails": [{"value": "test@airbyte.io", "status": "VERIFIED", "type": "PRIMARY"}], + "provider": {"type": "OKTA", "name": "OKTA"}, + }, + "_links": {"self": {"href": f"{api_url}/users/test_user_id"}}, + } + + +@pytest.fixture() +def custom_role_instance(api_url): + """ + Custom Role instance object response + """ + _id = "custom_role_id" + return { + "id": _id, + "label": "custom role for test", + "description": "custom role for test", + "created": "2022-07-13T07:54:31.000Z", + "lastUpdated": "2022-07-13T07:54:31.000Z", + "_links": { + "permissions": {"href": f"{api_url}/iam/roles/{_id}/permissions"}, + "self": {"href": f"{api_url}/iam/roles/{_id}"}, + }, + } + + +@pytest.fixture() +def groups_instance(api_url): + """ + Groups instance object response + """ + _id = "test_group_id" + return { + "id": _id, + "created": "2021-04-21T21:03:55.000Z", + "lastUpdated": "2021-04-21T21:03:55.000Z", + "lastMembershipUpdated": "2021-09-08T07:04:28.000Z", + "objectClass": ["okta:user_group"], + "type": "BUILT_IN", + "profile": {"name": "Everyone", "description": "All users in your organization"}, + "_links": { + "logo": [ + {"name": "medium", "href": "{https://test_static_image.png", "type": "image/png"}, + {"name": "large", "href": "https://test_other_static_image.png", "type": "image/png"}, + ], + "users": {"href": f"{api_url}/groups/{_id}/users"}, + "apps": {"href": f"{api_url}/groups/{_id}/apps"}, + }, + } + + +@pytest.fixture() +def group_members_instance(api_url): + """ + Group Members instance object response + """ + _id = "test_user_id" + return { + "id": _id, + "status": "ACTIVE", + "created": "2021-04-21T21:04:03.000Z", + "activated": None, + "statusChanged": "2021-04-21T21:41:18.000Z", + "lastLogin": "2022-07-18T07:57:05.000Z", + "lastUpdated": "2021-11-03T13:45:55.000Z", + "passwordChanged": "2021-04-21T21:41:18.000Z", + "type": {"id": "test_user_type"}, + "profile": { + "firstName": "test_user_first_name", + "lastName": "test_user_last_name", + "mobilePhone": "+1 234 56789012", + "secondEmail": None, + "login": "test@login.test", + "email": "test@login.test", + }, + "credentials": { + "password": {}, + "emails": [{"value": "test@login.test", "status": "VERIFIED", "type": "PRIMARY"}], + "provider": {"type": "OKTA", "name": "OKTA"}, + }, + "_links": {"self": {"href": f"{api_url}/users/{_id}"}}, + } + + +@pytest.fixture() +def group_role_assignments_instance(): + """ + Group Role Assignment instance object response + """ + return { + "actor": { + "id": "test_user_id", + "type": "User", + "alternateId": "test@airbyte.io", + "displayName": "test_user_first_name test_user_last_name", + "detailEntry": None, + }, + "client": {"userAgent": None, "zone": None, "device": None, "id": None, "ipAddress": None, "geographicalContext": None}, + "device": None, + "authenticationContext": { + "authenticationProvider": None, + "credentialProvider": None, + "credentialType": None, + "issuer": None, + "interface": None, + "authenticationStep": 0, + "externalSessionId": None, + }, + "displayMessage": "Add assigned application to group", + "eventType": "group.application_assignment.add", + "outcome": {"result": "SUCCESS", "reason": None}, + "published": "2022-07-18T07:58:55.625Z", + "securityContext": {"asNumber": None, "asOrg": None, "isp": None, "domain": None, "isProxy": None}, + "severity": "INFO", + "debugContext": {"debugData": {"groupAppAssignmentId": "test_group_app_assignment_id"}}, + "legacyEventType": "group.application_assignment.add", + "transaction": {"type": "JOB", "id": "test_transaction_id", "detail": {}}, + "uuid": "test_uuid", + "version": "0", + "request": {"ipChain": []}, + "target": [ + {"id": "test_user_group_id", "type": "UserGroup", "alternateId": "unknown", "displayName": "test-runner", "detailEntry": None}, + { + "id": "test_app_instance_id", + "type": "AppInstance", + "alternateId": "Okta Admin Console", + "displayName": "Okta Admin Console", + "detailEntry": None, + }, + ], + } + + +@pytest.fixture() +def user_role_assignments_instance(api_url): + """ + User Role Assignment instance object response + """ + _user_id = "test_user_id" + return { + "id": _user_id, + "label": "Super Organization Administrator", + "type": "SUPER_ADMIN", + "status": "ACTIVE", + "created": "2021-04-21T21:04:03.000Z", + "lastUpdated": "2021-04-21T21:04:03.000Z", + "assignmentType": "USER", + "_links": {"assignee": {"href": f"{api_url}/users/{_user_id}"}}, + } + + +@pytest.fixture() +def logs_instance(): + """ + Logs instance object response + """ + return { + "actor": { + "id": "test_client_app_id", + "type": "PublicClientApp", + "alternateId": "test_client_app_id", + "displayName": "Airbyte", + "detailEntry": None, + }, + "client": { + "userAgent": {"rawUserAgent": "python-requests/2.28.1", "os": "Unknown", "browser": "UNKNOWN"}, + "zone": "None", + "device": "Unknown", + "id": None, + "ipAddress": "0.0.0.0", + "geographicalContext": { + "city": "TestCity", + "state": "Test State", + "country": "Test Country", + "postalCode": "31-008", + "geolocation": {"lat": 0.0, "lon": 0.0}, + }, + }, + "device": None, + "authenticationContext": { + "authenticationProvider": None, + "credentialProvider": None, + "credentialType": None, + "issuer": None, + "interface": None, + "authenticationStep": 0, + "externalSessionId": "unknown", + }, + "displayMessage": "OIDC access token is granted", + "eventType": "app.oauth2.token.grant.access_token", + "outcome": {"result": "SUCCESS", "reason": None}, + "published": "2022-07-19T15:54:11.545Z", + "securityContext": {"asNumber": 0, "asOrg": "Test Org", "isp": "TestProvider", "domain": "test-domain.com", "isProxy": False}, + "severity": "INFO", + "debugContext": { + "debugData": { + "clientAuthType": "client_secret_basic", + "grantedScopes": "okta.users.read, okta.logs.read, okta.groups.read, okta.roles.read, offline_access", + "requestId": "test_debug_request_id", + "responseTime": "559", + "dtHash": "test_dt_hash", + "clientSecret": "test_client_secret", + "requestUri": "/oauth2/v1/token", + "requestedScopes": "", + "threatSuspected": "False", + "grantType": "refresh_token", + "url": "/oauth2/v1/token?", + } + }, + "legacyEventType": "app.oauth2.token.grant.access_token_success", + "transaction": {"type": "WEB", "id": "test_debug_request_id", "detail": {}}, + "uuid": "test_uuid", + "version": "0", + "request": { + "ipChain": [ + { + "ip": "0.0.0.0", + "geographicalContext": { + "city": "TestCity", + "state": "Test State", + "country": "Test Country", + "postalCode": "31-008", + "geolocation": {"lat": 0.0, "lon": 0.0}, + }, + "version": "V4", + "source": None, + } + ] + }, + "target": [ + {"id": "test_user_id", "type": "User", "alternateId": None, "displayName": None, "detailEntry": None}, + { + "id": "test_id", + "type": "access_token", + "alternateId": None, + "displayName": "Access Token", + "detailEntry": {"expires": "2022-07-19T16:54:11.000Z", "subject": "test_user_id", "hash": "test_detail_entry_hash"}, + }, + ], + } + + +@pytest.fixture() +def latest_record_instance(url_base, api_url): + """ + Last Record instance object response + """ + return { + "id": "test_user_group_id", + "created": "2022-07-18T07:58:11.000Z", + "lastUpdated": "2022-07-18T07:58:11.000Z", + "lastMembershipUpdated": "2022-07-18T07:58:11.000Z", + "objectClass": ["okta:user_group"], + "type": "OKTA_GROUP", + "profile": {"name": "test-runner", "description": None}, + "_links": { + "logo": [ + {"name": "medium", "href": f"{url_base}/path_to_images/okta-medium.filename.png", "type": "image/png"}, + {"name": "large", "href": f"{url_base}/path_to_images/okta-large.filename.png", "type": "image/png"}, + ], + "users": {"href": f"{api_url}/groups/test_user_group_id/users"}, + "apps": {"href": f"{api_url}/groups/test_user_group_id/apps"}, + }, + } + + +@pytest.fixture() +def error_while_refreshing_access_token(): + """ + Error raised when using incorrect access token + """ + return "Error while refreshing access token: 'access_token'" + + +@pytest.fixture() +def error_failed_to_authorize_with_provided_credentials(): + """ + Error raised when using incorrect oauth2.0 credentials + """ + return "Failed to authenticate with the provided credentials" diff --git a/airbyte-integrations/connectors/source-okta/unit_tests/test_source.py b/airbyte-integrations/connectors/source-okta/unit_tests/test_source.py new file mode 100644 index 000000000000..ea00e98e96a0 --- /dev/null +++ b/airbyte-integrations/connectors/source-okta/unit_tests/test_source.py @@ -0,0 +1,111 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from unittest.mock import MagicMock + +from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator +from source_okta.source import ( + CustomRoles, + GroupMembers, + GroupRoleAssignments, + Groups, + Logs, + OktaOauth2Authenticator, + SourceOkta, + UserRoleAssignments, + Users, +) + + +class TestAuthentication: + + def test_init_token_authentication_init(self, token_config, auth_token_config): + source_okta = SourceOkta() + token_authenticator_instance = source_okta.initialize_authenticator(config=token_config) + assert isinstance(token_authenticator_instance, TokenAuthenticator) + + token_authenticator_instance = source_okta.initialize_authenticator(config=auth_token_config) + assert isinstance(token_authenticator_instance, TokenAuthenticator) + + def test_init_oauth2_authentication_init(self, oauth_config): + source_okta = SourceOkta() + oauth_authentication_instance = source_okta.initialize_authenticator(config=oauth_config) + assert isinstance(oauth_authentication_instance, OktaOauth2Authenticator) + + def test_init_oauth2_authentication_wrong_credentials_record(self, wrong_oauth_config_bad_credentials_record): + source_okta = SourceOkta() + try: + source_okta.initialize_authenticator(config=wrong_oauth_config_bad_credentials_record) + except Exception as e: + assert e.args[0] == "Config validation error. `credentials` not specified." + + def test_init_oauth2_authentication_wrong_oauth_config_bad_auth_type(self, wrong_oauth_config_bad_auth_type): + source_okta = SourceOkta() + try: + source_okta.initialize_authenticator(config=wrong_oauth_config_bad_auth_type) + except Exception as e: + assert e.args[0] == "Config validation error. `auth_type` not specified." + + def test_check_connection_ok(self, requests_mock, oauth_config, api_url): + source_okta = SourceOkta() + oauth_authentication_instance = source_okta.initialize_authenticator(config=oauth_config) + assert isinstance(oauth_authentication_instance, OktaOauth2Authenticator) + + requests_mock.get(f"{api_url}/api/v1/users?limit=1", json={"connect": "ok"}) + requests_mock.post(f"{api_url}/oauth2/v1/token", json={"access_token": "test_token", "expires_in": 948}) + assert source_okta.check_connection(logger=MagicMock(), config=oauth_config) == (True, None) + + def test_check_connection_error_status_code(self, requests_mock, oauth_config, api_url): + source_okta = SourceOkta() + oauth_authentication_instance = source_okta.initialize_authenticator(config=oauth_config) + assert isinstance(oauth_authentication_instance, OktaOauth2Authenticator) + + requests_mock.get(f"{api_url}/api/v1/users?limit=1", status_code=400, json={}) + requests_mock.post(f"{api_url}/oauth2/v1/token", json={"access_token": "test_token", "expires_in": 948}) + + assert source_okta.check_connection(logger=MagicMock(), config=oauth_config) == (False, {}) + + def test_check_connection_error_with_exception( + self, requests_mock, oauth_config, api_url, error_failed_to_authorize_with_provided_credentials + ): + source_okta = SourceOkta() + oauth_authentication_instance = source_okta.initialize_authenticator(config=oauth_config) + assert isinstance(oauth_authentication_instance, OktaOauth2Authenticator) + + requests_mock.get(f"{api_url}/api/v1/users?limit=1", status_code=400, json="ss") + requests_mock.post(f"{api_url}/oauth2/v1/token", json={"access_token": "test_token", "expires_in": 948}) + + assert source_okta.check_connection(logger=MagicMock(), config="wrong_config") == ( + False, + error_failed_to_authorize_with_provided_credentials, + ) + + def test_check_streams(self, requests_mock, oauth_config, api_url): + source_okta = SourceOkta() + oauth_authentication_instance = source_okta.initialize_authenticator(config=oauth_config) + assert isinstance(oauth_authentication_instance, OktaOauth2Authenticator) + + requests_mock.get(f"{api_url}/api/v1/users?limit=1", json={"connect": "ok"}) + requests_mock.post(f"{api_url}/oauth2/v1/token", json={"access_token": "test_token", "expires_in": 948}) + streams = source_okta.streams(config=oauth_config) + for i, _ in enumerate([Groups, Logs, Users, GroupMembers, CustomRoles, UserRoleAssignments, GroupRoleAssignments]): + assert isinstance(streams[i], _) + + def test_oauth2_refresh_token_ok(self, requests_mock, oauth_config, api_url): + source_okta = SourceOkta() + oauth_authentication_instance = source_okta.initialize_authenticator(config=oauth_config) + assert isinstance(oauth_authentication_instance, OktaOauth2Authenticator) + requests_mock.post(f"{api_url}/oauth2/v1/token", json={"access_token": "test_token", "expires_in": 948}) + result = oauth_authentication_instance.refresh_access_token() + assert result == ("test_token", 948) + + def test_oauth2_refresh_token_failed(self, requests_mock, oauth_config, api_url, error_while_refreshing_access_token): + source_okta = SourceOkta() + oauth_authentication_instance = source_okta.initialize_authenticator(config=oauth_config) + assert isinstance(oauth_authentication_instance, OktaOauth2Authenticator) + requests_mock.post(f"{api_url}/oauth2/v1/token", json={"token": "test_token", "expires_in": 948}) + try: + oauth_authentication_instance.refresh_access_token() + except Exception as e: + assert e.args[0] == error_while_refreshing_access_token diff --git a/airbyte-integrations/connectors/source-okta/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-okta/unit_tests/test_streams.py new file mode 100644 index 000000000000..52a2c41fd1ff --- /dev/null +++ b/airbyte-integrations/connectors/source-okta/unit_tests/test_streams.py @@ -0,0 +1,339 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import datetime +import time +from abc import ABC +from http import HTTPStatus +from unittest.mock import MagicMock + +import pytest +import requests +from airbyte_cdk.models import SyncMode +from source_okta.source import ( + CustomRoles, + GroupMembers, + GroupRoleAssignments, + Groups, + IncrementalOktaStream, + Logs, + OktaStream, + UserRoleAssignments, + Users, +) + + +@pytest.fixture +def patch_base_class(mocker): + """ + Base patcher for used streams + """ + mocker.patch.object(OktaStream, "path", "v0/example_endpoint") + mocker.patch.object(OktaStream, "primary_key", "test_primary_key") + mocker.patch.object(OktaStream, "__abstractmethods__", set()) + mocker.patch.object(IncrementalOktaStream, "path", "v0/example_endpoint") + mocker.patch.object(IncrementalOktaStream, "primary_key", "test_primary_key") + mocker.patch.object(IncrementalOktaStream, "__abstractmethods__", set()) + + +class TestStatusCodes: + @pytest.mark.parametrize( + ("http_status", "should_retry"), + [ + (HTTPStatus.OK, False), + (HTTPStatus.BAD_REQUEST, False), + (HTTPStatus.TOO_MANY_REQUESTS, True), + (HTTPStatus.INTERNAL_SERVER_ERROR, True), + ], + ) + def test_should_retry(self, patch_base_class, http_status, should_retry, url_base): + response_mock = MagicMock() + response_mock.status_code = http_status + stream = OktaStream(url_base=url_base) + assert stream.should_retry(response_mock) == should_retry + + +class TestOktaStream: + + def test_okta_stream_request_params(self, patch_base_class, url_base): + stream = OktaStream(url_base=url_base) + inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None} + expected_params = {"limit": 200} + assert stream.request_params(**inputs) == expected_params + + def test_okta_stream_parse_response(self, patch_base_class, requests_mock, url_base, api_url): + stream = OktaStream(url_base=url_base) + requests_mock.get(f"{api_url}", json=[{"a": 123}, {"b": "xx"}]) + resp = requests.get(f"{api_url}") + inputs = {"response": resp, "stream_state": MagicMock()} + expected_parsed_object = [{"a": 123}, {"b": "xx"}] + assert list(stream.parse_response(**inputs)) == expected_parsed_object + + def test_okta_stream_backoff_time(self, patch_base_class, url_base): + response_mock = requests.Response() + stream = OktaStream(url_base=url_base) + expected_backoff_time = None + assert stream.backoff_time(response_mock) == expected_backoff_time + + def test_okta_stream_incremental_request_params(self, patch_base_class, url_base): + stream = IncrementalOktaStream(url_base=url_base) + inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None} + expected_params = {"limit": 200} + assert stream.request_params(**inputs) == expected_params + + def test_incremental_okta_stream_parse_response(self, patch_base_class, requests_mock, url_base, api_url): + stream = IncrementalOktaStream(url_base=url_base) + requests_mock.get(f"{api_url}", json=[{"a": 123}, {"b": "xx"}]) + resp = requests.get(f"{api_url}") + inputs = {"response": resp, "stream_state": MagicMock()} + expected_parsed_object = [{"a": 123}, {"b": "xx"}] + assert list(stream.parse_response(**inputs)) == expected_parsed_object + + def test_incremental_okta_stream_backoff_time(self, patch_base_class, url_base): + response_mock = MagicMock() + stream = IncrementalOktaStream(url_base=url_base) + expected_backoff_time = None + assert stream.backoff_time(response_mock) == expected_backoff_time + + def test_okta_stream_incremental_backoff_time_empty(self, patch_base_class, url_base): + stream = IncrementalOktaStream(url_base=url_base) + response = MagicMock(requests.Response) + response.status_code = 200 + expected_params = None + inputs = {"response": response} + assert stream.backoff_time(**inputs) == expected_params + + def test_okta_stream_incremental_back_off_now(self, patch_base_class, url_base): + stream = IncrementalOktaStream(url_base=url_base) + response = MagicMock(requests.Response) + response.status_code = requests.codes.TOO_MANY_REQUESTS + response.headers = {"x-rate-limit-reset": int(time.time())} + expected_params = (0, 2) + inputs = {"response": response} + get_backoff_time = stream.backoff_time(**inputs) + assert expected_params[0] <= get_backoff_time <= expected_params[1] + + def test_okta_stream_incremental_get_updated_state(self, patch_base_class, latest_record_instance, url_base): + class TestIncrementalOktaStream(IncrementalOktaStream, ABC): + def __init__(self, url_base: str, *args, **kwargs): + super().__init__(url_base, *args, **kwargs) + self._cursor_field = None + + @property + def cursor_field(self) -> str: + return self._cursor_field + + stream = TestIncrementalOktaStream(url_base=url_base) + stream._cursor_field = "lastUpdated" + + current_stream_state = {"lastUpdated": "2021-04-21T21:03:55.000Z"} + update_state = stream.get_updated_state(current_stream_state=current_stream_state, latest_record=latest_record_instance) + expected_result = {"lastUpdated": "2022-07-18T07:58:11.000Z"} + assert update_state == expected_result + + def test_okta_stream_http_method(self, patch_base_class, url_base): + stream = OktaStream(url_base=url_base) + expected_method = "GET" + assert stream.http_method == expected_method + + +class TestNextPageToken: + + def test_next_page_token(self, patch_base_class, users_instance, url_base, api_url): + stream = OktaStream(url_base=url_base) + response = MagicMock(requests.Response) + response.links = {"next": {"url": f"{api_url}?param1=test_value1¶m2=test_value2"}} + inputs = {"response": response} + expected_token = {"param1": "test_value1", "param2": "test_value2"} + result = stream.next_page_token(**inputs) + assert result == expected_token + + def test_next_page_token_empty_params(self, patch_base_class, users_instance, url_base, api_url): + stream = OktaStream(url_base=url_base) + response = MagicMock(requests.Response) + response.links = {"next": {"url": f"{api_url}"}} + inputs = {"response": response} + expected_token = {} + result = stream.next_page_token(**inputs) + assert result == expected_token + + def test_next_page_token_link_have_self_and_equal_next(self, patch_base_class, users_instance, url_base, api_url): + stream = OktaStream(url_base=url_base) + response = MagicMock(requests.Response) + response.links = {"next": {"url": f"{api_url}"}, "self": {"url": f"{api_url}"}} + inputs = {"response": response} + expected_token = None + result = stream.next_page_token(**inputs) + assert result == expected_token + + +class TestStreamUsers: + + def test_stream_users(self, requests_mock, patch_base_class, users_instance, url_base, api_url): + stream = Users(url_base=url_base) + requests_mock.get(f"{api_url}/users", json=[users_instance]) + inputs = {"sync_mode": SyncMode.incremental} + assert list(stream.read_records(**inputs)) == [users_instance] + + def test_users_request_params_out_of_next_page_token(self, patch_base_class, url_base): + stream = Users(url_base=url_base) + inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None} + expected_params = {"limit": 200} + assert stream.request_params(**inputs) == expected_params + + def test_users_source_request_params_have_next_cursor(self, patch_base_class, url_base): + stream = Users(url_base=url_base) + inputs = {"stream_slice": None, "stream_state": None, "next_page_token": {"next_cursor": "123"}} + expected_params = {"limit": 200, "next_cursor": "123"} + assert stream.request_params(**inputs) == expected_params + + def test_users_source_request_params_have_latest_entry(self, patch_base_class, url_base): + stream = Users(url_base=url_base) + inputs = {"stream_slice": None, "stream_state": {"lastUpdated": "some_date"}, "next_page_token": {"next_cursor": "123"}} + expected_params = {"limit": 200, "next_cursor": "123", "filter": 'lastUpdated gt "some_date"'} + assert stream.request_params(**inputs) == expected_params + + def test_users_source_parse_response(self, requests_mock, patch_base_class, users_instance, url_base, api_url): + stream = Users(url_base=url_base) + requests_mock.get(f"{api_url}", json=[users_instance]) + assert list(stream.parse_response(response=requests.get(f"{api_url}"))) == [users_instance] + + +class TestStreamCustomRoles: + + def test_custom_roles(self, requests_mock, patch_base_class, custom_role_instance, url_base, api_url): + stream = CustomRoles(url_base=url_base) + record = {"roles": [custom_role_instance]} + requests_mock.get(f"{api_url}/iam/roles?limit=200", json=record) + inputs = {"sync_mode": SyncMode.incremental} + assert list(stream.read_records(**inputs)) == record["roles"] + + def test_custom_roles_parse_response(self, requests_mock, patch_base_class, custom_role_instance, url_base, api_url): + stream = CustomRoles(url_base=url_base) + record = {"roles": [custom_role_instance]} + requests_mock.get(f"{api_url}", json=record) + assert list(stream.parse_response(response=requests.get(f"{api_url}"))) == [custom_role_instance] + + +class TestStreamGroups: + + def test_groups(self, requests_mock, patch_base_class, groups_instance, url_base, api_url): + stream = Groups(url_base=url_base) + requests_mock.get(f"{api_url}/groups?limit=200", json=[groups_instance]) + inputs = {"sync_mode": SyncMode.incremental} + assert list(stream.read_records(**inputs)) == [groups_instance] + + def test_groups_parse_response(self, requests_mock, patch_base_class, groups_instance, url_base, api_url): + stream = Groups(url_base=url_base) + requests_mock.get(f"{api_url}", json=[groups_instance]) + assert list(stream.parse_response(response=requests.get(f"{api_url}"))) == [groups_instance] + + +class TestStreamGroupMembers: + + def test_group_members(self, requests_mock, patch_base_class, group_members_instance, url_base, api_url): + stream = GroupMembers(url_base=url_base) + group_id = "test_group_id" + requests_mock.get(f"{api_url}/groups/{group_id}/users?limit=200", json=[group_members_instance]) + inputs = {"sync_mode": SyncMode.incremental, "stream_state": {}, "stream_slice": {"group_id": group_id}} + assert list(stream.read_records(**inputs)) == [group_members_instance] + + def test_group_members_parse_response(self, requests_mock, patch_base_class, group_members_instance, url_base, api_url): + stream = GroupMembers(url_base=url_base) + requests_mock.get(f"{api_url}", json=[group_members_instance]) + assert list(stream.parse_response(response=requests.get(f"{api_url}"))) == [group_members_instance] + + def test_group_members_request_params_with_latest_entry(self, patch_base_class, group_members_instance, url_base): + stream = GroupMembers(url_base=url_base) + inputs = { + "stream_slice": {"group_id": "some_group"}, + "stream_state": {"id": "some_test_id"}, + "next_page_token": {"next_cursor": "123"}, + } + assert stream.request_params(**inputs) == {"limit": 200, "next_cursor": "123", "after": "some_test_id"} + + def test_group_members_slice_stream(self, requests_mock, patch_base_class, group_members_instance, groups_instance, url_base, api_url): + stream = GroupMembers(url_base=url_base) + requests_mock.get(f"{api_url}/groups?limit=200", json=[groups_instance]) + assert list(stream.stream_slices()) == [{"group_id": "test_group_id"}] + + def test_group_member_request_get_update_state(self, latest_record_instance, url_base): + stream = GroupMembers(url_base=url_base) + stream._cursor_field = "id" + current_stream_state = {"id": "test_user_group_id"} + update_state = stream.get_updated_state(current_stream_state=current_stream_state, latest_record=latest_record_instance) + assert update_state == {"id": "test_user_group_id"} + + +class TestStreamGroupRoleAssignment: + + def test_group_role_assignments(self, requests_mock, patch_base_class, group_role_assignments_instance, url_base, api_url): + stream = GroupRoleAssignments(url_base=url_base) + group_id = "test_group_id" + mock_address = f"{api_url}/groups/{group_id}/roles?limit=200" + requests_mock.get(mock_address, json=[group_role_assignments_instance]) + inputs = {"sync_mode": SyncMode.full_refresh, "stream_state": {}, "stream_slice": {"group_id": group_id}} + assert list(stream.read_records(**inputs)) == [group_role_assignments_instance] + + def test_group_role_assignments_parse_response( + self, requests_mock, patch_base_class, group_role_assignments_instance, url_base, api_url + ): + stream = GroupRoleAssignments(url_base=url_base) + requests_mock.get(f"{api_url}", json=[group_role_assignments_instance]) + assert list(stream.parse_response(response=requests.get(f"{api_url}"))) == [group_role_assignments_instance] + + def test_group_role_assignments_slice_stream( + self, requests_mock, patch_base_class, group_members_instance, groups_instance, url_base, api_url + ): + stream = GroupRoleAssignments(url_base=url_base) + requests_mock.get(f"{api_url}/groups?limit=200", json=[groups_instance]) + assert list(stream.stream_slices()) == [{"group_id": "test_group_id"}] + + +class TestStreamLogs: + + def test_logs(self, requests_mock, patch_base_class, logs_instance, url_base, api_url): + stream = Logs(url_base=url_base) + requests_mock.get(f"{api_url}/logs?limit=200", json=[logs_instance]) + inputs = {"sync_mode": SyncMode.incremental} + assert list(stream.read_records(**inputs)) == [logs_instance] + + def test_logs_parse_response(self, requests_mock, patch_base_class, logs_instance, url_base, api_url): + stream = Logs(url_base=url_base) + requests_mock.get(f"{api_url}/logs?limit=200", json=[logs_instance]) + assert list(stream.parse_response(response=requests.get(f"{api_url}/logs?limit=200"))) == [logs_instance] + + def test_logs_request_params_for_since(self, patch_base_class, logs_instance, url_base): + stream = Logs(url_base=url_base) + inputs = {"stream_state": {"published": "2022-07-19T15:54:11.545Z"}, "stream_slice": None} + assert stream.request_params(**inputs) == {"limit": 200, "since": "2022-07-19T15:54:11.545Z"} + + def test_logs_request_params_for_until(self, patch_base_class, logs_instance, url_base): + stream = Logs(url_base=url_base) + testing_date = datetime.datetime.utcnow() + datetime.timedelta(days=10) + inputs = {"stream_state": {"published": testing_date.isoformat()}, "stream_slice": None} + assert stream.request_params(**inputs) == {"limit": 200, "since": testing_date.isoformat(), "until": testing_date.isoformat()} + + +class TestStreamUserRoleAssignment: + + def test_user_role_assignments(self, requests_mock, patch_base_class, user_role_assignments_instance, url_base, api_url): + stream = UserRoleAssignments(url_base=url_base) + user_id = "test_user_id" + mock_address = f"{api_url}/users/{user_id}/roles?limit=200" + requests_mock.get(mock_address, json=[user_role_assignments_instance]) + inputs = {"sync_mode": SyncMode.full_refresh, "stream_state": {}, "stream_slice": {"user_id": user_id}} + assert list(stream.read_records(**inputs)) == [user_role_assignments_instance] + + def test_user_role_assignments_parse_response(self, requests_mock, patch_base_class, user_role_assignments_instance, url_base, api_url): + stream = UserRoleAssignments(url_base=url_base) + requests_mock.get(f"{api_url}", json=[user_role_assignments_instance]) + assert list(stream.parse_response(response=requests.get(f"{api_url}"))) == [user_role_assignments_instance] + + def test_user_role_assignments_slice_stream( + self, requests_mock, patch_base_class, group_members_instance, users_instance, url_base, api_url + ): + stream = UserRoleAssignments(url_base=url_base) + requests_mock.get(f"{api_url}/users?limit=200", json=[users_instance]) + assert list(stream.stream_slices()) == [{"user_id": "test_user_id"}] diff --git a/airbyte-integrations/connectors/source-okta/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-okta/unit_tests/unit_test.py deleted file mode 100644 index dd619e36b3b4..000000000000 --- a/airbyte-integrations/connectors/source-okta/unit_tests/unit_test.py +++ /dev/null @@ -1,47 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - - -import pytest -import requests -from source_okta.source import OktaStream - - -class MockStream(OktaStream): - primary_key = "None" - - def __init__(self): - super().__init__("") - - def path(self, **kwargs) -> str: - return "path" - - -class TestPagination: - @pytest.mark.parametrize( - ("_self_header", "_next_header", "expected_npt", "assert_msg"), - [ - (None, "XYZ", {"after": "XYZ"}, "Expected to receive a new page token if next header is set and self is not set"), - ("ABC", "XYZ", {"after": "XYZ"}, "Expected to receive a new page token if next and self headers have different values"), - ("ABC", "ABC", None, "Expected no new page token if next and self headers are the same"), - ("ABC", None, None, "Expected no new page token if next header is not set"), - ], - ) - def test_pagination(self, _self_header, _next_header, expected_npt, assert_msg): - stream = MockStream() - - fake_response = requests.Response() - link_header = "" - - if _self_header: - link_header += f'; rel="self",' - - if _next_header: - link_header += f'; rel="next"' - - fake_response.headers = {"link": link_header} - - actual_npt = stream.next_page_token(fake_response) - - assert actual_npt == expected_npt, assert_msg