Remove custom fields (#4)
* Remove custom fields from source.py

* Remove custom fields from spec.yaml

* Collections that support incremental sync are found correctly

* Run formatter

* Index values and terms are verified

* Strip additional_columns from collection config and check()

* We now search for an index at the start of each sync

* Add default for missing data in collection

* Add a log message about the index chosen to sync an incremental stream

* Add an example for a configured incremental catalog

* Check test now validates the simplified check function

* Remove collection name from spec.yaml and CollectionConfig

* Update test_util.py to adhere to the new config

* Update the first discover test to validate that we can find indexes correctly

* Remove other discover tests, as they no longer apply

* Full refresh test now works with simplified expanded columns

* Remove unused imports

* Incremental test now adheres to the find_index_for_stream system

* Database test passes, so now all unit tests pass again

* Remove extra fields from required section

* ttl is nullable

* Data defaults to an empty object

* Update tests to reflect ttl and data select changes

* Fix expected records. All unit tests and acceptance tests pass

* Clean up docs for find_index_for_stream

* Update setup guide to reflect multiple collections
macmv authored and marcosmarxm committed Sep 29, 2022
1 parent 009be0f commit da7a7a3
Showing 12 changed files with 360 additions and 836 deletions.
@@ -0,0 +1,19 @@
{
"streams": [
{
"stream": {
"name": "foo",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {}
},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": false,
"default_cursor_field": ["column_name"]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
}
]
}
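For context, a configured catalog like the one above is what a sync run receives. A minimal sketch of loading it with the Airbyte CDK (the file path is illustrative):

from airbyte_cdk.models import ConfiguredAirbyteCatalog

# Parse the configured catalog JSON into the CDK's pydantic model.
catalog = ConfiguredAirbyteCatalog.parse_file("configured_catalog_incremental.json")
for configured_stream in catalog.streams:
    # Prints e.g. "foo SyncMode.incremental"
    print(configured_stream.stream.name, configured_stream.sync_mode)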
@@ -1,5 +1,5 @@
{ "stream": "deletions-data", "emitted_at": "1", "data": { "ref": "338836293305763911", "ts": 1659398359360000, "deleted_at": "2022-08-01T23:59:19.360000" } }
{ "stream": "deletions-data", "emitted_at": "2", "data": { "ref": "338836293305761863", "ts": 1659398366330000, "deleted_at": "2022-08-01T23:59:26.330000" } }
{ "stream": "deletions-data", "emitted_at": "3", "data": { "ref": "338836293305765959", "ts": 1659398371330000, "deleted_at": "2022-08-01T23:59:31.330000" } }
{ "stream": "deletions-data", "emitted_at": "5", "data": { "ref": "338836293305762887", "ts": 1659398320430000, "data": { "a": 6, "nested": { "value": 20 } } } }
{ "stream": "deletions-data", "emitted_at": "7", "data": { "ref": "338836293305764935", "ts": 1659398320430000, "data": { "a": 8, "nested": { "value": 30 } } } }
{ "stream": "deletions-data", "emitted_at": "5", "data": { "ref": "338836293305762887", "ts": 1659398320430000, "data": { "a": 6, "nested": { "value": 20 } }, "ttl": null } }
{ "stream": "deletions-data", "emitted_at": "7", "data": { "ref": "338836293305764935", "ts": 1659398320430000, "data": { "a": 8, "nested": { "value": 30 } }, "ttl": null } }
@@ -1,4 +1,4 @@
{ "stream": "sample-data", "emitted_at": "1", "data": { "ref": "337567897171787849", "ts": 1658188683585000, "data": { "a": 5, "nested": { "value": 15 } }, "my_a_col": 5, "nested_column": 15, "optional_name": null } }
{ "stream": "sample-data", "emitted_at": "2", "data": { "ref": "337567897172836425", "ts": 1658271973660000, "data": { "a": 6, "nested": { "value": 20 }, "name": "hello world" }, "my_a_col": 6, "nested_column": 20, "optional_name": "hello world" } }
{ "stream": "sample-data", "emitted_at": "3", "data": { "ref": "337567897172837449", "ts": 1658188683585000, "data": { "a": 7, "nested": { "value": 25 } }, "my_a_col": 7, "nested_column": 25, "optional_name": null } }
{ "stream": "sample-data", "emitted_at": "4", "data": { "ref": "337567897173885001", "ts": 1658188683585000, "data": { "a": 8, "nested": { "value": 30 } }, "my_a_col": 8, "nested_column": 30, "optional_name": null } }
{ "stream": "sample-data", "emitted_at": "1", "data": { "ref": "337567897171787849", "ts": 1658188683585000, "data": { "a": 5, "nested": { "value": 15 } }, "ttl": null } }
{ "stream": "sample-data", "emitted_at": "2", "data": { "ref": "337567897172836425", "ts": 1658271973660000, "data": { "a": 6, "nested": { "value": 20 }, "name": "hello world" }, "ttl": null } }
{ "stream": "sample-data", "emitted_at": "3", "data": { "ref": "337567897172837449", "ts": 1658188683585000, "data": { "a": 7, "nested": { "value": 25 } }, "ttl": null } }
{ "stream": "sample-data", "emitted_at": "4", "data": { "ref": "337567897173885001", "ts": 1658188683585000, "data": { "a": 8, "nested": { "value": 30 } }, "ttl": null } }
236 changes: 115 additions & 121 deletions airbyte-integrations/connectors/source-fauna/source_fauna/source.py
@@ -47,38 +47,13 @@ def __init__(self, conf):

class CollectionConfig:
def __init__(self, conf):
# Name of the collection we are reading from.
self.name = conf["name"]
# true or false; do we add a `data` column that mirrors all the data in each document?
self.data_column = conf["data_column"]
# Any additional columns the user wants.
self.additional_columns = [Column(x) for x in conf.get("additional_columns", [])]
# The page size, used in all Paginate() calls.
self.page_size = conf["page_size"]
# Index name used in read_updates. Default to empty string
self.index = conf.get("index", "")

# Configs for how deletions are handled
self.deletions = DeletionsConfig(conf["deletions"])


class Column:
def __init__(self, conf):
# The name of this column. This is the name that will appear in the destination.
self.name = conf["name"]
# The path of the value within fauna. This is an array of strings.
self.path = conf["path"]
# The type of the value used in Airbyte. This will be used by most destinations
# as the column type. This is not validated at all!
self.type = conf["type"]
# If true, then the path above must exist in every document.
self.required = conf["required"]
# The format and airbyte_type are extra typing fields. Documentation:
# https://docs.airbyte.com/understanding-airbyte/supported-data-types/
self.format = conf.get("format")
self.airbyte_type = conf.get("airbyte_type")


class DeletionsConfig:
def __init__(self, conf):
self.mode = conf["deletion_mode"]
@@ -101,25 +76,9 @@ def expand_column_query(conf: CollectionConfig, value):
obj = {
"ref": q.select(["ref", "id"], doc),
"ts": q.select("ts", doc),
"data": q.select("data", doc, {}),
"ttl": q.select("ttl", doc, None),
}
if conf.data_column:
obj["data"] = q.select("data", doc)
for column in conf.additional_columns:
if column.required:
obj[column.name] = q.select(
column.path,
doc,
q.abort(
q.format(
f"The path {column.path} does not exist in document Ref(%s, collection=%s)",
q.select(["ref", "id"], doc),
q.select(["ref", "collection", "id"], doc),
)
),
)
else:
# If not required, default to None
obj[column.name] = q.select(column.path, doc, None)
return q.let(
{"document": q.get(value)},
obj,
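# For reference, a sketch of the record this query now yields for a typical
# document (values are illustrative; "ttl" stays None unless the document
# sets one):
#   {
#       "ref": "337567897171787849",
#       "ts": 1658188683585000,
#       "data": {"a": 5, "nested": {"value": 15}},
#       "ttl": None,
#   }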
@@ -158,23 +117,7 @@ def fail(message: str) -> AirbyteConnectionStatus:
try:
self._setup_client(config)

# STEP 1: Validate as much as we can before connecting to the database

# Collection config, which has all collection-specific config fields.
conf = config.collection

# Make sure they didn't choose an duplicate or invalid column names.
column_names = {}
for column in conf.additional_columns:
# We never allow a custom `data` column, as they might want to enable the
# data column later.
if column.name == "data" or column.name == "ref" or column.name == "ts":
return fail(f"Additional column cannot have reserved name '{column.name}'")
if column.name in column_names:
return fail(f"Additional column cannot have duplicate name '{column.name}'")
column_names[column.name] = ()

# STEP 2: Validate everything else after making sure the database is up.
# Validate everything else after making sure the database is up.
try:
self.client.query(q.now())
except Exception as e:
@@ -183,18 +126,15 @@ def fail(message: str) -> AirbyteConnectionStatus:
else:
return fail(f"Failed to connect to database: {e}")

# Validate the collection exists
collection = conf.name
# Validate our permissions
try:
self.client.query(q.paginate(q.documents(q.collection(collection)), size=1))
self.client.query(q.paginate(q.collections()))
except FaunaError:
return fail(f"Collection '{collection}' does not exist")

# If they entered an index, make sure it's correct
if conf.index != "":
res = self._validate_index(conf.name, conf.index)
if res is not None:
return fail(res)
return fail("No permissions to list collections")
try:
self.client.query(q.paginate(q.indexes()))
except FaunaError:
return fail("No permissions to list indexes")

return AirbyteConnectionStatus(status=Status.SUCCEEDED)
except Exception as e:
@@ -227,6 +167,41 @@ def _validate_index(self, collection: str, index: str) -> Optional[str]:
# All above checks passed, so it's valid.
return None

def find_index_for_stream(self, collection: str) -> str:
"""
Finds the index for the given collection name. This will iterate over all indexes, and find
one that has the correct source, values, and terms.
:param collection: The name of the collection to search for.
"""
page = self.client.query(q.paginate(q.indexes()))
while True:
for id in page["data"]:
try:
index = self.client.query(q.get(id))
except Unauthorized:
# If we don't have permissions to read this index, we ignore it.
continue
source = index["source"]
# Source can be an array, in which case we want to skip this index
if (
type(source) is Ref
and source.collection() == Ref("collections")
and source.id() == collection
# Index must have 2 values and no terms
and len(index["values"]) == 2
and len(index["terms"]) == 0
# Index values must be ts and ref
and index["values"][0] == {"field": "ts"}
and index["values"][1] == {"field": "ref"}
):
return index["name"]
if "after" in page:
page = self.client.query(q.paginate(q.indexes(), after=page["after"]))
else:
break
raise ValueError(f"Could not find index for stream '{collection}'")
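# For reference, a sketch of an index this search accepts, created from the
# Fauna shell (the index name is illustrative; what matters is the source
# collection, the two values in this order, and the empty terms):
#   CreateIndex({
#     name: "foo-ts",
#     source: Collection("foo"),
#     terms: [],
#     values: [{ field: "ts" }, { field: "ref" }],
#   })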

def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog:
"""
Returns an AirbyteCatalog representing the available streams and fields in the user's connection to Fauna.
@@ -247,57 +222,74 @@ def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog:
streams = []

try:
# Check if we entered an index. This will already be validated by check().
can_sync_incremental = config.collection.index != ""

# We only support a single stream. This is limiting, but makes things a lot simpler.
conf = config.collection
stream_name = conf.name
properties = {
"ref": {
"type": "string",
},
"ts": {
"type": "integer",
},
}
if conf.data_column:
properties["data"] = {"type": "object"}
for column in conf.additional_columns:
column_object = {}

# This is how we specify optionals, according to the docs:
# https://docs.airbyte.com/understanding-airbyte/supported-data-types/#nulls
if column.required:
column_object["type"] = column.type
self._setup_client(config)

# Map each collection to an index that has the correct source, values, and terms.
collections_to_indexes = {}
page = self.client.query(q.paginate(q.indexes()))
while True:
for id in page["data"]:
try:
index = self.client.query(q.get(id))
except Unauthorized:
# If we don't have permissions to read this index, we ignore it.
continue
source = index["source"]
# Source can be an array, in which case we want to skip this index
if (
type(source) is Ref
and source.collection() == Ref("collections")
# Index must have 2 values and no terms
and len(index["values"]) == 2
and len(index["terms"]) == 0
# Index values must be ts and ref
and index["values"][0] == {"field": "ts"}
and index["values"][1] == {"field": "ref"}
):
collections_to_indexes[source.id()] = index
if "after" in page:
page = self.client.query(q.paginate(q.indexes(), after=page["after"]))
else:
column_object["type"] = ["null", column.type]

# Extra fields, for more formats. See the docs:
# https://docs.airbyte.com/understanding-airbyte/supported-data-types/
if column.format is not None:
column_object["format"] = column.format
if column.airbyte_type is not None:
column_object["airbyte_type"] = column.airbyte_type

properties[column.name] = column_object
json_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": properties,
}
supported_sync_modes = ["full_refresh"]
if can_sync_incremental:
supported_sync_modes.append("incremental")
streams.append(
AirbyteStream(
name=stream_name,
json_schema=json_schema,
supported_sync_modes=supported_sync_modes,
source_defined_cursor=True,
default_cursor_field=["ts"],
)
)
break

page = self.client.query(q.paginate(q.collections()))
while True:
for collection in page["data"]:
stream_name = collection.id()
json_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"ref": {
"type": "string",
},
"ts": {
"type": "integer",
},
"data": {
"type": "object",
},
"ttl": {
"type": ["null", "integer"],
},
},
}
supported_sync_modes = ["full_refresh"]
if stream_name in collections_to_indexes:
supported_sync_modes.append("incremental")
streams.append(
AirbyteStream(
name=stream_name,
json_schema=json_schema,
supported_sync_modes=supported_sync_modes,
source_defined_cursor=True,
default_cursor_field=["ts"],
)
)
if "after" in page:
page = self.client.query(q.paginate(q.collections(), after=page["after"]))
else:
break
except Exception as e:
logger.error(f"error in discover: {e}")
return AirbyteCatalog(streams=[])
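# For reference, a sketch of the stream discover() emits for a collection named
# "foo" that has a matching ts/ref index (json_schema is the ref/ts/data/ttl
# object built above):
#   AirbyteStream(
#       name="foo",
#       json_schema=json_schema,
#       supported_sync_modes=["full_refresh", "incremental"],
#       source_defined_cursor=True,
#       default_cursor_field=["ts"],
#   )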
@@ -655,6 +647,8 @@ def make_message(stream_name, data_obj) -> AirbyteMessage:
if stream_name not in state:
state[stream_name] = {}

index = self.find_index_for_stream(stream_name)
logger.info(f"found index '{index}', which will be used to sync '{stream_name}'")
read_deletions = config.collection.deletions.mode == "deleted_field"

# Read removals
Expand All @@ -679,7 +673,7 @@ def make_message(stream_name, data_obj) -> AirbyteMessage:
stream,
config.collection,
state[stream_name]["updates_cursor"],
config.collection.index,
index,
config.collection.page_size,
):
yield make_message(stream_name, data_obj)