Skip to content

Commit

Permalink
feat: standardize fasta, multiple column primary key, fix
Browse files Browse the repository at this point in the history
`polars_write_database` for list[number] types
  • Loading branch information
kiyoon committed Aug 6, 2024
1 parent 832daa8 commit 2459171
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 9 deletions.
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,18 +94,23 @@ This package also provides some useful functions to work with PostgreSQL, SMILES
from bio_data_to_db.utils.postgresql import (
create_db_if_not_exists,
create_schema_if_not_exists,
set_column_as_primary_key,
make_int_column_primary_key_identity,
make_columns_primary_key,
make_columns_unique,
make_large_columns_unique,
split_column_str_to_list,
polars_write_database,
polars_write_database, # addressed issues with list columns
)

from bio_data_to_db.utils.smiles import (
canonical_smiles_wo_salt,
polars_canonical_smiles_wo_salt,
)

from bio_data_to_db.utils.fasta import (
polars_standardize_fasta,
)

from bio_data_to_db.utils.polars import (
w_pbar,
)
Expand Down
2 changes: 1 addition & 1 deletion rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions src/bio_data_to_db/utils/fasta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import polars as pl


def polars_standardize_fasta(
    df: pl.DataFrame, fasta_col: str = "fasta", out_col: str = "fasta"
) -> pl.DataFrame:
    """
    Standardize a FASTA sequence column: uppercase it and strip all whitespace.

    Sequences copied from wrapped FASTA files often contain spaces and line
    breaks; this collapses each sequence into one contiguous uppercase string.

    Args:
        df: DataFrame containing the sequence column.
        fasta_col: Name of the column holding the raw sequences.
        out_col: Column to write the result to. Defaults to overwriting
            ``fasta_col``.

    Returns:
        The DataFrame with the standardized column.
    """
    df = df.with_columns(
        pl.col(fasta_col)
        .str.to_uppercase()
        # \s also covers "\r" (Windows line endings) and tabs, which the
        # previous literal "\n" / " " replacements missed.
        .str.replace_all(r"\s", "")
        .alias(out_col)
    )

    return df
64 changes: 58 additions & 6 deletions src/bio_data_to_db/utils/postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def create_schema_if_not_exists(uri: str, schema_name: str, comment: str | None
logger.exception(f"Error creating schema '{schema_name}' in DB '{db_name}'")


def set_column_as_primary_key(
def make_int_column_primary_key_identity(
uri: str,
*,
schema_name: str = "public",
Expand All @@ -173,6 +173,7 @@ def set_column_as_primary_key(
Make an existing index column (integer type) as primary key with auto increment (identity).
This is used because pl.DataFrame.write_database() doesn't support writing index column as primary key.
Also, it will automatically set the start value of auto increment to the max value in the column.
Example:
>>> df = pl.DataFrame({"smiles": ["CCO", "CCN", "CCC"]}) # doctest: +SKIP
Expand Down Expand Up @@ -227,6 +228,46 @@ def set_column_as_primary_key(
)


def make_columns_primary_key(
    uri: str,
    *,
    schema_name: str = "public",
    table_name: str,
    column_names: str | Sequence[str],
):
    """
    Promote one or more existing columns to a (composite) primary key.

    Unlike make_int_column_primary_key_identity(), no identity / auto-increment
    is attached; this only adds a PRIMARY KEY constraint. It is the primary-key
    counterpart of make_columns_unique().
    """
    # Accept a single column name as a convenience shorthand.
    if isinstance(column_names, str):
        column_names = [column_names]

    with psycopg.connect(
        conninfo=uri,
    ) as conn:
        try:
            cur = conn.cursor()

            statement = sql.SQL("""
                ALTER TABLE {table}
                ADD PRIMARY KEY ({columns});
            """).format(
                table=sql.Identifier(schema_name, table_name),
                # Quote each column individually and join into a comma list.
                columns=sql.SQL(",").join(
                    sql.Identifier(name) for name in column_names
                ),
            )
            cur.execute(statement)
            conn.commit()

        except psycopg.Error:
            # Best-effort: log and swallow, matching the sibling helpers here.
            logger.exception(
                f"Error setting primary key for column '{column_names}' in table '{table_name}'"
            )


def make_columns_unique(
uri: str,
*,
Expand All @@ -250,9 +291,9 @@ def make_columns_unique(

cursor.execute(
query=sql.SQL("""
ALTER TABLE {table}
ADD CONSTRAINT {table_unique_constraint}
UNIQUE ({columns});
ALTER TABLE {table}
ADD CONSTRAINT {table_unique_constraint}
UNIQUE ({columns});
""").format(
table=sql.Identifier(schema_name, table_name),
table_unique_constraint=sql.Identifier(
Expand All @@ -267,7 +308,7 @@ def make_columns_unique(

except psycopg.Error:
logger.exception(
f"Error setting primary key for column '{column_names}' in table '{table_name}'"
f"Error setting unique constraint for column '{column_names}' in table '{table_name}'"
)


Expand Down Expand Up @@ -479,7 +520,18 @@ def polars_write_database(
for col, dtype in columns_dtype.items()
}

df.to_pandas(use_pyarrow_extension_array=True).to_sql(
pd_df = df.to_pandas(use_pyarrow_extension_array=True)

# If any column has type list[number] in Polars, the pandas DataFrame will have a numpy array.
# We need to convert it to a list, because `to_sql` doesn't support numpy arrays.
for col, dtype in columns_dtype.items():
if isinstance(dtype, pl.List):
if isinstance(dtype.inner, pl.Utf8):
continue
pd_df[col] = pd_df[col].apply(lambda x: x.tolist())

# ic(pd_df)
pd_df.to_sql(
schema=schema_name,
name=table_name,
con=connection,
Expand Down

0 comments on commit 2459171

Please sign in to comment.