Skip to content

Commit

Permalink
🎉 Source Slack: Implement OAuth support with OAuth authenticator (#6570)
Browse files Browse the repository at this point in the history
* Source Slack: Implement OAuth support with OAuth authenticator
  • Loading branch information
yevhenii-ldv authored Oct 7, 2021
1 parent 873fc06 commit 00d56e7
Show file tree
Hide file tree
Showing 13 changed files with 155 additions and 34 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/publish-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,10 @@ jobs:
SALESFORCE_INTEGRATION_TESTS_CREDS: ${{ secrets.SALESFORCE_INTEGRATION_TESTS_CREDS }}
SENDGRID_INTEGRATION_TEST_CREDS: ${{ secrets.SENDGRID_INTEGRATION_TEST_CREDS }}
SHOPIFY_INTEGRATION_TEST_CREDS: ${{ secrets.SHOPIFY_INTEGRATION_TEST_CREDS }}
SLACK_TEST_CREDS: ${{ secrets.SLACK_TEST_CREDS }}
SOURCE_ASANA_TEST_CREDS: ${{ secrets.SOURCE_ASANA_TEST_CREDS }}
SOURCE_OKTA_TEST_CREDS: ${{ secrets.SOURCE_OKTA_TEST_CREDS }}
SOURCE_SLACK_TEST_CREDS: ${{ secrets.SOURCE_SLACK_TEST_CREDS }}
SOURCE_SLACK_OAUTH_TEST_CREDS: ${{ secrets.SOURCE_SLACK_OAUTH_TEST_CREDS }}
SOURCE_US_CENSUS_TEST_CREDS: ${{ secrets.SOURCE_US_CENSUS_TEST_CREDS }}
SMARTSHEETS_TEST_CREDS: ${{ secrets.SMARTSHEETS_TEST_CREDS }}
SOURCE_SNAPCHAT_MARKETING_CREDS: ${{ secrets.SOURCE_SNAPCHAT_MARKETING_CREDS }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ jobs:
SALESFORCE_INTEGRATION_TESTS_CREDS: ${{ secrets.SALESFORCE_INTEGRATION_TESTS_CREDS }}
SENDGRID_INTEGRATION_TEST_CREDS: ${{ secrets.SENDGRID_INTEGRATION_TEST_CREDS }}
SHOPIFY_INTEGRATION_TEST_CREDS: ${{ secrets.SHOPIFY_INTEGRATION_TEST_CREDS }}
SLACK_TEST_CREDS: ${{ secrets.SLACK_TEST_CREDS }}
SOURCE_OKTA_TEST_CREDS: ${{ secrets.SOURCE_OKTA_TEST_CREDS }}
SOURCE_SLACK_TEST_CREDS: ${{ secrets.SOURCE_SLACK_TEST_CREDS }}
SOURCE_SLACK_OAUTH_TEST_CREDS: ${{ secrets.SOURCE_SLACK_OAUTH_TEST_CREDS }}
SOURCE_US_CENSUS_TEST_CREDS: ${{ secrets.SOURCE_US_CENSUS_TEST_CREDS }}
SMARTSHEETS_TEST_CREDS: ${{ secrets.SMARTSHEETS_TEST_CREDS }}
SOURCE_SNAPCHAT_MARKETING_CREDS: ${{ secrets.SOURCE_SNAPCHAT_MARKETING_CREDS }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "c2281cee-86f9-4a86-bb48-d23286b4c7bd",
"name": "Slack",
"dockerRepository": "airbyte/source-slack",
"dockerImageTag": "0.1.11",
"dockerImageTag": "0.1.12",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/slack",
"icon": "slack.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@
- sourceDefinitionId: c2281cee-86f9-4a86-bb48-d23286b4c7bd
name: Slack
dockerRepository: airbyte/source-slack
dockerImageTag: 0.1.11
dockerImageTag: 0.1.12
documentationUrl: https://docs.airbyte.io/integrations/sources/slack
icon: slack.svg
sourceType: api
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-slack/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.11
LABEL io.airbyte.version=0.1.12
LABEL io.airbyte.name=airbyte/source-slack
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ tests:
connection:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "secrets/config_oauth.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
- config_path: "integration_tests/invalid_oauth_config.json"
status: "failed"
discovery:
- config_path: "secrets/config.json"
basic_read:
Expand All @@ -17,9 +21,6 @@ tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
timeout_seconds: 7200
cursor_paths:
channel_messages: ["float_ts"]
threads: ["float_ts"]
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/full_refresh_catalog.json"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
{
"api_token": "invalid_api_token",
"credentials": {
"option_title": "API Token Credentials",
"api_token": "fake-api-token"
},
"start_date": "2022-07-22T20:00:00Z",
"lookback_window": 2,
"join_channels": true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"credentials": {
"option_title": "Default OAuth2.0 authorization",
"client_id": "fake-client-id",
"client_secret": "fake-client-secret",
"refresh_token": "fake-refresh-token",
"access_token": "fake-access-token"
},
"start_date": "2021-07-22T20:00:00Z",
"lookback_window": 2,
"join_channels": true
}
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-slack/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
author="Airbyte",
author_email="contact@airbyte.io",
packages=find_packages(),
install_requires=["airbyte-cdk~=0.1.13", "slack_sdk==3.4.2", "pendulum>=2,<3"],
install_requires=["airbyte-cdk~=0.1", "pendulum>=2,<3"],
package_data={"": ["*.json"]},
extras_require={
"tests": TEST_REQUIREMENTS,
Expand Down
50 changes: 37 additions & 13 deletions airbyte-integrations/connectors/source-slack/source_slack/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator, TokenAuthenticator
from pendulum import DateTime, Period
from slack_sdk import WebClient


class SlackStream(HttpStream, ABC):
Expand Down Expand Up @@ -107,7 +106,7 @@ def parse_response(self, response: requests.Response, stream_slice: Mapping[str,
yield {"member_id": member_id, "channel_id": stream_slice["channel_id"]}

def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
channels_stream = Channels(authenticator=self.authenticator)
channels_stream = Channels(authenticator=self._session.auth)
for channel_record in channels_stream.read_records(sync_mode=SyncMode.full_refresh):
yield {"channel_id": channel_record["id"]}

Expand Down Expand Up @@ -188,7 +187,7 @@ def read_records(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwarg
yield from super().read_records(stream_slice=stream_slice, **kwargs)
else:
# if channel is not provided, then get channels and read accordingly
channels = Channels(authenticator=self.authenticator)
channels = Channels(authenticator=self._session.auth)
for channel_record in channels.read_records(sync_mode=SyncMode.full_refresh):
stream_slice["channel"] = channel_record["id"]
yield from super().read_records(stream_slice=stream_slice, **kwargs)
Expand Down Expand Up @@ -221,7 +220,7 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite
"""

stream_state = stream_state or {}
channels_stream = Channels(authenticator=self.authenticator)
channels_stream = Channels(authenticator=self._session.auth)

if self.cursor_field in stream_state:
# Since new messages can be posted to threads continuously after the parent message has been posted, we get messages from the latest date
Expand All @@ -232,7 +231,7 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite
# If there is no state i.e: this is the first sync then there is no use for lookback, just get messages from the default start date
messages_start_date = pendulum.from_timestamp(self._start_ts)

messages_stream = ChannelMessages(authenticator=self.authenticator, default_start_date=messages_start_date)
messages_stream = ChannelMessages(authenticator=self._session.auth, default_start_date=messages_start_date)

for message_chunk in messages_stream.stream_slices(stream_state={self.cursor_field: messages_start_date.timestamp()}):
self.logger.info(f"Syncing replies {message_chunk}")
Expand Down Expand Up @@ -276,7 +275,7 @@ def path(self, **kwargs) -> str:
return "conversations.join"

def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
channels_stream = Channels(authenticator=self.authenticator)
channels_stream = Channels(authenticator=self._session.auth)
for channel in channels_stream.read_records(sync_mode=SyncMode.full_refresh):
yield {"channel": channel["id"], "channel_name": channel["name"]}

Expand All @@ -285,16 +284,41 @@ def request_body_json(self, stream_slice: Mapping = None, **kwargs) -> Optional[


class SourceSlack(AbstractSource):
def _get_authenticator(self, config: Mapping[str, Any]):
# Added to maintain backward compatibility with previous versions
if "api_token" in config:
return TokenAuthenticator(config["api_token"])

credentials = config.get("credentials")
credentials_title = credentials.get("option_title")
if credentials_title == "Default OAuth2.0 authorization":
# We can get `refresh_token` only if the token rotation function is enabled for the Slack Oauth Application.
# If it is disabled, then we use the generated `access_token`, which acts without expiration.
# https://api.slack.com/authentication/rotation
if credentials.get("refresh_token", "").strip():
return Oauth2Authenticator(
token_refresh_endpoint="https://slack.com/api/oauth.v2.access",
client_id=credentials["client_id"],
client_secret=credentials["client_secret"],
refresh_token=credentials["refresh_token"],
)
return TokenAuthenticator(credentials["access_token"])
elif credentials_title == "API Token Credentials":
return TokenAuthenticator(credentials["api_token"])
else:
raise Exception(f"No supported option_title: {credentials_title} specified. See spec.json for references")

def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
slack_client = WebClient(token=config["api_token"])
users = slack_client.users_list(limit=1).get("members", [])
if len(users) > 0:
try:
authenticator = self._get_authenticator(config)
users_stream = Users(authenticator=authenticator)
next(users_stream.read_records(SyncMode.full_refresh))
return True, None
else:
return False, "There are no users in the given Slack instance"
except Exception:
return False, "There are no users in the given Slack instance or your token is incorrect"

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
authenticator = TokenAuthenticator(config["api_token"])
authenticator = self._get_authenticator(config)
default_start_date = pendulum.parse(config["start_date"])
threads_lookback_window = pendulum.Duration(days=config["lookback_window"])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,15 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Slack Spec",
"type": "object",
"required": ["api_token", "start_date", "lookback_window", "join_channels"],
"additionalProperties": false,
"required": ["start_date", "lookback_window", "join_channels"],
"additionalProperties": true,
"properties": {
"api_token": {
"type": "string",
"title": "API Token",
"description": "A slack bot token. See the <a href=\"https://docs.airbyte.io/integrations/sources/slack\">docs</a> for instructions on how to generate it.",
"airbyte_secret": true
},
"start_date": {
"type": "string",
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
"description": "UTC date and time in the format 2017-01-25T00:00:00Z. Any data before this date will not be replicated.",
"examples": ["2017-01-25T00:00:00Z"],
"title": "Start date"
"title": "Start Date"
},
"lookback_window": {
"type": "integer",
Expand All @@ -31,7 +25,84 @@
"default": true,
"title": "Join all channels",
"description": "Whether to join all channels or to sync data only from channels the bot is already in. If false, you'll need to manually add the bot to all the channels from which you'd like to sync messages. "
},
"credentials": {
"title": "Authentication mechanism",
"description": "Choose how to authenticate into Slack",
"type": "object",
"oneOf": [
{
"type": "object",
"title": "Sign in via Slack (OAuth)",
"required": [
"access_token",
"client_id",
"client_secret",
"option_title"
],
"properties": {
"option_title": {
"type": "string",
"const": "Default OAuth2.0 authorization"
},
"client_id": {
"title": "Client ID",
"description": "Slack client_id. See our <a href=\"https://docs.airbyte.io/integrations/sources/slack\">docs</a> if you need help finding this id.",
"type": "string",
"examples": ["slack-client-id-example"]
},
"client_secret": {
"title": "Client Secret",
"description": "Slack client_secret. See our <a href=\"https://docs.airbyte.io/integrations/sources/slack\">docs</a> if you need help finding this secret.",
"type": "string",
"examples": ["slack-client-secret-example"],
"airbyte_secret": true
},
"access_token": {
"title": "Access token",
"description": "Slack access_token. See our <a href=\"https://docs.airbyte.io/integrations/sources/slack\">docs</a> if you need help generating the token.",
"type": "string",
"examples": ["slack-access-token-example"],
"airbyte_secret": true
},
"refresh_token": {
"title": "Refresh token",
"description": "Slack refresh_token. See our <a href=\"https://docs.airbyte.io/integrations/sources/slack\">docs</a> if you need help generating the token.",
"type": "string",
"examples": ["slack-refresh-token-example"],
"airbyte_secret": true
}
},
"order": 0
},
{
"type": "object",
"title": "API Token",
"required": ["api_token", "option_title"],
"properties": {
"option_title": {
"type": "string",
"const": "API Token Credentials"
},
"api_token": {
"type": "string",
"title": "API Token",
"description": "A Slack bot token. See the <a href=\"https://docs.airbyte.io/integrations/sources/slack\">docs</a> for instructions on how to generate it.",
"airbyte_secret": true
}
},
"order": 1
}
]
}
}
},
"authSpecification": {
"auth_type": "oauth2.0",
"oauth2Specification": {
"rootObject": ["credentials", 0],
"oauthFlowInitParameters": [["client_id"], ["client_secret"]],
"oauthFlowOutputParameters": [["access_token"], ["refresh_token"]]
}
}
}
13 changes: 11 additions & 2 deletions docs/integrations/sources/slack.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

This source can sync data for the [Slack API](https://api.slack.com/). It supports both Full Refresh and Incremental syncs. You can choose if this connector will copy only the new or updated data, or all rows in the tables and columns you set up for replication, every time a sync is run.

This Source Connector is based on a [Singer Tap](https://github.com/airbytehq/tap-slack).

### Output schema

This Source is capable of syncing the following core Streams:
Expand Down Expand Up @@ -46,6 +44,16 @@ The Slack connector should not run into Slack API limitations under normal usage

### Requirements

#### Slack connector can be connected using two types of authentication: OAuth2.0 or API Token

#### Using OAuth2.0 authenticator
* Client ID - issued when you created your app
* Client Secret - issued when you created your app
* Refresh Token - a special kind of token used to obtain a renewed access token

You can get more detailed information about this type of authentication by reading [Slack's documentation about OAuth2.0](https://api.slack.com/authentication/oauth-v2).

#### Using API Token
* Slack API Token

### Setup guide
Expand Down Expand Up @@ -101,6 +109,7 @@ We recommend creating a restricted, read-only key specifically for Airbyte acces

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| 0.1.12 | 2021-10-07 | [6570](https://github.com/airbytehq/airbyte/pull/6570) | Implement OAuth support with OAuth authenticator |
| 0.1.11 | 2021-08-27 | [5830](https://github.com/airbytehq/airbyte/pull/5830) | Fixed sync operations hang forever issue |
| 0.1.10 | 2021-08-27 | [5697](https://github.com/airbytehq/airbyte/pull/5697) | Fixed max retries issue |
| 0.1.9 | 2021-07-20 | [4860](https://github.com/airbytehq/airbyte/pull/4860) | Fixed reading threads issue |
Expand Down
1 change: 1 addition & 0 deletions tools/bin/ci_credentials.sh
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ write_standard_creds source-sendgrid "$SENDGRID_INTEGRATION_TEST_CREDS"
write_standard_creds source-shopify "$SHOPIFY_INTEGRATION_TEST_CREDS"
write_standard_creds source-shortio "$SOURCE_SHORTIO_TEST_CREDS"
write_standard_creds source-slack "$SOURCE_SLACK_TEST_CREDS"
write_standard_creds source-slack "$SOURCE_SLACK_OAUTH_TEST_CREDS" "config_oauth.json"
write_standard_creds source-smartsheets "$SMARTSHEETS_TEST_CREDS"
write_standard_creds source-snapchat-marketing "$SOURCE_SNAPCHAT_MARKETING_CREDS"
write_standard_creds source-snowflake "$SNOWFLAKE_INTEGRATION_TEST_CREDS" "config.json"
Expand Down

0 comments on commit 00d56e7

Please sign in to comment.