From da7a7a3f9dcbae1906bcf61a8dede4cf278a7e84 Mon Sep 17 00:00:00 2001 From: Neil Macneale V Date: Thu, 8 Sep 2022 16:13:49 -0700 Subject: [PATCH] Remove custom fields (#4) * Remove custom fields from source.py * Remove custom fields from spec.yaml * Collections that support incremental sync are found correctly * Run formatter * Index values and termins are verified * Stripped additional_columns from collection config and check() * We now search for an index at the start of each sync * Add default for missing data in collection * Add a log message about the index chosen to sync an incremental stream * Add an example for a configured incremental catalog * Check test now validates the simplified check function * Remove collection name from spec.yaml and CollectionConfig * Update test_util.py to ahere to the new config * Update the first discover test to validate that we can find indexes correctly * Remove other discover tests, as they no longer apply * Full refresh test now works with simplified expanded columns * Remove unused imports * Incremental test now adheres to the find_index_for_stream system * Database test passes, so now all unit tests pass again * Remove extra fields from required section * ttl is nullable * Data defaults to an empty object * Update tests to reflect ttl and data select changes * Fix expected records. All unit tests and acceptance tests pass * Cleanup docs for find_index_for_stream * Update setup guide to reflect multiple collections --- .../configured_catalog_incremental.json | 19 ++ .../expected_deletions_records.txt | 4 +- .../integration_tests/expected_records.txt | 8 +- .../source-fauna/source_fauna/source.py | 236 +++++++------- .../source-fauna/source_fauna/spec.yaml | 108 ------- .../source-fauna/unit_tests/check_test.py | 255 +-------------- .../source-fauna/unit_tests/database_test.py | 48 ++- .../source-fauna/unit_tests/discover_test.py | 298 +++--------------- .../unit_tests/full_refresh_test.py | 63 ++-- .../unit_tests/incremental_test.py | 35 +- .../source-fauna/unit_tests/test_util.py | 27 +- docs/integrations/sources/fauna.md | 95 +++++- 12 files changed, 360 insertions(+), 836 deletions(-) create mode 100644 airbyte-integrations/connectors/source-fauna/examples/configured_catalog_incremental.json diff --git a/airbyte-integrations/connectors/source-fauna/examples/configured_catalog_incremental.json b/airbyte-integrations/connectors/source-fauna/examples/configured_catalog_incremental.json new file mode 100644 index 000000000000..a1155bf9aa00 --- /dev/null +++ b/airbyte-integrations/connectors/source-fauna/examples/configured_catalog_incremental.json @@ -0,0 +1,19 @@ +{ + "streams": [ + { + "stream": { + "name": "foo", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": {} + }, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": false, + "default_cursor_field": ["column_name"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + } + ] +} diff --git a/airbyte-integrations/connectors/source-fauna/integration_tests/expected_deletions_records.txt b/airbyte-integrations/connectors/source-fauna/integration_tests/expected_deletions_records.txt index 6b1acac9bcf9..3603505d5a5e 100644 --- a/airbyte-integrations/connectors/source-fauna/integration_tests/expected_deletions_records.txt +++ b/airbyte-integrations/connectors/source-fauna/integration_tests/expected_deletions_records.txt @@ -1,5 +1,5 @@ { "stream": "deletions-data", "emitted_at": "1", "data": { "ref": "338836293305763911", "ts": 1659398359360000, "deleted_at": "2022-08-01T23:59:19.360000" } } { "stream": "deletions-data", "emitted_at": "2", "data": { "ref": "338836293305761863", "ts": 1659398366330000, "deleted_at": "2022-08-01T23:59:26.330000" } } { "stream": "deletions-data", "emitted_at": "3", "data": { "ref": "338836293305765959", "ts": 1659398371330000, "deleted_at": "2022-08-01T23:59:31.330000" } } -{ "stream": "deletions-data", "emitted_at": "5", "data": { "ref": "338836293305762887", "ts": 1659398320430000, "data": { "a": 6, "nested": { "value": 20 } } } } -{ "stream": "deletions-data", "emitted_at": "7", "data": { "ref": "338836293305764935", "ts": 1659398320430000, "data": { "a": 8, "nested": { "value": 30 } } } } +{ "stream": "deletions-data", "emitted_at": "5", "data": { "ref": "338836293305762887", "ts": 1659398320430000, "data": { "a": 6, "nested": { "value": 20 } }, "ttl": null } } +{ "stream": "deletions-data", "emitted_at": "7", "data": { "ref": "338836293305764935", "ts": 1659398320430000, "data": { "a": 8, "nested": { "value": 30 } }, "ttl": null } } diff --git a/airbyte-integrations/connectors/source-fauna/integration_tests/expected_records.txt b/airbyte-integrations/connectors/source-fauna/integration_tests/expected_records.txt index 4ed9a09e4e46..87368af19f3a 100644 --- a/airbyte-integrations/connectors/source-fauna/integration_tests/expected_records.txt +++ b/airbyte-integrations/connectors/source-fauna/integration_tests/expected_records.txt @@ -1,4 +1,4 @@ -{ "stream": "sample-data", "emitted_at": "1", "data": { "ref": "337567897171787849", "ts": 1658188683585000, "data": { "a": 5, "nested": { "value": 15 } }, "my_a_col": 5, "nested_column": 15, "optional_name": null } } -{ "stream": "sample-data", "emitted_at": "2", "data": { "ref": "337567897172836425", "ts": 1658271973660000, "data": { "a": 6, "nested": { "value": 20 }, "name": "hello world" }, "my_a_col": 6, "nested_column": 20, "optional_name": "hello world" } } -{ "stream": "sample-data", "emitted_at": "3", "data": { "ref": "337567897172837449", "ts": 1658188683585000, "data": { "a": 7, "nested": { "value": 25 } }, "my_a_col": 7, "nested_column": 25, "optional_name": null } } -{ "stream": "sample-data", "emitted_at": "4", "data": { "ref": "337567897173885001", "ts": 1658188683585000, "data": { "a": 8, "nested": { "value": 30 } }, "my_a_col": 8, "nested_column": 30, "optional_name": null } } +{ "stream": "sample-data", "emitted_at": "1", "data": { "ref": "337567897171787849", "ts": 1658188683585000, "data": { "a": 5, "nested": { "value": 15 } }, "ttl": null } } +{ "stream": "sample-data", "emitted_at": "2", "data": { "ref": "337567897172836425", "ts": 1658271973660000, "data": { "a": 6, "nested": { "value": 20 }, "name": "hello world" }, "ttl": null } } +{ "stream": "sample-data", "emitted_at": "3", "data": { "ref": "337567897172837449", "ts": 1658188683585000, "data": { "a": 7, "nested": { "value": 25 } }, "ttl": null } } +{ "stream": "sample-data", "emitted_at": "4", "data": { "ref": "337567897173885001", "ts": 1658188683585000, "data": { "a": 8, "nested": { "value": 30 } }, "ttl": null } } diff --git a/airbyte-integrations/connectors/source-fauna/source_fauna/source.py b/airbyte-integrations/connectors/source-fauna/source_fauna/source.py index 28c288df457a..3a2d03ff3e2c 100644 --- a/airbyte-integrations/connectors/source-fauna/source_fauna/source.py +++ b/airbyte-integrations/connectors/source-fauna/source_fauna/source.py @@ -47,38 +47,13 @@ def __init__(self, conf): class CollectionConfig: def __init__(self, conf): - # Name of the collection we are reading from. - self.name = conf["name"] - # true or false; do we add a `data` column that mirrors all the data in each document? - self.data_column = conf["data_column"] - # Any additional columns the user wants. - self.additional_columns = [Column(x) for x in conf.get("additional_columns", [])] # The page size, used in all Paginate() calls. self.page_size = conf["page_size"] - # Index name used in read_updates. Default to empty string - self.index = conf.get("index", "") # Configs for how deletions are handled self.deletions = DeletionsConfig(conf["deletions"]) -class Column: - def __init__(self, conf): - # The name of this column. This is the name that will appear in the destination. - self.name = conf["name"] - # The path of the value within fauna. This is an array of strings. - self.path = conf["path"] - # The type of the value used in Airbyte. This will be used by most destinations - # as the column type. This is not validated at all! - self.type = conf["type"] - # If true, then the path above must exist in every document. - self.required = conf["required"] - # The format and airbyte_type are extra typing fields. Documentation: - # https://docs.airbyte.com/understanding-airbyte/supported-data-types/ - self.format = conf.get("format") - self.airbyte_type = conf.get("airbyte_type") - - class DeletionsConfig: def __init__(self, conf): self.mode = conf["deletion_mode"] @@ -101,25 +76,9 @@ def expand_column_query(conf: CollectionConfig, value): obj = { "ref": q.select(["ref", "id"], doc), "ts": q.select("ts", doc), + "data": q.select("data", doc, {}), + "ttl": q.select("ttl", doc, None), } - if conf.data_column: - obj["data"] = q.select("data", doc) - for column in conf.additional_columns: - if column.required: - obj[column.name] = q.select( - column.path, - doc, - q.abort( - q.format( - f"The path {column.path} does not exist in document Ref(%s, collection=%s)", - q.select(["ref", "id"], doc), - q.select(["ref", "collection", "id"], doc), - ) - ), - ) - else: - # If not required, default to None - obj[column.name] = q.select(column.path, doc, None) return q.let( {"document": q.get(value)}, obj, @@ -158,23 +117,7 @@ def fail(message: str) -> AirbyteConnectionStatus: try: self._setup_client(config) - # STEP 1: Validate as much as we can before connecting to the database - - # Collection config, which has all collection-specific config fields. - conf = config.collection - - # Make sure they didn't choose an duplicate or invalid column names. - column_names = {} - for column in conf.additional_columns: - # We never allow a custom `data` column, as they might want to enable the - # data column later. - if column.name == "data" or column.name == "ref" or column.name == "ts": - return fail(f"Additional column cannot have reserved name '{column.name}'") - if column.name in column_names: - return fail(f"Additional column cannot have duplicate name '{column.name}'") - column_names[column.name] = () - - # STEP 2: Validate everything else after making sure the database is up. + # Validate everything else after making sure the database is up. try: self.client.query(q.now()) except Exception as e: @@ -183,18 +126,15 @@ def fail(message: str) -> AirbyteConnectionStatus: else: return fail(f"Failed to connect to database: {e}") - # Validate the collection exists - collection = conf.name + # Validate our permissions try: - self.client.query(q.paginate(q.documents(q.collection(collection)), size=1)) + self.client.query(q.paginate(q.collections())) except FaunaError: - return fail(f"Collection '{collection}' does not exist") - - # If they entered an index, make sure it's correct - if conf.index != "": - res = self._validate_index(conf.name, conf.index) - if res is not None: - return fail(res) + return fail("No permissions to list collections") + try: + self.client.query(q.paginate(q.indexes())) + except FaunaError: + return fail("No permissions to list indexes") return AirbyteConnectionStatus(status=Status.SUCCEEDED) except Exception as e: @@ -227,6 +167,41 @@ def _validate_index(self, collection: str, index: str) -> Optional[str]: # All above checks passed, so it's valid. return None + def find_index_for_stream(self, collection: str) -> str: + """ + Finds the index for the given collection name. This will iterate over all indexes, and find + one that has the correct source, values, and terms. + + :param collection: The name of the collection to search for. + """ + page = self.client.query(q.paginate(q.indexes())) + while True: + for id in page["data"]: + try: + index = self.client.query(q.get(id)) + except Unauthorized: + # If we don't have permissions to read this index, we ignore it. + continue + source = index["source"] + # Source can be an array, in which case we want to skip this index + if ( + type(source) is Ref + and source.collection() == Ref("collections") + and source.id() == collection + # Index must have 2 values and no terms + and len(index["values"]) == 2 + and len(index["terms"]) == 0 + # Index values must be ts and ref + and index["values"][0] == {"field": "ts"} + and index["values"][1] == {"field": "ref"} + ): + return index["name"] + if "after" in page: + page = self.client.query(q.paginate(q.indexes(), after=page["after"])) + else: + break + raise ValueError(f"Could not find index for stream '{collection}'") + def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog: """ Returns an AirbyteCatalog representing the available streams and fields in the user's connection to Fauna. @@ -247,57 +222,74 @@ def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog: streams = [] try: - # Check if we entered an index. This will already be validated by check(). - can_sync_incremental = config.collection.index != "" - - # We only support a single stream. This is limiting, but makes things a lot simpler. - conf = config.collection - stream_name = conf.name - properties = { - "ref": { - "type": "string", - }, - "ts": { - "type": "integer", - }, - } - if conf.data_column: - properties["data"] = {"type": "object"} - for column in conf.additional_columns: - column_object = {} - - # This is how we specify optionals, according to the docs: - # https://docs.airbyte.com/understanding-airbyte/supported-data-types/#nulls - if column.required: - column_object["type"] = column.type + self._setup_client(config) + + # Map all the indexes with the correct values to their collection. + collections_to_indexes = {} + page = self.client.query(q.paginate(q.indexes())) + while True: + for id in page["data"]: + try: + index = self.client.query(q.get(id)) + except Unauthorized: + # If we don't have permissions to read this index, we ignore it. + continue + source = index["source"] + # Source can be an array, in which case we want to skip this index + if ( + type(source) is Ref + and source.collection() == Ref("collections") + # Index must have 2 values and no terms + and len(index["values"]) == 2 + and len(index["terms"]) == 0 + # Index values must be ts and ref + and index["values"][0] == {"field": "ts"} + and index["values"][1] == {"field": "ref"} + ): + collections_to_indexes[source.id()] = index + if "after" in page: + page = self.client.query(q.paginate(q.indexes(), after=page["after"])) else: - column_object["type"] = ["null", column.type] - - # Extra fields, for more formats. See the docs: - # https://docs.airbyte.com/understanding-airbyte/supported-data-types/ - if column.format is not None: - column_object["format"] = column.format - if column.airbyte_type is not None: - column_object["airbyte_type"] = column.airbyte_type - - properties[column.name] = column_object - json_schema = { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": properties, - } - supported_sync_modes = ["full_refresh"] - if can_sync_incremental: - supported_sync_modes.append("incremental") - streams.append( - AirbyteStream( - name=stream_name, - json_schema=json_schema, - supported_sync_modes=supported_sync_modes, - source_defined_cursor=True, - default_cursor_field=["ts"], - ) - ) + break + + page = self.client.query(q.paginate(q.collections())) + while True: + for collection in page["data"]: + stream_name = collection.id() + json_schema = { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "ref": { + "type": "string", + }, + "ts": { + "type": "integer", + }, + "data": { + "type": "object", + }, + "ttl": { + "type": ["null", "integer"], + }, + }, + } + supported_sync_modes = ["full_refresh"] + if stream_name in collections_to_indexes: + supported_sync_modes.append("incremental") + streams.append( + AirbyteStream( + name=stream_name, + json_schema=json_schema, + supported_sync_modes=supported_sync_modes, + source_defined_cursor=True, + default_cursor_field=["ts"], + ) + ) + if "after" in page: + page = self.client.query(q.paginate(q.collections(), after=page["after"])) + else: + break except Exception as e: logger.error(f"error in discover: {e}") return AirbyteCatalog(streams=[]) @@ -655,6 +647,8 @@ def make_message(stream_name, data_obj) -> AirbyteMessage: if stream_name not in state: state[stream_name] = {} + index = self.find_index_for_stream(stream_name) + logger.info(f"found index '{index}', which will be used to sync '{stream_name}'") read_deletions = config.collection.deletions.mode == "deleted_field" # Read removals @@ -679,7 +673,7 @@ def make_message(stream_name, data_obj) -> AirbyteMessage: stream, config.collection, state[stream_name]["updates_cursor"], - config.collection.index, + index, config.collection.page_size, ): yield make_message(stream_name, data_obj) diff --git a/airbyte-integrations/connectors/source-fauna/source_fauna/spec.yaml b/airbyte-integrations/connectors/source-fauna/source_fauna/spec.yaml index 6a6c33b9d6f7..0a16fdecb967 100644 --- a/airbyte-integrations/connectors/source-fauna/source_fauna/spec.yaml +++ b/airbyte-integrations/connectors/source-fauna/source_fauna/spec.yaml @@ -42,117 +42,9 @@ connectionSpecification: title: Collection description: Settings for the Fauna Collection. required: - - name - - data_column - page_size - deletions properties: - name: - order: 0 - type: string - title: Collection Name - description: The name of the collection this connector should read from. - index: - title: Incremental Sync Index - type: string - order: 1 - description: >- - This is required for incremental syncs. Leave blank for full syncs. - data_column: - order: 2 - type: boolean - title: Data Column - default: true - description: >- - If enabled, then a column named "data" is added to the destination database. - This column contains the entire data object from each Fauna - document.
- additional_columns: - order: 3 - type: array - title: Additional Columns - description: >- - These fields are added as columns in the destination database.
- - This can be used in addition to, or in place of, the data column if you - would like to map certain document fields into columns, e.g. - ["data", "customer", "address"] or ["ttl"].
- items: - type: object - required: - - name - - path - - type - - required - properties: - name: - order: 0 - type: string - title: Column Name - description: The name of this column in the destination. - path: - order: 1 - type: array - title: Path - description: >- - The path of this value within each Fauna document. Path evaluation starts at the top - of each document. To specify one of the document's data fields, start the - path with data. For example: ["data", "address", "zip_code"]. - To specify a top-level field such as ttl, enter the path ["ttl"].
- - Numerical values cannot be entered. It is not possible to index into an - array using this path.
- - This path is the same as a Select call. - items: - type: string - type: - order: 2 - type: string - title: Type - description: >- - The Airbyte record type for this field value from the Fauna document. Type - specifies the encoding of the value separate from the type of value specified - by Format. - - For example, `2022-08-05T12:34:56+00:00` is a value whose `Type` is - "string" and its `Format` is "date-time". - - See the Airbyte docs. - format: - order: 3 - type: string - title: Format - description: >- - The format of this field value from the Fauna document. Format - specifies the type of value separate from the encoding of the value specified - by Type. - - For example, 2022-08-05T12:34:56+00:00 is a value whose Type - is "string" and its Format is "date-time". - - See the Airbyte docs. - airbyte_type: - order: 4 - type: string - title: Airbyte Type - description: >- - The Airbyte record type used to store the field value in the destination database. - - For example, 2022-08-05T12:34:56+00:00 is a value whose Type - is "string" and its Format is "date-time". The most appropriate Airbyte record - type would be "Date". - - See the Airbyte docs. - required: - order: 5 - type: boolean - title: Required - default: true - description: >- - If true, this value is required to exist within each Fauna document. If the path - doesn't exist in a document the sync fails. - If false, then non-existent field values store null. page_size: order: 4 type: integer diff --git a/airbyte-integrations/connectors/source-fauna/unit_tests/check_test.py b/airbyte-integrations/connectors/source-fauna/unit_tests/check_test.py index 667165db76ba..b6fbf064acbd 100644 --- a/airbyte-integrations/connectors/source-fauna/unit_tests/check_test.py +++ b/airbyte-integrations/connectors/source-fauna/unit_tests/check_test.py @@ -6,7 +6,7 @@ from airbyte_cdk.models import Status from faunadb import query as q -from faunadb.errors import FaunaError, Unauthorized +from faunadb.errors import Unauthorized from faunadb.objects import Ref from source_fauna import SourceFauna from test_util import config, mock_logger @@ -16,47 +16,18 @@ def query_hardcoded(expr): print(expr) if expr == q.now(): return 0 - elif expr == q.paginate(q.documents(q.collection("foo")), size=1): - return ["my_data_here"] - elif expr == q.paginate(q.documents(q.collection("invalid_collection_name")), size=1): - raise FaunaError("", "") - # Results for index 'ts' - elif expr == q.exists(q.index("ts")): - return True - elif expr == q.select("source", q.get(q.index("ts"))): - return Ref("foo", Ref("collections")) - elif expr == q.select("values", q.get(q.index("ts")), []): + elif expr == q.paginate(q.collections()): + return [{"ref": Ref("foo", Ref("collections"))}] + elif expr == q.paginate(q.indexes()): return [ - {"field": "ts"}, - {"field": "ref"}, - ] - # Results for index 'invalid_index_name' - elif expr == q.exists(q.index("invalid_index_name")): - return False - # Results for index 'invalid_source_index' - elif expr == q.exists(q.index("invalid_source_index")): - return True - elif expr == q.select("source", q.get(q.index("invalid_source_index"))): - return Ref("wrong_collection", Ref("collections")) - # Results for index 'no_values_index' - elif expr == q.exists(q.index("no_values_index")): - return True - elif expr == q.select("source", q.get(q.index("no_values_index"))): - return Ref("foo", Ref("collections")) - elif expr == q.select("values", q.get(q.index("no_values_index")), []): - return [] - # Results for index 'extra_values_index' - elif expr == q.exists(q.index("extra_values_index")): - return True - elif expr == q.select("source", q.get(q.index("extra_values_index"))): - return Ref("foo", Ref("collections")) - elif expr == q.select("values", q.get(q.index("extra_values_index")), []): - return [ - {"field": "ts"}, - {"field": "ref"}, - {"field": "lots"}, - {"field": "of"}, - {"field": "extras"}, + { + "source": Ref("foo", Ref("collections")), + "values": [ + {"field": "ts"}, + {"field": "ref"}, + ], + "terms": [], + } ] else: raise ValueError(f"invalid query {expr}") @@ -105,205 +76,3 @@ def test_invalid_check(): assert source._setup_client.called assert not logger.error.called - - -def mock_source() -> SourceFauna: - source = SourceFauna() - source._setup_client = Mock() - source.client = MagicMock() - source.client.query = query_hardcoded - return source - - -def test_check_fails(): - def expect_fail(conf): - source = mock_source() - logger = mock_logger() - result = source.check( - logger, - config=config( - { - "collection": conf, - } - ), - ) - print(result.message) - assert result.status == Status.FAILED - - def expect_succeed(conf): - source = mock_source() - logger = mock_logger() - result = source.check( - logger, - config=config( - { - "collection": conf, - } - ), - ) - print(result.message) - assert result.status == Status.SUCCEEDED - - # Each of these tests relies on the behavior of query_hardcoded, defined at the top of this file. - - # Valid collection "foo" - expect_succeed( - { - "name": "foo", - "index": "", - } - ) - # No collection "invalid_collection_name" - expect_fail( - { - "name": "invalid_collection_name", - "index": "", - } - ) - # Valid index "ts" - expect_succeed( - { - "name": "foo", - "index": "ts", - } - ) - # No index "invalid_index_name" - expect_fail( - { - "name": "foo", - "index": "invalid_index_name", - } - ) - # Wrong source on index "invalid_source" - expect_fail( - { - "name": "foo", - "index": "wrong_source", - } - ) - # Extra values on index "extra_values_index", which is fine - expect_succeed( - { - "name": "foo", - "index": "extra_values_index", - } - ) - # Not enough values on index "no_values_index" - expect_fail( - { - "name": "foo", - "index": "no_values_index", - } - ) - - -def test_config_columns(): - def expect_fail(columns): - source = mock_source() - logger = mock_logger() - result = source.check( - logger, - config=config( - { - "collection": { - "additional_columns": columns, - }, - } - ), - ) - assert result.status == Status.FAILED - - def expect_succeed(columns): - source = mock_source() - logger = mock_logger() - result = source.check( - logger, - config=config( - { - "collection": { - "additional_columns": columns, - }, - } - ), - ) - assert result.status == Status.SUCCEEDED - - # Invalid column name "data" - expect_fail( - [ - { - "name": "data", - "path": ["data"], - "type": "object", - "required": True, - } - ] - ) - # Invalid column name "ref" - expect_fail( - [ - { - "name": "ref", - "path": ["data"], - "type": "object", - "required": True, - } - ] - ) - # Invalid column name "ts" - expect_fail( - [ - { - "name": "ts", - "path": ["data"], - "type": "object", - "required": True, - } - ] - ) - # Valid column name "my_column" - expect_succeed( - [ - { - "name": "my_column", - "path": ["data"], - "type": "object", - "required": True, - } - ] - ) - - # No duplicate columns - expect_fail( - [ - { - "name": "duplicate_name", - "path": ["data"], - "type": "object", - "required": True, - }, - { - "name": "duplicate_name", - "path": ["data"], - "type": "object", - "required": True, - }, - ] - ) - # Valid config - expect_succeed( - [ - { - "name": "column_1", - "path": ["data"], - "type": "object", - "required": True, - }, - { - "name": "column_2", - "path": ["data"], - "type": "object", - "required": True, - }, - ] - ) diff --git a/airbyte-integrations/connectors/source-fauna/unit_tests/database_test.py b/airbyte-integrations/connectors/source-fauna/unit_tests/database_test.py index 84f1c36283b9..0db45365f874 100644 --- a/airbyte-integrations/connectors/source-fauna/unit_tests/database_test.py +++ b/airbyte-integrations/connectors/source-fauna/unit_tests/database_test.py @@ -176,7 +176,6 @@ def run_add_removes_test(source: SourceFauna, logger, stream: ConfiguredAirbyteS ) conf = CollectionConfig( - name="foo", deletions=DeletionsConfig.ignore(), ) results = list(source.read_removes(logger, stream, conf, state={}, deletion_column="my_deletion_col")) @@ -203,7 +202,6 @@ def run_removes_order_test(source: SourceFauna, logger, stream: ConfiguredAirbyt print(ref1, ref2, ref3) conf = CollectionConfig( - name="foo", deletions=DeletionsConfig.ignore(), page_size=2, ) @@ -296,10 +294,6 @@ def run_general_remove_test(source: SourceFauna, logger): { "port": 9000, "collection": { - "data_column": True, - "additional_columns": [], - "name": "deletions_test", - "index": "deletions_test_ts", "deletions": {"deletion_mode": "deleted_field", "column": "deleted_at"}, }, } @@ -311,21 +305,25 @@ def run_general_remove_test(source: SourceFauna, logger): "ref": "101", "ts": db_data[0]["ts"], "data": {"a": 5}, + "ttl": None, }, { "ref": "102", "ts": db_data[1]["ts"], "data": {"a": 6}, + "ttl": None, }, { "ref": "103", "ts": db_data[2]["ts"], "data": {"a": 7}, + "ttl": None, }, { "ref": "104", "ts": db_data[3]["ts"], "data": {"a": 8}, + "ttl": None, }, ] @@ -398,19 +396,7 @@ def run_updates_test(db_data, source: SourceFauna, logger, catalog: ConfiguredAi conf = config( { "port": 9000, - "collection": { - "data_column": False, - "additional_columns": [ - { - "name": "a", - "path": ["data", "a"], - "type": "integer", - "required": True, - } - ], - "name": "foo", - "index": "foo_ts", - }, + "collection": {}, } ) handle_check(source.check(logger, conf)) @@ -421,22 +407,26 @@ def run_updates_test(db_data, source: SourceFauna, logger, catalog: ConfiguredAi { "ref": db_data["ref"][0].id(), "ts": db_data["ts"][0], - "a": 5, + "data": {"a": 5}, + "ttl": None, }, { "ref": db_data["ref"][1].id(), "ts": db_data["ts"][1], - "a": 6, + "data": {"a": 6}, + "ttl": None, }, { "ref": db_data["ref"][2].id(), "ts": db_data["ts"][2], - "a": 7, + "data": {"a": 7}, + "ttl": None, }, { "ref": db_data["ref"][3].id(), "ts": db_data["ts"][3], - "a": 8, + "data": {"a": 8}, + "ttl": None, }, ] print("=== check: make sure the state resumes") @@ -467,16 +457,20 @@ def run_updates_test(db_data, source: SourceFauna, logger, catalog: ConfiguredAi "ref": db_data["ref"][1].id(), # New ts "ts": update_result["ts"], - # New value - "a": 10, + # New data + "data": {"a": 10}, + # Same ttl + "ttl": None, }, { # New ref "ref": "200", # New ts "ts": create_result["ts"], - # New value - "a": 10000, + # New data + "data": {"a": 10000}, + # Same ttl + "ttl": None, }, ] diff --git a/airbyte-integrations/connectors/source-fauna/unit_tests/discover_test.py b/airbyte-integrations/connectors/source-fauna/unit_tests/discover_test.py index 1fb270e8b248..ff08e85c7a55 100644 --- a/airbyte-integrations/connectors/source-fauna/unit_tests/discover_test.py +++ b/airbyte-integrations/connectors/source-fauna/unit_tests/discover_test.py @@ -4,7 +4,9 @@ from unittest.mock import MagicMock, Mock -from airbyte_cdk.models import AirbyteCatalog, AirbyteStream, SyncMode +from airbyte_cdk.models import AirbyteStream +from faunadb import query as q +from faunadb.objects import Ref from source_fauna import SourceFauna from test_util import config, mock_logger @@ -24,29 +26,46 @@ def schema(properties) -> dict: } +def query_hardcoded(expr): + print(expr) + if expr == q.now(): + return 0 + elif expr == q.paginate(q.collections()): + return {"data": [Ref("foo", Ref("collections")), Ref("bar", Ref("collections"))]} + elif expr == q.paginate(q.indexes()): + return { + "data": [ + Ref("ts", Ref("indexes")), + ] + } + elif expr == q.get(Ref("ts", Ref("indexes"))): + return { + "source": Ref("foo", Ref("collections")), + "name": "ts", + "values": [ + {"field": "ts"}, + {"field": "ref"}, + ], + "terms": [], + } + else: + raise ValueError(f"invalid query {expr}") + + def test_simple_discover(): source = SourceFauna() source._setup_client = Mock() source.client = MagicMock() - source.client.query = Mock() + source.client.query = query_hardcoded logger = mock_logger() result = source.discover( logger, - config=config( - { - "collection": { - "name": "1234", - "index": "", - "data_column": True, - "additional_columns": [], - } - } - ), + config=config({}), ) assert result.streams == [ AirbyteStream( - name="1234", + name="foo", json_schema={ "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", @@ -60,45 +79,19 @@ def test_simple_discover(): "ts": { "type": "integer", }, + "ttl": { + "type": ["null", "integer"], + }, }, }, - supported_sync_modes=["full_refresh"], + supported_sync_modes=["full_refresh", "incremental"], source_defined_cursor=True, default_cursor_field=["ts"], source_defined_primary_key=None, namespace=None, - ) - ] - assert not logger.info.called - assert not logger.error.called - - assert not source._setup_client.called - assert not source.client.query.called - - -def test_discover_valid_index(): - source = SourceFauna() - source._setup_client = Mock() - source.client = MagicMock() - source.client.query = Mock() - - logger = mock_logger() - result = source.discover( - logger, - config=config( - { - "collection": { - "name": "1234", - "index": "my_index", - "data_column": True, - "additional_columns": [], - } - } ), - ) - assert result.streams == [ AirbyteStream( - name="1234", + name="bar", json_schema={ "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", @@ -112,215 +105,8 @@ def test_discover_valid_index(): "ts": { "type": "integer", }, - }, - }, - supported_sync_modes=["full_refresh", "incremental"], - source_defined_cursor=True, - default_cursor_field=["ts"], - source_defined_primary_key=None, - namespace=None, - ) - ] - assert not logger.info.called - assert not logger.error.called - - assert not source._setup_client.called - assert not source.client.query.called - - -# Validates that the stream from discover() has the correct schema. -def test_config_columns(): - def expect_schema(collection_config, schema): - collection_config["name"] = "my_stream_name" - collection_config["index"] = "" - source = mock_source() - logger = mock_logger() - result = source.discover( - logger, - config=config( - { - "collection": collection_config, - } - ), - ) - assert result == AirbyteCatalog( - streams=[ - AirbyteStream( - name="my_stream_name", - json_schema=schema, - supported_sync_modes=[SyncMode.full_refresh], - default_cursor_field=["ts"], - source_defined_cursor=True, - ), - ] - ) - assert not source.client.query.called - - expect_schema( - { - "data_column": True, - "additional_columns": [], - }, - schema( - { - "ref": { - "type": "string", - }, - "ts": { - "type": "integer", - }, - "data": { - "type": "object", - }, - } - ), - ) - expect_schema( - { - "data_column": False, - "additional_columns": [], - }, - schema( - { - "ref": { - "type": "string", - }, - "ts": { - "type": "integer", - }, - } - ), - ) - expect_schema( - { - "data_column": False, - "additional_columns": [ - { - "name": "my_column", - "path": ["a", "b", "c"], - "type": "boolean", - "required": True, - } - ], - }, - schema( - { - "ref": { - "type": "string", - }, - "ts": { - "type": "integer", - }, - "my_column": { - "type": "boolean", - }, - } - ), - ) - # Optional columns should be nullable (the type being [null, boolean] means it is nullable). - expect_schema( - { - "data_column": False, - "additional_columns": [ - { - "name": "my_column", - "path": ["a", "b", "c"], - "type": "boolean", - "required": False, - } - ], - }, - schema( - { - "ref": { - "type": "string", - }, - "ts": { - "type": "integer", - }, - "my_column": { - "type": ["null", "boolean"], - }, - } - ), - ) - expect_schema( - { - "data_column": True, - "additional_columns": [ - { - "name": "extra_date_info", - "path": ["data", "date_created"], - "type": "string", - "format": "date-time", - "airbyte_type": "timestamp_with_timezone", - "required": True, - } - ], - }, - schema( - { - "ref": { - "type": "string", - }, - "ts": { - "type": "integer", - }, - "data": { - "type": "object", - }, - "extra_date_info": {"type": "string", "format": "date-time", "airbyte_type": "timestamp_with_timezone"}, - } - ), - ) - - -def test_discover_extra_columns(): - source = SourceFauna() - source._setup_client = Mock() - source.client = MagicMock() - source.client.query = Mock() - - logger = mock_logger() - result = source.discover( - logger, - config=config( - { - "collection": { - "name": "1234", - "index": "", - "data_column": False, - "additional_columns": [ - { - "name": "my_column", - "path": ["data", "my_field"], - "type": "string", - "required": True, - "format": "date-time", - "airbyte_type": "date_with_timezone", - } - ], - } - } - ), - ) - assert result.streams == [ - AirbyteStream( - name="1234", - json_schema={ - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "ref": { - "type": "string", - }, - "ts": { - "type": "integer", - }, - "my_column": { - "type": "string", - "format": "date-time", - "airbyte_type": "date_with_timezone", + "ttl": { + "type": ["null", "integer"], }, }, }, @@ -329,9 +115,9 @@ def test_discover_extra_columns(): default_cursor_field=["ts"], source_defined_primary_key=None, namespace=None, - ) + ), ] + assert not logger.info.called assert not logger.error.called - assert not source._setup_client.called - assert not source.client.query.called + assert source._setup_client.called diff --git a/airbyte-integrations/connectors/source-fauna/unit_tests/full_refresh_test.py b/airbyte-integrations/connectors/source-fauna/unit_tests/full_refresh_test.py index c5f12585dfe5..da534ae70ce2 100644 --- a/airbyte-integrations/connectors/source-fauna/unit_tests/full_refresh_test.py +++ b/airbyte-integrations/connectors/source-fauna/unit_tests/full_refresh_test.py @@ -7,7 +7,7 @@ from faunadb import _json from faunadb import query as q from source_fauna import SourceFauna -from test_util import CollectionConfig, Column, expand_columns_query, mock_logger +from test_util import CollectionConfig, expand_columns_query, mock_logger def results(modified, after): @@ -121,18 +121,8 @@ def expand_columns_query_with_extra(ref): { "ref": q.select(["ref", "id"], doc), "ts": q.select("ts", doc), - "my_column": q.select( - ["data", "my_field"], - doc, - q.abort( - q.format( - "The path ['data', 'my_field'] does not exist in document Ref(%s, collection=%s)", - q.select(["ref", "id"], doc), - q.select(["ref", "collection", "id"], doc), - ) - ), - ), - "optional_data": q.select(["data", "nested", "nested_field"], doc, None), + "data": q.select("data", doc, {}), + "ttl": q.select("ttl", doc, None), }, ) @@ -158,15 +148,23 @@ def expand_columns_query_with_extra(ref): { "ref": "3", "ts": 12345, - "my_column": "fancy string here", - "optional_data": 3, + "data": { + "my_column": "fancy string here", + "optional_data": 3, + }, + }, + { + "ref": "5", + "ts": 123459, + "data": {"my_column": "another fancy string here", "optional_data": 5}, }, - {"ref": "5", "ts": 123459, "my_column": "another fancy string here", "optional_data": 5}, { "ref": "7", "ts": 1234599, - "my_column": "even more fancy string here", - "optional_data": None, + "data": { + "my_column": "even more fancy string here", + "optional_data": None, + }, }, ], after=None, @@ -194,16 +192,7 @@ def query_hardcoded(expr): source.read_all( logger, stream, - conf=CollectionConfig( - page_size=PAGE_SIZE, - data_column=False, - additional_columns=[ - Column(name="my_column", path=["data", "my_field"], type="this doesn't matter in read()", required=True), - Column( - name="optional_data", path=["data", "nested", "nested_field"], type="this doesn't matter in read()", required=False - ), - ], - ), + conf=CollectionConfig(page_size=PAGE_SIZE), state={"full_sync_cursor": {"ts": TS}}, ) ) @@ -211,20 +200,26 @@ def query_hardcoded(expr): { "ref": "3", "ts": 12345, - "my_column": "fancy string here", - "optional_data": 3, + "data": { + "my_column": "fancy string here", + "optional_data": 3, + }, }, { "ref": "5", "ts": 123459, - "my_column": "another fancy string here", - "optional_data": 5, + "data": { + "my_column": "another fancy string here", + "optional_data": 5, + }, }, { "ref": "7", "ts": 1234599, - "my_column": "even more fancy string here", - "optional_data": None, + "data": { + "my_column": "even more fancy string here", + "optional_data": None, + }, }, ] diff --git a/airbyte-integrations/connectors/source-fauna/unit_tests/incremental_test.py b/airbyte-integrations/connectors/source-fauna/unit_tests/incremental_test.py index d1eebd41920f..16747e438de0 100644 --- a/airbyte-integrations/connectors/source-fauna/unit_tests/incremental_test.py +++ b/airbyte-integrations/connectors/source-fauna/unit_tests/incremental_test.py @@ -56,6 +56,9 @@ def state(data: dict[str, any]) -> AirbyteMessage: # Tests to make sure the read() function handles the various config combinations of # updates/deletions correctly. def test_read_no_updates_or_creates_but_removes_present(): + def find_index_for_stream(collection: str) -> str: + return "ts" + def read_updates_hardcoded( logger, stream: ConfiguredAirbyteStream, conf: CollectionConfig, state: Dict[str, any], index: str, page_size: int ) -> Generator[any, None, None]: @@ -82,6 +85,7 @@ def read_removes_hardcoded( source = SourceFauna() source._setup_client = Mock() source.read_all = Mock() + source.find_index_for_stream = find_index_for_stream source.read_updates = read_updates_hardcoded source.read_removes = read_removes_hardcoded source.client = MagicMock() @@ -155,6 +159,9 @@ def read_removes_hardcoded( def test_read_updates_ignore_deletes(): was_called = False + def find_index_for_stream(collection: str) -> str: + return "my_stream_name_ts" + def read_updates_hardcoded( logger, stream: ConfiguredAirbyteStream, conf, state: dict[str, any], index: str, page_size: int ) -> Generator[any, None, None]: @@ -190,6 +197,7 @@ def read_removes_hardcoded( source = SourceFauna() source._setup_client = Mock() source.read_all = Mock() + source.find_index_for_stream = find_index_for_stream source.read_updates = read_updates_hardcoded source.read_removes = read_removes_hardcoded source.client = MagicMock() @@ -204,7 +212,6 @@ def read_removes_hardcoded( { "collection": { "name": "my_stream_name", - "index": "my_stream_name_ts", "deletions": { "deletion_mode": "ignore", }, @@ -340,6 +347,9 @@ def make_query(after): failed_yet = False + def find_index_for_stream(collection: str) -> str: + return "foo_ts" + def query_hardcoded(expr): nonlocal current_query nonlocal failed_yet @@ -354,6 +364,7 @@ def query_hardcoded(expr): source = SourceFauna() source._setup_client = Mock() source.client = MagicMock() + source.find_index_for_stream = find_index_for_stream source.client.query = query_hardcoded logger = mock_logger() @@ -362,7 +373,7 @@ def query_hardcoded(expr): # ts should be "now", which is whatever we want # ref must not be present, as we are not resuming state = {} - config = CollectionConfig(index="foo_ts", page_size=PAGE_SIZE) + config = CollectionConfig(page_size=PAGE_SIZE) outputs = [] try: for output in source.read_removes(logger, stream, config, state, deletion_column="deletes_here"): @@ -490,6 +501,9 @@ def make_query(after): ), ] + def find_index_for_stream(collection: str) -> str: + return "foo_ts" + def query_hardcoded(expr): nonlocal current_query assert expr == QUERIES[current_query] @@ -500,6 +514,7 @@ def query_hardcoded(expr): source = SourceFauna() source._setup_client = Mock() source.client = MagicMock() + source.find_index_for_stream = find_index_for_stream source.client.query = query_hardcoded logger = mock_logger() @@ -508,7 +523,7 @@ def query_hardcoded(expr): # ts should be "now", which is whatever we want # ref must not be present, as we are not resuming state = {} - config = CollectionConfig(index="foo_ts", page_size=PAGE_SIZE) + config = CollectionConfig(page_size=PAGE_SIZE) outputs = list(source.read_removes(logger, stream, config, state, deletion_column="my_deleted_column")) # We should get the first document assert outputs == [ @@ -653,6 +668,7 @@ def query_hardcoded(expr): source = SourceFauna() source._setup_client = Mock() source.client = MagicMock() + source.find_index_for_stream = Mock() source.client.query = query_hardcoded source.find_emitted_at = Mock(return_value=NOW) @@ -669,7 +685,7 @@ def query_hardcoded(expr): json_schema={}, ), ), - CollectionConfig(index=INDEX, page_size=PAGE_SIZE), + CollectionConfig(page_size=PAGE_SIZE), state=state, index=INDEX, page_size=PAGE_SIZE, @@ -705,7 +721,7 @@ def query_hardcoded(expr): json_schema={}, ), ), - CollectionConfig(index=INDEX, page_size=PAGE_SIZE), + CollectionConfig(page_size=PAGE_SIZE), state=state, index=INDEX, page_size=PAGE_SIZE, @@ -727,7 +743,7 @@ def query_hardcoded(expr): json_schema={}, ), ), - CollectionConfig(index=INDEX, page_size=PAGE_SIZE), + CollectionConfig(page_size=PAGE_SIZE), state=state, index=INDEX, page_size=PAGE_SIZE, @@ -738,6 +754,7 @@ def query_hardcoded(expr): assert state == {"ref": "11", "ts": 1000} assert not source._setup_client.called + assert not source.find_index_for_stream.called assert not logger.error.called assert current_query == 5 @@ -819,6 +836,7 @@ def query_hardcoded(expr): source = SourceFauna() source._setup_client = Mock() source.client = MagicMock() + source.find_index_for_stream = Mock() source.client.query = query_hardcoded source.find_emitted_at = Mock(return_value=NOW) @@ -838,7 +856,7 @@ def query_hardcoded(expr): json_schema={}, ), ), - CollectionConfig(index=INDEX, page_size=PAGE_SIZE), + CollectionConfig(page_size=PAGE_SIZE), state=state, index=INDEX, page_size=PAGE_SIZE, @@ -869,7 +887,7 @@ def query_hardcoded(expr): json_schema={}, ), ), - CollectionConfig(index=INDEX, page_size=PAGE_SIZE), + CollectionConfig(page_size=PAGE_SIZE), state=state, index=INDEX, page_size=PAGE_SIZE, @@ -889,5 +907,6 @@ def query_hardcoded(expr): assert state["ref"] == "10" # This is set after we finish reading assert "after" not in state # This is some after token, serialized to json assert not source._setup_client.called + assert not source.find_index_for_stream.called assert not logger.error.called assert current_query == 3 diff --git a/airbyte-integrations/connectors/source-fauna/unit_tests/test_util.py b/airbyte-integrations/connectors/source-fauna/unit_tests/test_util.py index 85bdc9ffaffb..f660ff55a8f0 100644 --- a/airbyte-integrations/connectors/source-fauna/unit_tests/test_util.py +++ b/airbyte-integrations/connectors/source-fauna/unit_tests/test_util.py @@ -2,7 +2,6 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from typing import List from unittest.mock import Mock from faunadb import query as q @@ -40,31 +39,13 @@ def deleted_field(column: str) -> "DeletionsConfig": return DeletionsConfig(mode="deleted_field", column=column) -class Column: - def __init__(self, name: str, path: List[str], type: str, required: bool, format="", airbyte_type=""): - self.name = name - self.path = path - self.type = type - self.required = required - self.format = format - self.airbyte_type = airbyte_type - - class CollectionConfig: def __init__( self, - index="", - deletions=DeletionsConfig.ignore(), - name="", - data_column=True, - additional_columns=[], page_size=64, + deletions=DeletionsConfig.ignore(), ): - self.name = name - self.data_column = data_column - self.additional_columns = additional_columns self.page_size = page_size - self.index = index self.deletions = deletions @@ -110,10 +91,7 @@ def config(extra: dict[str, any]) -> dict[str, any]: "scheme": "http", "secret": "secret", "collection": { - "name": "foo", - "data_column": True, "page_size": 64, - "index": "ts", "deletions": {"deletion_mode": "ignore"}, }, } @@ -129,6 +107,7 @@ def expand_columns_query(ref): { "ref": q.select(["ref", "id"], doc), "ts": q.select("ts", doc), - "data": q.select("data", doc), + "data": q.select("data", doc, {}), + "ttl": q.select("ttl", doc, None), }, ) diff --git a/docs/integrations/sources/fauna.md b/docs/integrations/sources/fauna.md index 358c6aa03bbb..4e502b35b6e4 100644 --- a/docs/integrations/sources/fauna.md +++ b/docs/integrations/sources/fauna.md @@ -13,8 +13,8 @@ You need to create a separate source per collection that you want to export. ## Preliminary setup -1. Choose the Fauna collection you want to export and enter that in the "Collection" field on the left. -2. Enter the domain of the collection's database that you are exporting. The URL can be found in [the docs](https://docs.fauna.com/fauna/current/learn/understanding/region_groups#how-to-use-region-groups). +Enter the domain of the collection's database that you are exporting. The URL can be found in +[the docs](https://docs.fauna.com/fauna/current/learn/understanding/region_groups#how-to-use-region-groups). ## Full sync @@ -24,14 +24,49 @@ Follow these steps if you want this connection to perform a full sync. ```javascript CreateRole({ name: "airbyte-readonly", - privileges: [{ - resource: Collection("COLLECTION_NAME"), - actions: { read: true } - }], + privileges: [ + { + resource: Collections(), + actions: { read: true } + }, + { + resource: Indexes(), + actions: { read: true } + }, + { + resource: Collection("COLLECTION_NAME"), + actions: { read: true } + } + ], }) ``` -Replace `COLLECTION_NAME` with the name of the collection configured for this connector. +Replace `COLLECTION_NAME` with the name of the collection configured for this connector. If you'd like to sync +multiple collections, add an entry for each additional collection you'd like to sync. For example, to sync +`users` and `products`, run this query instead: +```javascript +CreateRole({ + name: "airbyte-readonly", + privileges: [ + { + resource: Collections(), + actions: { read: true } + }, + { + resource: Indexes(), + actions: { read: true } + }, + { + resource: Collection("users"), + actions: { read: true } + }, + { + resource: Collection("products"), + actions: { read: true } + } + ], +}) +``` 2. Create a key with that role. You can create a key using this query: ```javascript @@ -63,11 +98,21 @@ CreateIndex({ Replace `COLLECTION_NAME` with the name of the collection configured for this connector. Replace `INDEX_NAME` with the name that you configured for the Incremental Sync Index. +Repeat this step for every collection you'd like to sync. + 2. Create a role that can read the collection, the index, and the metadata of all indexes. It needs access to index metadata in order to validate the index settings. You can create the role with this query: ```javascript CreateRole({ name: "airbyte-readonly", privileges: [ + { + resource: Collections(), + actions: { read: true } + }, + { + resource: Indexes(), + actions: { read: true } + }, { resource: Collection("COLLECTION_NAME"), actions: { read: true } @@ -75,17 +120,49 @@ CreateRole({ { resource: Index("INDEX_NAME"), actions: { read: true } + } + ], +}) +``` + +Replace `COLLECTION_NAME` with the name of the collection configured for this connector. +Replace `INDEX_NAME` with the name that you configured for the Incremental Sync Index. + +If you'd like to sync multiple collections, add an entry for every collection and index +you'd like to sync. For example, to sync `users` and `products` with Incremental Sync, run +the following query: +```javascript +CreateRole({ + name: "airbyte-readonly", + privileges: [ + { + resource: Collections(), + actions: { read: true } }, { resource: Indexes(), actions: { read: true } + }, + { + resource: Collection("users"), + actions: { read: true } + }, + { + resource: Index("users-ts"), + actions: { read: true } + }, + { + resource: Collection("products"), + actions: { read: true } + }, + { + resource: Index("products-ts"), + actions: { read: true } } ], }) ``` -Replace `COLLECTION_NAME` with the name of the collection configured for this connector. -Replace `INDEX_NAME` with the name that you configured for the Incremental Sync Index. 3. Create a key with that role. You can create a key using this query: ```javascript