Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 New Destination: Weaviate #20094

Merged
merged 64 commits into from
Jan 12, 2023
Merged
Show file tree
Hide file tree
Changes from 60 commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
28c0395
Add Weaviate Destination #20012
samos123 Dec 2, 2022
77431fa
Fix formatting and standards
samos123 Dec 5, 2022
7b159bc
Fix flake issue
samos123 Dec 5, 2022
9e56857
Fix unused client variable
samos123 Dec 5, 2022
228c7c5
Add support for int based ID fields
samos123 Dec 7, 2022
875be1b
Ensure stream name meets Weaviate class reqs
samos123 Dec 8, 2022
8c9ad5b
add integration test for using pokemon as source
samos123 Dec 8, 2022
5be647f
handle nested objects by converting to json string
samos123 Dec 8, 2022
1de0ca2
create schema for transforming data to weaviate
samos123 Dec 9, 2022
617ecf5
Add docs for weaviate destination
samos123 Dec 9, 2022
c8fd137
Remove pokemon-schema external dependency
samos123 Dec 9, 2022
1845194
Remove pikachu integration test external dep
samos123 Dec 9, 2022
169025b
Add large batch test case
samos123 Dec 13, 2022
2f440ae
add test for second sync
samos123 Dec 13, 2022
a6b35e3
Merge branch 'master' into add-weaviate-destination
itaseskii Dec 13, 2022
3a898bf
Fix issue with fields starting with uppercase
samos123 Dec 14, 2022
88f6e61
Merge branch 'add-weaviate-destination'
samos123 Dec 14, 2022
c246173
add more checks to line_break test
samos123 Dec 14, 2022
d1f76d2
Update README for Weaviate
samos123 Dec 14, 2022
57e4432
Make batch_size configurable with 100 as default
samos123 Dec 14, 2022
ebabd2f
Merge branch 'master' into add-weaviate-destination
samos123 Dec 14, 2022
c9e5815
Merge master into add-weaviate-destination
samos123 Dec 14, 2022
44d5ee0
Add support for providing vectors
samos123 Dec 14, 2022
54d08c8
Update docs
samos123 Dec 14, 2022
5a935b8
Add test for existing Weaviate class
samos123 Dec 14, 2022
4c1a46e
Add trying to create schema in check connection
samos123 Dec 14, 2022
1472625
Merge branch 'master' into add-weaviate-destination
samos123 Dec 14, 2022
5c79c5a
Merge branch 'master' into add-weaviate-destination
samos123 Dec 15, 2022
2da6170
Add support for mongodb _id fields
samos123 Dec 15, 2022
1915b6e
Add support for providing custom ID
samos123 Dec 16, 2022
9173d23
remove unused file
samos123 Dec 16, 2022
28f3649
fix flow of is_ready() check
samos123 Dec 16, 2022
f190fc9
Move standalone functions to utils.py
samos123 Dec 16, 2022
f0618e6
Support overwrite mode
samos123 Dec 16, 2022
ac61e59
Merge branch 'master' into add-weaviate-destination
itaseskii Dec 16, 2022
46d9525
Add regex based stream_name_class_name conversion
samos123 Dec 17, 2022
c53714f
Merge branch 'master' into add-weaviate-destination
samos123 Dec 17, 2022
a5d8d23
Merge branch 'master' into add-weaviate-destination
itaseskii Dec 19, 2022
b2d1fa3
remove unneeded print statement
samos123 Dec 19, 2022
69f9df0
Merge branch 'master' into add-weaviate-destination
itaseskii Dec 19, 2022
85aeae8
Add "airbyte_secret" : true to password config
samos123 Dec 19, 2022
8700d47
Merge branch remote into local
samos123 Dec 19, 2022
83b3a09
add support for array of arrays
samos123 Dec 19, 2022
0d4e230
remove unneeded variable declaration
samos123 Dec 20, 2022
c71e3be
change to MutableMapping since we use del
samos123 Dec 20, 2022
da2533d
change name from queued_write to buffered_write
samos123 Dec 20, 2022
f7bbdcf
Merge branch 'master' into add-weaviate-destination
itaseskii Dec 21, 2022
53e84a9
Merge branch 'master' into add-weaviate-destination
itaseskii Dec 21, 2022
0207834
add retry on partial batch error
samos123 Dec 21, 2022
6cd4503
Merge branch 'add-weaviate-destination'
samos123 Dec 21, 2022
dc1e7a9
Fix partial batch retry and add tests
samos123 Dec 21, 2022
08ab623
Merge branch 'master' into add-weaviate-destination
itaseskii Jan 9, 2023
527560a
Merge branch 'master' into add-weaviate-destination
itaseskii Jan 9, 2023
a8e3047
Merge branch 'master' into add-weaviate-destination
Jan 9, 2023
19541e9
fix ID generation
samos123 Jan 9, 2023
d75438b
Merge remote-tracking branch 'upstream/master'
samos123 Jan 9, 2023
1befb54
Clean up recursive retry logic
samos123 Jan 10, 2023
6f8e6a1
Merge branch 'master' into add-weaviate-destination
itaseskii Jan 11, 2023
2649a97
Merge branch 'master' into add-weaviate-destination
itaseskii Jan 11, 2023
c421e34
Merge branch 'master' into add-weaviate-destination
itaseskii Jan 11, 2023
d577f24
fix flake tests
samos123 Jan 11, 2023
cce55e6
ran flake reformat
samos123 Jan 11, 2023
a10a1af
add definitions
Jan 11, 2023
c0e8598
Merge branch 'master' into add-weaviate-destination
itaseskii Jan 11, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
*
!Dockerfile
!main.py
!destination_weaviate
!setup.py
38 changes: 38 additions & 0 deletions airbyte-integrations/connectors/destination-weaviate/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
FROM python:3.9.11-alpine3.15 as base

# build and load all requirements
FROM base as builder
WORKDIR /airbyte/integration_code

# upgrade pip to the latest version
RUN apk --no-cache upgrade \
&& pip install --upgrade pip \
&& apk --no-cache add tzdata build-base


COPY setup.py ./
# install necessary packages to a temporary folder
RUN pip install --prefix=/install .

# build a clean environment
FROM base
WORKDIR /airbyte/integration_code

# copy all loaded and built libraries to a pure basic image
COPY --from=builder /install /usr/local
# add default timezone settings
COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime
RUN echo "Etc/UTC" > /etc/timezone

# bash is installed for more convenient debugging.
RUN apk --no-cache add bash

# copy payload code only
COPY main.py ./
COPY destination_weaviate ./destination_weaviate

ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/destination-weaviate
123 changes: 123 additions & 0 deletions airbyte-integrations/connectors/destination-weaviate/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# Weaviate Destination

This is the repository for the Weaviate destination connector, written in Python.
For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/destinations/weaviate).

## Local development

### Prerequisites
**To iterate on this connector, make sure to complete this prerequisites section.**

#### Minimum Python version required `= 3.7.0`

#### Build & Activate Virtual Environment and install dependencies
From this connector directory, create a virtual environment:
```
python -m venv .venv
```

This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your
development environment of choice. To activate it from the terminal, run:
```
source .venv/bin/activate
pip install -r requirements.txt
```
If you are in an IDE, follow your IDE's instructions to activate the virtualenv.

Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is
used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`.
If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything
should work as you expect.

#### Building via Gradle
From the Airbyte repository root, run:
```
./gradlew :airbyte-integrations:connectors:destination-weaviate:build
```

#### Create credentials
**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.io/integrations/destinations/weaviate)
to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `destination_weaviate/spec.json` file.
Note that the `secrets` directory is gitignored by default, so there is no danger of accidentally checking in sensitive information.
See `integration_tests/sample_config.json` for a sample config file.

**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `destination weaviate test creds`
and place them into `secrets/config.json`.

### Locally running the connector
```
python main.py spec
python main.py check --config secrets/config.json
python main.py discover --config secrets/config.json
python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json
```

### Locally running the connector docker image

#### Build
First, make sure you build the latest Docker image:
```
docker build . -t airbyte/destination-weaviate:dev
```

You can also build the connector image via Gradle:
```
./gradlew :airbyte-integrations:connectors:destination-weaviate:airbyteDocker
```
When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in
the Dockerfile.

#### Run
Then run any of the connector commands as follows:
```
docker run --rm airbyte/destination-weaviate:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-weaviate:dev check --config /secrets/config.json
# messages.jsonl is a file containing line-separated JSON representing AirbyteMessages
cat messages.jsonl | docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-weaviate:dev write --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
```
## Testing
Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named.
First install test dependencies into your virtual environment:
```
pip install .[tests]
```
### Unit Tests
To run unit tests locally, from the connector directory run:
```
python -m pytest unit_tests
```

### Integration Tests
There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all destination connectors) and custom integration tests (which are specific to this connector).
#### Custom Integration tests
Place custom tests inside `integration_tests/` folder, then, from the connector root, run
```
python -m pytest integration_tests
```
#### Acceptance Tests
Coming soon:

### Using gradle to run tests
All commands should be run from airbyte project root.
To run unit tests:
```
./gradlew :airbyte-integrations:connectors:destination-weaviate:unitTest
```
To run acceptance and custom integration tests:
```
./gradlew :airbyte-integrations:connectors:destination-weaviate:integrationTest
```

## Dependency Management
All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development.
We split dependencies between two groups, dependencies that are:
* required for your connector to work need to go to `MAIN_REQUIREMENTS` list.
* required for the testing need to go to `TEST_REQUIREMENTS` list

### Publishing a new version of the connector
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
1. Make sure your changes are passing unit and integration tests.
1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)).
1. Create a Pull Request.
1. Pat yourself on the back for being an awesome contributor.
1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
plugins {
id 'airbyte-python'
id 'airbyte-docker'
}

airbytePython {
moduleDirectory 'destination_weaviate'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#


from .destination import DestinationWeaviate

__all__ = ["DestinationWeaviate"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import uuid
import logging
import json
from dataclasses import dataclass
import time
from typing import Any, Mapping, MutableMapping, List

import weaviate
from .utils import generate_id, parse_id_schema, parse_vectors, stream_to_class_name


@dataclass
class BufferedObject:
id: str
properties: Mapping[str, Any]
vector: List[Any]
class_name: str


class WeaviatePartialBatchError(Exception):
pass


class Client:
def __init__(self, config: Mapping[str, Any], schema: Mapping[str, str]):
self.client = self.get_weaviate_client(config)
self.config = config
self.batch_size = int(config.get("batch_size", 100))
self.schema = schema
self.vectors = parse_vectors(config.get("vectors"))
self.id_schema = parse_id_schema(config.get("id_schema"))
self.buffered_objects: MutableMapping[str, BufferedObject] = {}
self.objects_with_error: MutableMapping[str, BufferedObject] = {}

def buffered_write_operation(self, stream_name: str, record: MutableMapping):
if self.id_schema.get(stream_name, "") in record:
id_field_name = self.id_schema.get(stream_name, "")
record_id = generate_id(record.get(id_field_name))
del record[id_field_name]
itaseskii marked this conversation as resolved.
Show resolved Hide resolved
else:
if "id" in record:
record_id = generate_id(record.get("id"))
del record["id"]
# Weaviate will throw an error if you try to store a field with name _id
elif "_id" in record:
record_id = generate_id(record.get("_id"))
del record["_id"]
else:
record_id = uuid.uuid4()
record_id = str(record_id)

# TODO support nested objects instead of converting to json string when weaviate supports this
for k, v in record.items():
if self.schema[stream_name].get(k, "") == "jsonify":
record[k] = json.dumps(v)
# Handling of empty list that's not part of defined schema otherwise Weaviate throws invalid string property
if isinstance(v, list) and len(v) == 0 and k not in self.schema[stream_name]:
record[k] = ""

# Property names in Weaviate have to start with lowercase letter
record = {k[0].lower() + k[1:]: v for k, v in record.items()}
vector = None
if stream_name in self.vectors:
vector_column_name = self.vectors.get(stream_name)
vector = record.get(vector_column_name)
del record[vector_column_name]
class_name = stream_to_class_name(stream_name)
self.client.batch.add_data_object(record, class_name, record_id, vector=vector)
self.buffered_objects[record_id] = BufferedObject(record_id, record, vector, class_name)
if self.client.batch.num_objects() >= self.batch_size:
self.flush()

def flush(self, retries: int = 3):
if len(self.objects_with_error) > 0 and retries == 0:
error_msg = f"Objects had errors and retries failed as well. Object IDs: {self.objects_with_error.keys}"
raise WeaviatePartialBatchError(error_msg)

results = self.client.batch.create_objects()
self.objects_with_error.clear()
for result in results:
errors = result.get("result", {}).get("errors", [])
if errors:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the replication should continue if the connector fails to replicate records. you should throw an exception and terminate the replication

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the case would be 1 of the objects in the batch had an error however other records might not have an error. So I was thinking we should continue but simply log an error for those. I was planning to add a retry mechanism in a follow up PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'm aware that only a small amount of records can fail and the rest can be replicated successfully but so far the convention in the rest of the destination connectors was to terminate the replication and execute any cleanup logic such as removing temp tables and similar. tbh I'm on the fence for this, can you count a replication as successful overall if records were partially replicated 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've implemented partial batch retry and will now throw an exception if after 3 retries the there are still partial errors. Please review.

obj_id = result.get("id")
self.objects_with_error[obj_id] = self.buffered_objects.get(obj_id)
logging.info(f"Object {obj_id} had errors: {errors}. Going to retry.")

for buffered_object in self.objects_with_error.values():
self.client.batch.add_data_object(buffered_object.properties, buffered_object.class_name, buffered_object.id,
buffered_object.vector)

if len(self.objects_with_error) > 0 and retries > 0:
logging.info("sleeping 2 seconds before retrying batch again")
time.sleep(2)
self.flush(retries - 1)

self.buffered_objects.clear()

def delete_stream_entries(self, stream_name: str):
class_name = stream_to_class_name(stream_name)
try:
original_schema = self.client.schema.get(class_name=class_name)
self.client.schema.delete_class(class_name=class_name)
logging.info(f"Deleted class {class_name}")
self.client.schema.create_class(original_schema)
logging.info(f"Recreated class {class_name}")
except weaviate.exceptions.UnexpectedStatusCodeException as e:
if e.message.startswith("Get schema! Unexpected status code: 404"):
itaseskii marked this conversation as resolved.
Show resolved Hide resolved
logging.info(f"Class {class_name} did not exist.")
else:
raise e

@staticmethod
def get_weaviate_client(config: Mapping[str, Any]) -> weaviate.Client:
url, username, password = config.get("url"), config.get("username"), config.get("password")

if username and not password:
raise Exception("Password is required when username is set")
if password and not username:
raise Exception("Username is required when password is set")

if username and password:
credentials = weaviate.auth.AuthClientPassword(username, password)
return weaviate.Client(url=url, auth_client_secret=credentials)
return weaviate.Client(url=url, timeout_config=(2, 2))
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
import string
from typing import Any, Iterable, Mapping
import random

from airbyte_cdk import AirbyteLogger
from airbyte_cdk.destinations import Destination
from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, DestinationSyncMode, Status, Type

from .client import Client
from .utils import get_schema_from_catalog


class DestinationWeaviate(Destination):
def write(
self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
) -> Iterable[AirbyteMessage]:
"""
Reads the input stream of messages, config, and catalog to write data to the destination.

This method returns an iterable (typically a generator of AirbyteMessages via yield) containing state messages received
in the input message stream. Outputting a state message means that every AirbyteRecordMessage which came before it has been
successfully persisted to the destination. This is used to ensure fault tolerance in the case that a sync fails before fully completing,
then the source is given the last state message output from this method as the starting point of the next sync.

:param config: dict of JSON configuration matching the configuration declared in spec.json
:param configured_catalog: The Configured Catalog describing the schema of the data being received and how it should be persisted in the
destination
:param input_messages: The stream of input messages received from the source
:return: Iterable of AirbyteStateMessages wrapped in AirbyteMessage structs
"""
client = Client(config, get_schema_from_catalog(configured_catalog))
for configured_stream in configured_catalog.streams:
if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite:
client.delete_stream_entries(configured_stream.stream.name)

for message in input_messages:
if message.type == Type.STATE:
# Emitting a state message indicates that all records which came before it have been written to the destination. So we flush
# the queue to ensure writes happen, then output the state message to indicate it's safe to checkpoint state
client.flush()
yield message
elif message.type == Type.RECORD:
record = message.record
client.buffered_write_operation(record.stream, record.data)
else:
# ignore other message types for now
continue

# Make sure to flush any records still in the queue
client.flush()

def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
"""
Tests if the input configuration can be used to successfully connect to the destination with the needed permissions
e.g: if a provided API token or password can be used to connect and write to the destination.

:param logger: Logging object to display debug/info/error to the logs
(logs will not be accessible via airbyte UI if they are not passed to this logger)
:param config: Json object containing the configuration of this destination, content of this json is as specified in
the properties of the spec.json file

:return: AirbyteConnectionStatus indicating a Success or Failure
"""
try:
client = Client.get_weaviate_client(config)
itaseskii marked this conversation as resolved.
Show resolved Hide resolved
ready = client.is_ready()
if not ready:
return AirbyteConnectionStatus(status=Status.FAILED, message=f"Weaviate server {config.get('url')} not ready")

class_name = ''.join(random.choices(string.ascii_uppercase, k=10))
client.schema.create_class({"class": class_name})
itaseskii marked this conversation as resolved.
Show resolved Hide resolved
client.schema.delete_class(class_name)
return AirbyteConnectionStatus(status=Status.SUCCEEDED)
except Exception as e:
return AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred: {repr(e)}")
Loading