From 8b422c7b198a3c90c404e839671a6b07a6120fa6 Mon Sep 17 00:00:00 2001 From: Yiyang Li Date: Tue, 12 Jul 2022 10:49:57 -0700 Subject: [PATCH] :tada: Source Okta: add GroupMembers stream (#14380) * add Group_Members stream to okta source - Group_Members return a list of users, the same schema of Users stream. - Create a shared schema users, and both group_members and users sechema use it as a reference. - Add Group_Members stream to source connector * add tests and fix logs schema - fix the test error: None is not one of enums though the enum type includes both string and null, it comes from json schema validator https://github.com/python-jsonschema/jsonschema/blob/ddb87afad8f5d5c40600b5ede0ab96e4d4bdf7d3/jsonschema/_validators.py#L279-L285 - change grouop_members to use id as the cursor field since `filter` is not supported in the query string - fix the abnormal state test on logs stream, when since is abnormally large, until has to defined, an equal or a larger value - remove logs stream from full sync test, because 2 full sync always has a gap -- at least a new log about users or groups api. * last polish before submit the PR - bump docker version - update changelog - add the right abnormal value for logs stream - correct the sample catalog * address comments:: - improve comments for until parameter under the logs stream - add use_cache on groupMembers * add use_cache to Group_Members * change configured_catalog to test * auto-bump connector version Co-authored-by: marcosmarxm Co-authored-by: Octavia Squidington III --- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 2 +- .../connectors/source-okta/Dockerfile | 2 +- .../source-okta/acceptance-test-config.yml | 2 - .../integration_tests/abnormal_state.json | 4 +- .../integration_tests/catalog.json | 10 + .../integration_tests/configured_catalog.json | 13 +- .../source-okta/sample_files/catalog.json | 11 + .../source-okta/sample_files/config.json | 4 + .../source_okta/schemas/group_members.json | 3 + .../source-okta/source_okta/schemas/logs.json | 15 +- .../source_okta/schemas/shared/users.json | 230 ++++++++++++++++++ .../source_okta/schemas/users.json | 229 +---------------- .../source-okta/source_okta/source.py | 52 +++- docs/integrations/sources/okta.md | 2 + 15 files changed, 336 insertions(+), 245 deletions(-) create mode 100644 airbyte-integrations/connectors/source-okta/sample_files/config.json create mode 100644 airbyte-integrations/connectors/source-okta/source_okta/schemas/group_members.json create mode 100644 airbyte-integrations/connectors/source-okta/source_okta/schemas/shared/users.json diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 8ac9c4e90549..42be81eecbce 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -613,7 +613,7 @@ - name: Okta sourceDefinitionId: 1d4fdb25-64fc-4569-92da-fcdca79a8372 dockerRepository: airbyte/source-okta - dockerImageTag: 0.1.4 + dockerImageTag: 0.1.5 documentationUrl: https://docs.airbyte.io/integrations/sources/okta icon: okta.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 8804b33e1ede..e528bcdd4fa9 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -5907,7 +5907,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-okta:0.1.4" +- dockerImage: "airbyte/source-okta:0.1.5" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/okta" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-okta/Dockerfile b/airbyte-integrations/connectors/source-okta/Dockerfile index 50d5d9cb04a5..a53374fe8c0e 100644 --- a/airbyte-integrations/connectors/source-okta/Dockerfile +++ b/airbyte-integrations/connectors/source-okta/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.4 +LABEL io.airbyte.version=0.1.5 LABEL io.airbyte.name=airbyte/source-okta diff --git a/airbyte-integrations/connectors/source-okta/acceptance-test-config.yml b/airbyte-integrations/connectors/source-okta/acceptance-test-config.yml index 1e60fe03ef3a..722401bd4bf6 100644 --- a/airbyte-integrations/connectors/source-okta/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-okta/acceptance-test-config.yml @@ -20,5 +20,3 @@ tests: - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/configured_catalog.json" future_state_path: "integration_tests/abnormal_state.json" - cursor_paths: - users: ["lastUpdated"] diff --git a/airbyte-integrations/connectors/source-okta/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-okta/integration_tests/abnormal_state.json index 327d7b7b278a..18615c665427 100644 --- a/airbyte-integrations/connectors/source-okta/integration_tests/abnormal_state.json +++ b/airbyte-integrations/connectors/source-okta/integration_tests/abnormal_state.json @@ -1,4 +1,6 @@ { "users": { "lastUpdated": "3021-09-08T07:04:28.000Z" }, - "groups": { "lastUpdated": "3021-09-08T07:04:28.000Z" } + "groups": { "lastUpdated": "3021-09-08T07:04:28.000Z" }, + "group_members": { "id": "00uzzzzzzzzzzzzzzzzz" }, + "logs": { "published": "3021-09-08T07:04:28.000Z" } } diff --git a/airbyte-integrations/connectors/source-okta/integration_tests/catalog.json b/airbyte-integrations/connectors/source-okta/integration_tests/catalog.json index f9452748f05e..98fee2fe3418 100644 --- a/airbyte-integrations/connectors/source-okta/integration_tests/catalog.json +++ b/airbyte-integrations/connectors/source-okta/integration_tests/catalog.json @@ -29,6 +29,16 @@ "destination_sync_mode": "overwrite", "cursor_field": ["lastUpdated"], "primary_key": [["id"]] + }, + { + "stream": { + "name": "groupMembers", + "json_schema": {} + }, + "sync_mode": "incremental", + "destination_sync_mode": "overwrite", + "cursor_field": ["id"], + "primary_key": [["id"]] } ] } diff --git a/airbyte-integrations/connectors/source-okta/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-okta/integration_tests/configured_catalog.json index 7c58f625727a..9aae0af390c3 100644 --- a/airbyte-integrations/connectors/source-okta/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-okta/integration_tests/configured_catalog.json @@ -6,7 +6,7 @@ "json_schema": {}, "supported_sync_modes": ["full_refresh", "incremental"] }, - "sync_mode": "full_refresh", + "sync_mode": "incremental", "destination_sync_mode": "overwrite", "cursor_field": ["lastUpdated"], "primary_key": [["id"]] @@ -32,6 +32,17 @@ "destination_sync_mode": "overwrite", "cursor_field": ["published"], "primary_key": [["uuid"]] + }, + { + "stream": { + "name": "group_members", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "overwrite", + "cursor_field": ["id"], + "primary_key": [["id"]] } ] } diff --git a/airbyte-integrations/connectors/source-okta/sample_files/catalog.json b/airbyte-integrations/connectors/source-okta/sample_files/catalog.json index f9452748f05e..c7793f364181 100644 --- a/airbyte-integrations/connectors/source-okta/sample_files/catalog.json +++ b/airbyte-integrations/connectors/source-okta/sample_files/catalog.json @@ -29,6 +29,17 @@ "destination_sync_mode": "overwrite", "cursor_field": ["lastUpdated"], "primary_key": [["id"]] + }, + { + "stream": { + "name": "group_members", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "overwrite", + "cursor_field": ["id"], + "primary_key": [["id"]] } ] } diff --git a/airbyte-integrations/connectors/source-okta/sample_files/config.json b/airbyte-integrations/connectors/source-okta/sample_files/config.json new file mode 100644 index 000000000000..454bc28225e8 --- /dev/null +++ b/airbyte-integrations/connectors/source-okta/sample_files/config.json @@ -0,0 +1,4 @@ +{ + "base_url": "https://myorg.okta.com", + "token": "xyz123foo325a.fbar" +} diff --git a/airbyte-integrations/connectors/source-okta/source_okta/schemas/group_members.json b/airbyte-integrations/connectors/source-okta/source_okta/schemas/group_members.json new file mode 100644 index 000000000000..d02387b101f1 --- /dev/null +++ b/airbyte-integrations/connectors/source-okta/source_okta/schemas/group_members.json @@ -0,0 +1,3 @@ +{ + "$ref": "users.json" +} diff --git a/airbyte-integrations/connectors/source-okta/source_okta/schemas/logs.json b/airbyte-integrations/connectors/source-okta/source_okta/schemas/logs.json index 5872b10c06eb..e0b67c1d8073 100644 --- a/airbyte-integrations/connectors/source-okta/source_okta/schemas/logs.json +++ b/airbyte-integrations/connectors/source-okta/source_okta/schemas/logs.json @@ -32,7 +32,8 @@ "LDAP", "FEDERATION", "SOCIAL", - "FACTOR_PROVIDER" + "FACTOR_PROVIDER", + null ], "type": ["string", "null"] }, @@ -47,7 +48,8 @@ "SYMANTEC", "GOOGLE", "DUO", - "YUBIKEY" + "YUBIKEY", + null ], "type": ["string", "null"] }, @@ -60,7 +62,8 @@ "IWA", "EMAIL", "OAUTH2", - "JWT" + "JWT", + null ], "type": ["string", "null"] }, @@ -149,7 +152,7 @@ "properties": { "debugData": { "additionalProperties": { - "type": ["object", "null"] + "type": ["object", "null", "string"] }, "type": ["object", "null"] } @@ -264,7 +267,7 @@ }, "detailEntry": { "additionalProperties": { - "type": ["object", "null"] + "type": ["object", "null", "string"] }, "type": ["object", "null"] }, @@ -286,7 +289,7 @@ "properties": { "detail": { "additionalProperties": { - "type": ["object", "null"] + "type": ["object", "null", "string"] }, "type": ["object", "null"] }, diff --git a/airbyte-integrations/connectors/source-okta/source_okta/schemas/shared/users.json b/airbyte-integrations/connectors/source-okta/source_okta/schemas/shared/users.json new file mode 100644 index 000000000000..eef92c8a7c4b --- /dev/null +++ b/airbyte-integrations/connectors/source-okta/source_okta/schemas/shared/users.json @@ -0,0 +1,230 @@ +{ + "properties": { + "_links": { + "additionalProperties": { + "type": ["object", "null"] + }, + "type": ["object", "null"] + }, + "activated": { + "format": "date-time", + "type": ["string", "null"] + }, + "created": { + "format": "date-time", + "type": "string" + }, + "credentials": { + "properties": { + "password": { + "properties": { + "hash": { + "properties": { + "algorithm": { + "enum": ["BCRYPT", "SHA-512", "SHA-256", "SHA-1", "MD5"], + "type": ["string", "null"] + }, + "salt": { + "type": ["string", "null"] + }, + "saltOrder": { + "type": ["string", "null"] + }, + "value": { + "type": ["string", "null"] + }, + "workFactor": { + "type": ["integer", "null"] + } + }, + "type": ["object", "null"] + }, + "hook": { + "properties": { + "type": { + "type": ["string", "null"] + } + }, + "type": ["object", "null"] + }, + "value": { + "format": "password", + "type": ["string", "null"] + } + }, + "type": ["object", "null"] + }, + "provider": { + "properties": { + "name": { + "type": ["string", "null"] + }, + "type": { + "enum": [ + "ACTIVE_DIRECTORY", + "FEDERATION", + "LDAP", + "OKTA", + "SOCIAL", + "IMPORT" + ], + "type": ["string", "null"] + } + }, + "type": ["object", "null"] + }, + "recovery_question": { + "properties": { + "answer": { + "type": ["string", "null"] + }, + "question": { + "type": ["string", "null"] + } + }, + "type": ["object", "null"] + } + }, + "type": "object" + }, + "id": { + "type": "string" + }, + "lastLogin": { + "format": "date-time", + "type": ["string", "null"] + }, + "lastUpdated": { + "format": "date-time", + "type": "string" + }, + "passwordChanged": { + "format": "date-time", + "type": ["string", "null"] + }, + "profile": { + "properties": { + "city": { + "type": ["string", "null"] + }, + "costCenter": { + "type": ["string", "null"] + }, + "countryCode": { + "type": ["string", "null"] + }, + "department": { + "type": ["string", "null"] + }, + "displayName": { + "type": ["string", "null"] + }, + "division": { + "type": ["string", "null"] + }, + "email": { + "type": ["string", "null"] + }, + "employeeNumber": { + "type": ["string", "null"] + }, + "firstName": { + "type": ["string", "null"] + }, + "honorificPrefix": { + "type": ["string", "null"] + }, + "honorificSuffix": { + "type": ["string", "null"] + }, + "lastName": { + "type": ["string", "null"] + }, + "locale": { + "type": ["string", "null"] + }, + "login": { + "type": ["string", "null"] + }, + "manager": { + "type": ["string", "null"] + }, + "managerId": { + "type": ["string", "null"] + }, + "middleName": { + "type": ["string", "null"] + }, + "mobilePhone": { + "type": ["string", "null"] + }, + "nickName": { + "type": ["string", "null"] + }, + "organization": { + "type": ["string", "null"] + }, + "postalAddress": { + "type": ["string", "null"] + }, + "preferredLanguage": { + "type": ["string", "null"] + }, + "primaryPhone": { + "type": ["string", "null"] + }, + "profileUrl": { + "type": ["string", "null"] + }, + "secondEmail": { + "type": ["string", "null"] + }, + "state": { + "type": ["string", "null"] + }, + "streetAddress": { + "type": ["string", "null"] + }, + "timezone": { + "type": ["string", "null"] + }, + "title": { + "type": ["string", "null"] + }, + "userType": { + "type": ["string", "null"] + }, + "zipCode": { + "type": ["string", "null"] + } + }, + "type": "object" + }, + "status": { + "enum": [ + "ACTIVE", + "DEPROVISIONED", + "LOCKED_OUT", + "PASSWORD_EXPIRED", + "PROVISIONED", + "RECOVERY", + "STAGED", + "SUSPENDED" + ], + "type": "string" + }, + "statusChanged": { + "format": "date-time", + "type": ["string", "null"] + }, + "type": { + "properties": { + "id": { + "type": ["string", "null"] + } + }, + "type": "object" + } + }, + "type": "object" +} diff --git a/airbyte-integrations/connectors/source-okta/source_okta/schemas/users.json b/airbyte-integrations/connectors/source-okta/source_okta/schemas/users.json index eef92c8a7c4b..d02387b101f1 100644 --- a/airbyte-integrations/connectors/source-okta/source_okta/schemas/users.json +++ b/airbyte-integrations/connectors/source-okta/source_okta/schemas/users.json @@ -1,230 +1,3 @@ { - "properties": { - "_links": { - "additionalProperties": { - "type": ["object", "null"] - }, - "type": ["object", "null"] - }, - "activated": { - "format": "date-time", - "type": ["string", "null"] - }, - "created": { - "format": "date-time", - "type": "string" - }, - "credentials": { - "properties": { - "password": { - "properties": { - "hash": { - "properties": { - "algorithm": { - "enum": ["BCRYPT", "SHA-512", "SHA-256", "SHA-1", "MD5"], - "type": ["string", "null"] - }, - "salt": { - "type": ["string", "null"] - }, - "saltOrder": { - "type": ["string", "null"] - }, - "value": { - "type": ["string", "null"] - }, - "workFactor": { - "type": ["integer", "null"] - } - }, - "type": ["object", "null"] - }, - "hook": { - "properties": { - "type": { - "type": ["string", "null"] - } - }, - "type": ["object", "null"] - }, - "value": { - "format": "password", - "type": ["string", "null"] - } - }, - "type": ["object", "null"] - }, - "provider": { - "properties": { - "name": { - "type": ["string", "null"] - }, - "type": { - "enum": [ - "ACTIVE_DIRECTORY", - "FEDERATION", - "LDAP", - "OKTA", - "SOCIAL", - "IMPORT" - ], - "type": ["string", "null"] - } - }, - "type": ["object", "null"] - }, - "recovery_question": { - "properties": { - "answer": { - "type": ["string", "null"] - }, - "question": { - "type": ["string", "null"] - } - }, - "type": ["object", "null"] - } - }, - "type": "object" - }, - "id": { - "type": "string" - }, - "lastLogin": { - "format": "date-time", - "type": ["string", "null"] - }, - "lastUpdated": { - "format": "date-time", - "type": "string" - }, - "passwordChanged": { - "format": "date-time", - "type": ["string", "null"] - }, - "profile": { - "properties": { - "city": { - "type": ["string", "null"] - }, - "costCenter": { - "type": ["string", "null"] - }, - "countryCode": { - "type": ["string", "null"] - }, - "department": { - "type": ["string", "null"] - }, - "displayName": { - "type": ["string", "null"] - }, - "division": { - "type": ["string", "null"] - }, - "email": { - "type": ["string", "null"] - }, - "employeeNumber": { - "type": ["string", "null"] - }, - "firstName": { - "type": ["string", "null"] - }, - "honorificPrefix": { - "type": ["string", "null"] - }, - "honorificSuffix": { - "type": ["string", "null"] - }, - "lastName": { - "type": ["string", "null"] - }, - "locale": { - "type": ["string", "null"] - }, - "login": { - "type": ["string", "null"] - }, - "manager": { - "type": ["string", "null"] - }, - "managerId": { - "type": ["string", "null"] - }, - "middleName": { - "type": ["string", "null"] - }, - "mobilePhone": { - "type": ["string", "null"] - }, - "nickName": { - "type": ["string", "null"] - }, - "organization": { - "type": ["string", "null"] - }, - "postalAddress": { - "type": ["string", "null"] - }, - "preferredLanguage": { - "type": ["string", "null"] - }, - "primaryPhone": { - "type": ["string", "null"] - }, - "profileUrl": { - "type": ["string", "null"] - }, - "secondEmail": { - "type": ["string", "null"] - }, - "state": { - "type": ["string", "null"] - }, - "streetAddress": { - "type": ["string", "null"] - }, - "timezone": { - "type": ["string", "null"] - }, - "title": { - "type": ["string", "null"] - }, - "userType": { - "type": ["string", "null"] - }, - "zipCode": { - "type": ["string", "null"] - } - }, - "type": "object" - }, - "status": { - "enum": [ - "ACTIVE", - "DEPROVISIONED", - "LOCKED_OUT", - "PASSWORD_EXPIRED", - "PROVISIONED", - "RECOVERY", - "STAGED", - "SUSPENDED" - ], - "type": "string" - }, - "statusChanged": { - "format": "date-time", - "type": ["string", "null"] - }, - "type": { - "properties": { - "id": { - "type": ["string", "null"] - } - }, - "type": "object" - } - }, - "type": "object" + "$ref": "users.json" } diff --git a/airbyte-integrations/connectors/source-okta/source_okta/source.py b/airbyte-integrations/connectors/source-okta/source_okta/source.py index 97c97bec379c..1e9dc09d615c 100644 --- a/airbyte-integrations/connectors/source-okta/source_okta/source.py +++ b/airbyte-integrations/connectors/source-okta/source_okta/source.py @@ -9,6 +9,7 @@ import pendulum import requests +from airbyte_cdk.models import SyncMode from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.http import HttpStream @@ -112,6 +113,42 @@ def path(self, **kwargs) -> str: return "groups" +class GroupMembers(IncrementalOktaStream): + cursor_field = "id" + primary_key = "id" + min_user_id = "00u00000000000000000" + use_cache = True + + def stream_slices(self, **kwargs): + group_stream = Groups(authenticator=self.authenticator, url_base=self.url_base) + for group in group_stream.read_records(sync_mode=SyncMode.full_refresh): + yield {"group_id": group["id"]} + + def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: + group_id = stream_slice["group_id"] + return f"groups/{group_id}/users" + + def request_params( + self, + stream_state: Mapping[str, Any], + stream_slice: Mapping[str, any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> MutableMapping[str, Any]: + params = OktaStream.request_params(self, stream_state, stream_slice, next_page_token) + latest_entry = stream_state.get(self.cursor_field) + if latest_entry: + params["after"] = latest_entry + return params + + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: + return { + self.cursor_field: max( + latest_record.get(self.cursor_field, self.min_user_id), + current_stream_state.get(self.cursor_field, self.min_user_id), + ) + } + + class Logs(IncrementalOktaStream): cursor_field = "published" @@ -129,13 +166,19 @@ def request_params( # The log stream use a different params to get data # https://developer.okta.com/docs/reference/api/system-log/#datetime-filter stream_state = stream_state or {} - params = { - "limit": self.page_size, - **(next_page_token or {}), - } + params = OktaStream.request_params(self, stream_state, stream_slice, next_page_token) latest_entry = stream_state.get(self.cursor_field) if latest_entry: params["since"] = latest_entry + # [Test-driven Development] Set until When the cursor value from the stream state + # is abnormally large, otherwise the server side that sets now to until + # will throw an error: The "until" date must be later than the "since" date + # https://developer.okta.com/docs/reference/api/system-log/#request-parameters + parsed = pendulum.parse(latest_entry) + utc_now = pendulum.utcnow() + if parsed > utc_now: + params["until"] = latest_entry + return params @@ -186,4 +229,5 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: Groups(**initialization_params), Logs(**initialization_params), Users(**initialization_params), + GroupMembers(**initialization_params), ] diff --git a/docs/integrations/sources/okta.md b/docs/integrations/sources/okta.md index 1f3dbc64be9b..a2bfb943598d 100644 --- a/docs/integrations/sources/okta.md +++ b/docs/integrations/sources/okta.md @@ -10,6 +10,7 @@ This Source is capable of syncing the following core Streams: * [Users](https://developer.okta.com/docs/reference/api/users/#list-users) * [Groups](https://developer.okta.com/docs/reference/api/groups/#list-groups) +* [Group Members](https://developer.okta.com/docs/reference/api/groups/#list-group-members) * [System Log](https://developer.okta.com/docs/reference/api/system-log/#get-started) ### Data type mapping @@ -61,6 +62,7 @@ Different Okta APIs require different admin privilege levels. API tokens inherit | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.5 | 2022-07-04 | [14380](https://github.com/airbytehq/airbyte/pull/14380) | add Group_Members stream to okta source | | 0.1.4 | 2021-11-02 | [7584](https://github.com/airbytehq/airbyte/pull/7584) | Fix incremental params for log stream | | 0.1.3 | 2021-09-08 | [5905](https://github.com/airbytehq/airbyte/pull/5905) | Fix incremental stream defect | | 0.1.2 | 2021-07-01 | [4456](https://github.com/airbytehq/airbyte/pull/4456) | Bugfix infinite pagination in logs stream |