Feat: add DynamoDB Source #71

Merged on Dec 19, 2024 (43 commits)

Conversation

turtleDev (Contributor) commented on Dec 16, 2024

  • Adds a DynamoDB source
  • Fixes credential escaping during URL parsing in the S3 source
  • Refactors the source and destination lists in SourceDestinationFactory
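
The diff for the S3 fix is not shown in this conversation, so the following is only a sketch of the general problem, assuming credentials are passed as query parameters in the source URI. The bucket, the parameter name, and the secret are made up for illustration.

```python
from urllib.parse import parse_qs, quote, urlparse

# Made-up secret containing characters that are special in a query string.
secret = "abc+def/ghi=="

# Unescaped: parse_qs treats "+" as an encoded space, so the secret is corrupted.
raw_uri = f"s3://bucket/path?secret_access_key={secret}"
raw_value = parse_qs(urlparse(raw_uri).query)["secret_access_key"][0]

# Percent-encoding the secret before building the URI lets it round-trip intact.
safe_uri = f"s3://bucket/path?secret_access_key={quote(secret, safe='')}"
safe_value = parse_qs(urlparse(safe_uri).query)["secret_access_key"][0]

print(repr(raw_value))   # the "+" has become a space
print(repr(safe_value))  # identical to the original secret
```

Whether the escaping belongs on the caller's side or inside the parser is a design choice this sketch does not settle.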

@@ -93,6 +93,44 @@ def parse_scheme_from_uri(uri: str) -> str:
class SourceDestinationFactory:
    source_scheme: str
    destination_scheme: str
    sources: Dict[str, Type[SourceProtocol]] = {
Review comment (Contributor):

This is a good example of a change that could have gone in a separate PR, since it has nothing to do with the current one. I'd suggest keeping unrelated changes out in the future.
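
For readers without the full diff, the refactor under discussion appears to replace per-scheme branching with a scheme-to-class mapping. A minimal sketch of that pattern follows; the class names and the `get_source` helper are hypothetical and are not taken from the ingestr code.

```python
from typing import Dict, Type


class Source:
    """Hypothetical base class standing in for SourceProtocol."""


class DynamoDBSource(Source):
    pass


class S3Source(Source):
    pass


# Registry keyed by URI scheme; supporting a new source is a one-line change.
SOURCES: Dict[str, Type[Source]] = {
    "dynamodb": DynamoDBSource,
    "s3": S3Source,
}


def get_source(scheme: str) -> Source:
    """Instantiate the source registered for the scheme, or fail loudly."""
    try:
        return SOURCES[scheme]()
    except KeyError:
        raise ValueError(f"Unsupported source scheme: {scheme}") from None
```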

)

incremental = None
incremental_key = kwargs.get("incremental_key", "").strip()
Review comment (Contributor):

We might want to make the default None here as well; otherwise this could change the behavior.
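
One possible way to act on this is sketched below. `resolve_incremental_key` is a hypothetical helper, not code from the PR, and treating a blank string the same as a missing key is an assumption. Note that `kwargs.get("incremental_key", "").strip()` raises `AttributeError` when the key is present but explicitly set to `None`, which the sketch also guards against.

```python
from typing import Any, Dict, Optional


def resolve_incremental_key(kwargs: Dict[str, Any]) -> Optional[str]:
    """Return the stripped incremental key, or None if it is missing or blank."""
    key = kwargs.get("incremental_key")  # default is None, not ""
    if key is None:
        return None
    key = key.strip()
    # Assumption: an empty or whitespace-only key means "no incremental key".
    return key or None
```

Callers can then test `if incremental_key is not None` instead of relying on the truthiness of an empty string.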

ingestr/src/sources.py (resolved)
docs/supported-sources/dynamodb.md (resolved)

To obtain the access keys, use the IAM console on AWS. See [IAM Documentation](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html) for more information.

### Example
Review comment (Contributor):

Can you please add an incremental example?

Comment on lines +1654 to +1656
# connection pooling causes issues with duckdb, when the connection
# is reused below, so we disable pooling.
dest_engine = sqlalchemy.create_engine(dest_uri, poolclass=NullPool)
Review comment (Contributor):

This is pretty clever. I had to use an uglier solution before; I didn't know I could do this 👍

assert rows[i][1] == pendulum.parse(dynamodb.data[i]["updated_at"])

# ingest the rest
result = invoke_ingest_command(
Review comment (Contributor):

Why not run one more ingest, similar to the one above, and check the data again? Also, I'd suggest adding tests for other strategies such as append or delete+insert.

turtleDev merged commit 86e33ee into main on Dec 19, 2024
8 checks passed