Skip to content

Commit

Permalink
Source Metabase: migrate to Beta YAML (#19236)
Browse files Browse the repository at this point in the history
* migrated to YAMl

* added changelog

* added unit tests, refactor source, fixed ecpected records

* fixed working with session token

* added creating new session token if user passed invalid

* removed custom auth in components, now it's in cdk

* added config for auth, deleted unused tests and dependancies

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
Co-authored-by: Serhii Lazebnyi <53845333+lazebnyi@users.noreply.github.com>
  • Loading branch information
3 people authored Dec 13, 2022
1 parent 346fc75 commit 469199a
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -935,11 +935,11 @@
- name: Metabase
sourceDefinitionId: c7cb421b-942e-4468-99ee-e369bcabaec5
dockerRepository: airbyte/source-metabase
dockerImageTag: 0.2.0
dockerImageTag: 0.3.0
documentationUrl: https://docs.airbyte.com/integrations/sources/metabase
icon: metabase.svg
sourceType: api
releaseStage: alpha
releaseStage: beta
- name: Microsoft SQL Server (MSSQL)
sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerRepository: airbyte/source-mssql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7831,7 +7831,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-metabase:0.2.0"
- dockerImage: "airbyte/source-metabase:0.3.0"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/metabase"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-metabase/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.3.0
LABEL io.airbyte.name=airbyte/source-metabase
Original file line number Diff line number Diff line change
@@ -1,20 +1,37 @@
connector_image: airbyte/source-metabase:dev
tests:
acceptance_tests:
spec:
- spec_path: "source_metabase/spec.yaml"
backward_compatibility_tests_config:
disable_for_version: "0.1.0"
tests:
- spec_path: "source_metabase/spec.yaml"
connection:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
- config_path: "integration_tests/config_http_url.json"
status: "failed"
tests:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
- config_path: "integration_tests/config_http_url.json"
status: "failed"
discovery:
- config_path: "secrets/config.json"
backward_compatibility_tests_config:
disable_for_version: "0.1.0"
tests:
- config_path: "secrets/config.json"
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
empty_streams:
- name: activity
bypass_reason: "data changes very fast"
- name: cards
bypass_reason: "data changes very fast"
- name: collections
bypass_reason: "data changes very fast"
- name: dashboards
bypass_reason: "data changes very fast"
- name: users
bypass_reason: "data changes very fast"
full_refresh:
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
incremental:
bypass_reason: "This connector does not implement incremental sync"
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-metabase/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from setuptools import find_packages, setup

MAIN_REQUIREMENTS = ["airbyte-cdk~=0.2", "requests>=2.28.0", "types-requests>=2.27.30"]
MAIN_REQUIREMENTS = ["airbyte-cdk", "requests>=2.28.0", "types-requests>=2.27.30"]

TEST_REQUIREMENTS = [
"pytest~=6.1",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
version: "0.3.0"

definitions:
selector:
extractor:
field_pointer: [ ]
data_field_selector:
type: RecordSelector
extractor:
type: DpathExtractor
field_pointer: [ "data" ]
requester:
url_base: "{{ config['instance_api_url'] }}"
http_method: "GET"
authenticator:
type: "SessionTokenAuthenticator"
username: "{{ config['username'] }}"
password: "{{ config['password'] }}"
api_url: "{{ config['instance_api_url'] }}"
header: "X-Metabase-Session"
session_token: "{{ config['session_token'] }}"
session_token_response_key: "id"
login_url: "session"
validate_session_url: "user/current"
retriever:
record_selector:
$ref: "*ref(definitions.selector)"
paginator:
type: NoPagination
requester:
$ref: "*ref(definitions.requester)"
data_field_retriever:
record_selector:
$ref: "*ref(definitions.data_field_selector)"
paginator:
type: NoPagination
requester:
$ref: "*ref(definitions.requester)"
base_stream:
primary_key: "id"
retriever:
$ref: "*ref(definitions.retriever)"
activity_stream:
$ref: "*ref(definitions.base_stream)"
$options:
name: "activity"
path: "activity"
cards_stream:
$ref: "*ref(definitions.base_stream)"
$options:
name: "cards"
path: "card"
collections_stream:
$ref: "*ref(definitions.base_stream)"
$options:
name: "collections"
path: "collection"
dashboards_stream:
$ref: "*ref(definitions.base_stream)"
$options:
name: "dashboards"
path: "dashboard"
users_stream:
primary_key: "id"
retriever:
$ref: "*ref(definitions.data_field_retriever)"
$options:
name: "users"
path: "user"
streams:
- "*ref(definitions.activity_stream)"
- "*ref(definitions.cards_stream)"
- "*ref(definitions.collections_stream)"
- "*ref(definitions.dashboards_stream)"
- "*ref(definitions.users_stream)"

check:
stream_names:
- "activity"
Original file line number Diff line number Diff line change
Expand Up @@ -2,132 +2,10 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import logging
from typing import Any, Iterator, List, Mapping, MutableMapping, Tuple

import requests
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator
from source_metabase.streams import Activity, Cards, Collections, Dashboards, Users
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource

API_URL = "instance_api_url"
USERNAME = "username"
PASSWORD = "password"
SESSION_TOKEN = "session_token"


class MetabaseAuth(HttpAuthenticator):
def __init__(self, logger: logging.Logger, config: Mapping[str, Any]):
self.need_session_close = False
self.session_token = ""
self.logger = logger
self.api_url = config[API_URL]
if USERNAME in config and PASSWORD in config:
self.username = config[USERNAME]
self.password = config[PASSWORD]
if SESSION_TOKEN in config:
self.session_token = config[SESSION_TOKEN]
elif USERNAME in config and PASSWORD in config:
self.session_token = self.get_new_session_token(config[USERNAME], config[PASSWORD])
else:
raise KeyError("Required parameters (username/password pair or session_token) not found")
# TODO: Try to retrieve latest session_token stored in some state message?

def get_new_session_token(self, username: str, password: str) -> str:
response = requests.post(
f"{self.api_url}session", headers={"Content-Type": "application/json"}, json={"username": username, "password": password}
)
response.raise_for_status()
if response.ok:
self.session_token = response.json()["id"]
self.need_session_close = True
self.logger.info(f"New session token generated for {username}")
else:
raise ConnectionError(f"Failed to retrieve new session token, response code {response.status_code} because {response.reason}")
return self.session_token

def has_valid_token(self) -> bool:
try:
response = requests.get(f"{self.api_url}user/current", headers=self.get_auth_header())
response.raise_for_status()
except requests.exceptions.HTTPError as e:
if e.response.status_code == 401:
self.logger.warn(f"Unable to connect to Metabase source due to {str(e)}, retrying with a new session_token...")
self.get_new_session_token(self.username, self.password)
response = requests.get(f"{self.api_url}user/current", headers=self.get_auth_header())
response.raise_for_status()
else:
raise ConnectionError(f"Error while checking connection: {e}")
if response.ok:
json_response = response.json()
self.logger.info(
f"Connection check for Metabase successful for {json_response['common_name']} login at {json_response['last_login']}"
)
return True
else:
raise ConnectionError(f"Failed to retrieve new session token, response code {response.status_code} because {response.reason}")

def get_auth_header(self) -> Mapping[str, Any]:
return {"X-Metabase-Session": self.session_token}

def close_session(self):
if self.need_session_close:
response = requests.delete(
f"{self.api_url}session", headers=self.get_auth_header(), json={"metabase-session-id": self.session_token}
)
response.raise_for_status()
if response.ok:
self.logger.info("Session successfully closed")
else:
self.logger.info(f"Unable to close session {response.status_code}: {response.reason}")
else:
self.logger.info("Session was not opened by this connector.")


class SourceMetabase(AbstractSource):
class SourceMetabase(YamlDeclarativeSource):
def __init__(self):
self.session = None

def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
session = None
try:
session = MetabaseAuth(logger, config)
return session.has_valid_token(), None
except Exception as e:
return False, e
finally:
if session:
session.close_session()

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
self.session = MetabaseAuth(logging.getLogger("airbyte"), config)
if not self.session.has_valid_token():
raise ConnectionError("Failed to connect to source")
args = {"authenticator": self.session, API_URL: config[API_URL]}
return [
Activity(**args),
Cards(**args),
Collections(**args),
Dashboards(**args),
Users(**args),
]

# We override the read method to make sure we close the metabase session and logout
# so we don't keep too many active session_token active.
def read(
self,
logger: logging.Logger,
config: Mapping[str, Any],
catalog: ConfiguredAirbyteCatalog,
state: MutableMapping[str, Any] = None,
) -> Iterator[AirbyteMessage]:
try:
yield from super().read(logger, config, catalog, state)
finally:
self.close_session()

def close_session(self):
if self.session:
self.session.close_session()
super().__init__(**{"path_to_yaml": "metabase.yaml"})
Loading

0 comments on commit 469199a

Please sign in to comment.