Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: planet scale working #45

Merged
merged 26 commits into from
Feb 23, 2024
Merged
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
a08e4b3
WIP Discovery working, need to get tables working, and then refactor
visch Feb 16, 2024
7359da5
Working with access issues fitlered out
visch Feb 17, 2024
248be32
Works with 100k+ tables
visch Feb 17, 2024
1b155d9
Fix decimals
visch Feb 17, 2024
49f125a
Dynamic PlanetScale detection
visch Feb 17, 2024
a1a0580
Readme updated, planet scale working
visch Feb 17, 2024
f06bffd
Swap config name for sqlalchemy options
visch Feb 19, 2024
6c90cfb
Add is_vitess configuration
visch Feb 20, 2024
685a2e9
Merge main's poetry.lock file in
visch Feb 20, 2024
84fc4b3
Make linter happy
visch Feb 21, 2024
8c2d124
Fix config validation, and messup with connect function call
visch Feb 21, 2024
8809b65
Passes all tests, squashed some bugs!
visch Feb 21, 2024
7644319
make mypy happy
visch Feb 21, 2024
0b07d39
PlanetScale tap pointer
visch Feb 21, 2024
c84d384
fix merge conflicts
visch Feb 22, 2024
f7520c9
Fix sqlalchemy_options documentation
visch Feb 22, 2024
6c0e974
Fix vitess config check
visch Feb 22, 2024
fa8ac91
Update README.md
visch Feb 22, 2024
7e2a11b
Update README.md
visch Feb 22, 2024
3ac1ceb
Apply Edgars suggestions
visch Feb 22, 2024
d986bba
Merge branch 'planet_scale' of github.com:MeltanoLabs/tap-mysql into …
visch Feb 22, 2024
164b94f
Match tap.py config docs with README
visch Feb 22, 2024
6c40462
A bit cleaner is_vitess check
visch Feb 22, 2024
1b34310
config key doesn't get set when value is None. I didn't expect that!
visch Feb 22, 2024
563e229
Merge branch 'main' into planet_scale
edgarrmondragon Feb 23, 2024
76b0b11
Merge branch 'main' into planet_scale
visch Feb 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
WIP Discovery working, need to get tables working, and then refactor
  • Loading branch information
visch committed Feb 16, 2024
commit a08e4b3a472c8eb75225efc2ae4e1ff2de392310
157 changes: 157 additions & 0 deletions tap_mysql/client.py
Original file line number Diff line number Diff line change
@@ -2,12 +2,14 @@
from __future__ import annotations

import datetime
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Iterable

import singer_sdk.helpers._typing
import sqlalchemy
from singer_sdk import SQLConnector, SQLStream
from singer_sdk import typing as th
from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema
from singer_sdk.helpers._typing import TypeConformanceLevel

if TYPE_CHECKING:
@@ -40,6 +42,30 @@ def patched_conform(

class MySQLConnector(SQLConnector):
"""Connects to the MySQL SQL source."""

def create_engine(self) -> Engine:
"""Creates and returns a new engine. Do not call outside of _engine.

NOTE: Do not call this method. The only place that this method should
be called is inside the self._engine method. If you'd like to access
the engine on a connector, use self._engine.

This method exists solely so that tap/target developers can override it
on their subclass of SQLConnector to perform custom engine creation
logic.

Returns:
A new SQLAlchemy Engine.
"""
#pymyssql
connect_args = {
"ssl": {
"verify_identity": True,
"verify_cert": True,
"ca": "/etc/ssl/certs/ca-certificates.crt"
}
}
return sqlalchemy.create_engine(self.sqlalchemy_url, connect_args=connect_args, echo=False)

@staticmethod
def to_jsonschema_type(
@@ -169,6 +195,137 @@ def get_schema_names(self, engine: Engine, inspected: Inspector) -> list[str]:
if "filter_schemas" in self.config and len(self.config["filter_schemas"]) != 0:
return self.config["filter_schemas"]
return super().get_schema_names(engine, inspected)

def discover_catalog_entry(
self,
engine: Engine, # noqa: ARG002
inspected: Inspector,
schema_name: str,
table_name: str,
is_view: bool, # noqa: FBT001
) -> CatalogEntry:
"""Overrode to support Vitess as DESCRIBE is not supported for views.

Create `CatalogEntry` object for the given table or a view.

Args:
engine: SQLAlchemy engine
inspected: SQLAlchemy inspector instance for engine
schema_name: Schema name to inspect
table_name: Name of the table or a view
is_view: Flag whether this object is a view, returned by `get_object_names`

Returns:
`CatalogEntry` object for the given table or a view
"""
# Initialize unique stream name
unique_stream_id = self.get_fully_qualified_name(
db_name=None,
schema_name=schema_name,
table_name=table_name,
delimiter="-",
)

# Detect key properties
possible_primary_keys: list[list[str]] = []
#pk_def = inspected.get_pk_constraint(table_name, schema=schema_name)
pk_def = False
if pk_def and "constrained_columns" in pk_def:
possible_primary_keys.append(pk_def["constrained_columns"])

# An element of the columns list is ``None`` if it's an expression and is
# returned in the ``expressions`` list of the reflected index.
possible_primary_keys.extend(
index_def["column_names"] # type: ignore[misc]
for index_def in [] #inspected.get_indexes(table_name, schema=schema_name)
if index_def.get("unique", False)
)

key_properties = next(iter(possible_primary_keys), None)

# Initialize columns list
table_schema = th.PropertiesList()
with self._connect() as conn:
columns = conn.execute(f"SHOW columns from `{schema_name}`.`{table_name}`")
for column in columns:
column_name = column["Field"]
is_nullable = column["Null"] == "YES"
jsonschema_type: dict = self.to_jsonschema_type(column["Type"])
table_schema.append(
th.Property(
name=column_name,
wrapped=th.CustomType(jsonschema_type),
required=not is_nullable,
),
)
schema = table_schema.to_dict()

# Initialize available replication methods
addl_replication_methods: list[str] = [""] # By default an empty list.
# Notes regarding replication methods:
# - 'INCREMENTAL' replication must be enabled by the user by specifying
# a replication_key value.
# - 'LOG_BASED' replication must be enabled by the developer, according
# to source-specific implementation capabilities.
replication_method = next(reversed(["FULL_TABLE", *addl_replication_methods]))

# Create the catalog entry object
return CatalogEntry(
tap_stream_id=unique_stream_id,
stream=unique_stream_id,
table=table_name,
key_properties=key_properties,
schema=Schema.from_dict(schema),
is_view=is_view,
replication_method=replication_method,
metadata=MetadataMapping.get_standard_metadata(
schema_name=schema_name,
schema=schema,
replication_method=replication_method,
key_properties=key_properties,
valid_replication_keys=None, # Must be defined by user
),
database=None, # Expects single-database context
row_count=None,
stream_alias=None,
replication_key=None, # Must be defined by user
)

def get_table_columns(
self,
full_table_name: str,
column_names: list[str] | None = None,
) -> dict[str, sqlalchemy.Column]:
"""Overrode to support Vitess as DESCRIBE is not supported for views.

Return a list of table columns.

Args:
full_table_name: Fully qualified table name.
column_names: A list of column names to filter to.

Returns:
An ordered list of column objects.
"""
if full_table_name not in self._table_cols_cache:
_, schema_name, table_name = self.parse_full_table_name(full_table_name)
with self._connect() as conn:
columns = conn.execute(f"SHOW columns from `{schema_name}`.`{table_name}`")
breakpoint()

self._table_cols_cache[full_table_name] = {
col_meta["Field"]: sqlalchemy.Column(
col_meta["Field"],
col_meta["Type"],
nullable=col_meta["Null"] == "YES"
)
for col_meta in columns
if not column_names
or col_meta["Field"].casefold()
in {col.casefold() for col in column_names}
}

return self._table_cols_cache[full_table_name]


class MySQLStream(SQLStream):
Loading