-
Notifications
You must be signed in to change notification settings - Fork 4.1k
/
source.py
54 lines (42 loc) · 2.03 KB
/
source.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
import json
import logging
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import Any, Dict, Generic, Iterable, Mapping, MutableMapping, TypeVar
from airbyte_cdk.connector import BaseConnector, DefaultConnectorMixin, TConfig
from airbyte_cdk.models import AirbyteCatalog, AirbyteMessage, ConfiguredAirbyteCatalog
# Type parameter for the state object a source reads and passes to `read` (see BaseSource.read_state).
TState = TypeVar("TState")
# Type parameter for the catalog object a source reads and passes to `read` (see BaseSource.read_catalog).
TCatalog = TypeVar("TCatalog")
class BaseSource(BaseConnector[TConfig], ABC, Generic[TConfig, TState, TCatalog]):
    """
    Abstract interface of a source connector, generic over its config (TConfig), state (TState) and catalog (TCatalog) types.

    Concrete sources must implement every method declared here.
    """

    @abstractmethod
    def read_state(self, state_path: str) -> TState:
        """
        Returns the state object (of type TState) for the given state path.
        """

    @abstractmethod
    def read_catalog(self, catalog_path: str) -> TCatalog:
        """
        Returns the catalog object (of type TCatalog) for the given catalog path.
        """

    @abstractmethod
    def read(
        self,
        logger: logging.Logger,
        config: TConfig,
        catalog: TCatalog,
        state: TState = None,
    ) -> Iterable[AirbyteMessage]:
        """
        Reads the source using the supplied configuration, catalog and (optional) state, and returns an iterable of the
        AirbyteMessages produced by that read.
        """

    @abstractmethod
    def discover(self, logger: logging.Logger, config: TConfig) -> AirbyteCatalog:
        """
        Returns an AirbyteCatalog describing the streams and fields this integration exposes. For instance, with valid
        credentials to a Postgres database, the catalog would contain one stream per table and one field per table column.
        """
class Source(
    DefaultConnectorMixin,
    BaseSource[Mapping[str, Any], MutableMapping[str, Any], ConfiguredAirbyteCatalog],
    ABC,
):
    """
    Source base class that fixes the generic parameters of BaseSource: config is a Mapping[str, Any], state is a
    MutableMapping[str, Any] and the catalog is a ConfiguredAirbyteCatalog.

    Provides default implementations of `read_state` and `read_catalog`; `read` and `discover` remain abstract.
    """

    # can be overridden to change an input state
    def read_state(self, state_path: str) -> Dict[str, Any]:
        """
        Loads the input state from a JSON file.

        :param state_path: path of the JSON state file; a falsy value (None or "") means "no state".
        :return: a defaultdict(dict) seeded with the parsed JSON content, so looking up a missing key yields (and stores)
            an empty dict. When no path is given the mapping starts out empty.
        :raises OSError: if the file cannot be opened.
        :raises json.JSONDecodeError: if the file does not contain valid JSON.
        """
        if state_path:
            # Use a context manager so the file handle is closed deterministically instead of being
            # left for the garbage collector.
            with open(state_path, "r") as state_file:
                state_obj = json.load(state_file)
        else:
            state_obj = {}
        return defaultdict(dict, state_obj)

    # can be overridden to change an input catalog
    def read_catalog(self, catalog_path: str) -> ConfiguredAirbyteCatalog:
        """
        Builds the ConfiguredAirbyteCatalog for the given catalog path.

        :param catalog_path: path handed to `self.read_config`.
        :return: the result of `ConfiguredAirbyteCatalog.parse_obj` applied to whatever `read_config` returns.
        """
        # NOTE(review): `read_config` is not defined in this file; presumably it is inherited from the
        # connector base classes and returns the parsed content of the file - confirm.
        return ConfiguredAirbyteCatalog.parse_obj(self.read_config(catalog_path))