feat: Change config file schema

HectorxH committed Sep 26, 2024
1 parent 16bb5fe commit 70f283f
Showing 7 changed files with 189 additions and 138 deletions.
29 changes: 29 additions & 0 deletions mapping.toml
@@ -0,0 +1,29 @@
#:schema ./mapping_schema.json
[[tables]]
from = "object_parquet"
to = "object"
[[tables.columns]]
from = "oid_parquet"
to = "oid"
[[tables.columns]]
from = "firstmjd_parquet"
to = "firstmjd"
[[tables.columns]]
from = "ndet_parquet"
to = "ndet"

[[tables]]
from = "detections_parquet"
to = "detections"
[[tables.columns]]
from = "candid_parquet"
to = "candid"
[[tables.columns]]
from = "oid_parquet"
to = "oid"
[[tables.columns]]
from = "mag_parquet"
to = "mag"
[[tables.columns]]
from = "other_parquet"
to = "blahblah"
21 changes: 21 additions & 0 deletions mapping.yml
@@ -0,0 +1,21 @@
tables:
- from: object_parquet
to: object
columns:
- from: oid_parquet
to: oid
- from: firstmjd_parquet
to: firstmjd
- from: ndet_parquet
to: ndet
- from: detections_parquet
to: detections
columns:
- from: candid_parquet
to: candid
- from: oid_parquet
to: oid
- from: mag_parquet
to: mag
- from: "other_parquet"
to: "blahblah"
33 changes: 22 additions & 11 deletions src/mdbl/cli.py
@@ -1,8 +1,13 @@
from typing import BinaryIO
import json
from typing import BinaryIO, TextIO

import click
import mdbl.mdbl as mdbl
import duckdb

import mdbl.mdbl as mdbl
from mdbl.models.cli import ValidFileTypes
from mdbl.models.mappings import DBMappings


@click.group
def main():
@@ -13,19 +18,26 @@ def main():


@main.command()
def hello_world():
@click.option("-f", "--file", type=click.File("w"))
@click.option("-i", "--indent", type=int, default=4)
def schema(file: TextIO | None, indent: int):
"""
Prints 'Hello, World!' to the console.
Generates a JSON schema describing the format of the mapping file
to give better IDE support.
"""
click.echo("Hello, World!")
json_schema = DBMappings.model_json_schema()
if file:
file.write(json.dumps(json_schema))
else:
click.echo(json.dumps(json_schema, indent=indent))


@main.command()
def sql():
with duckdb.connect() as con:
con.install_extension("postgres")
con.load_extension("postgres")
_ = con.sql(
con.sql(
"ATTACH 'dbname=postgres user=postgres password=postgres host=127.0.0.1' as postgres (TYPE POSTGRES)"
)
while True:
@@ -40,14 +52,13 @@ def sql():


@main.command()
@click.option("--file", "-f", required=True, type=click.File("rb"))
@click.option("-f", "--file", required=True, type=click.File("rb"))
@click.option(
"--file-type",
"-t",
"--file_type",
required=True,
type=click.Choice(mdbl.ValidFileTypes.possible_values(), case_sensitive=False),
type=click.Choice(ValidFileTypes.possible_values(), case_sensitive=False),
)
def data_load(file: BinaryIO, file_type: str):
db_mappings = mdbl.read_mapping(file, mdbl.ValidFileTypes(file_type))
click.echo(db_mappings)
db_mappings = mdbl.read_mapping(file, ValidFileTypes(file_type))
mdbl.data_load(db_mappings)
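
The `schema` command above replaces the old `hello_world` placeholder. A sketch of the equivalent programmatic call, matching what `schema -f mapping_schema.json` writes (the file name is assumed from the `#:schema` directive in `mapping.toml`):

```python
import json

from mdbl.models.mappings import DBMappings

# Write the schema unindented, exactly as the CLI does when given a file.
with open("mapping_schema.json", "w") as f:
    f.write(json.dumps(DBMappings.model_json_schema()))
```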
154 changes: 27 additions & 127 deletions src/mdbl/mdbl.py
@@ -1,131 +1,41 @@
from enum import Enum
import math
from typing import BinaryIO, override
from faker import Faker
import polars as pl
from typing import BinaryIO

import duckdb
from pathlib import Path
from pydantic import BaseModel, RootModel
import tomllib
import yaml

fake = Faker()


class Mapping(BaseModel):
column: str


class TableMappings(BaseModel):
step: int
parquet: str
mapping: dict[str, Mapping]


class DBMappings(RootModel[dict[str, TableMappings]]):
@override
def __iter__(self): # pyright: ignore[reportIncompatibleMethodOverride]
return iter(self.root)

def __getitem__(self, item: str):
return self.root[item]


db_mappings = DBMappings.model_validate(
{
"object": {
"step": 0,
"parquet": "object_parquet",
"mapping": {
"oid_parquet": {"column": "oid"},
"firstmjd_parquet": {"column": "firstmjd"},
"ndet_parquet": {"column": "ndet"},
},
},
"detections": {
"step": 1,
"parquet": "detections_parquet",
"mapping": {
"candid_parquet": {"column": "candid"},
"oid_parquet": {"column": "oid"},
"other_parquet": {"column": "mag"},
},
},
}
)


parquets = {
"object_parquet": [
"oid_parquet",
"firstmjd_parquet",
"lastmjd_parquet",
"ndet_parquet",
],
"detections_parquet": [
"candid_parquet",
"oid_parquet",
"mag_parquet",
"other_parquet",
"another_parquet",
],
}


class ValidFileTypes(Enum):
TOML = "TOML"
YAML = "YAML"

@classmethod
def possible_values(cls):
return [variant.value for variant in cls.__members__.values()]
from mdbl.models.cli import ValidFileTypes
from mdbl.models.mappings import DBMappings
from mdbl.utils import generate_dummy_parquets


def read_mapping(file: BinaryIO, file_type: ValidFileTypes) -> DBMappings:
match file_type:
case ValidFileTypes.TOML:
data = tomllib.load(file)
return DBMappings(data)
return DBMappings.model_validate(data)
case ValidFileTypes.YAML:
data = yaml.safe_load(file)
return DBMappings(data)
return DBMappings.model_validate(data)


def generate_dummy_parquets(
folder: str = "parquets",
n_parquets: int = 30,
n_rows_per_parquet: int = 1000,
):
for table, columns in parquets.items():
table_folder = Path(folder) / table
table_folder.mkdir(parents=True, exist_ok=True)
for i in range(1, n_parquets + 1):
dummy_data = {
column: [f"{column}: {fake.md5()}" for _ in range(n_rows_per_parquet)]
for column in columns
}
df = pl.DataFrame(dummy_data)
df.write_parquet(
f"{folder}/{table}/{str(i).zfill(math.ceil(math.log10(n_parquets)))}.parquet"
)


def data_load(db_mappings: DBMappings = db_mappings, folder: str = "parquets"):
def data_load(db_mappings: DBMappings, folder: str = "parquets"):
"""
Example:
```python
mapping = {
"object": {
"step": 0,
"parquet": "object_parquet",
"mapping": {
"oid_parquet": {"column": "oid"},
"firstmjd_parquet": {"column": "firstmjd"},
"ndet_parquet": {"column": "ndet"},
},
}
}
```toml
[[tables]]
from = "object_parquet"
to = "object"
[[tables.columns]]
from = "oid_parquet"
to = "oid"
[[tables.columns]]
from = "firstmjd_parquet"
to = "firstmjd"
[[tables.columns]]
from = "ndet_parquet"
to = "ndet"
```
```sql
@@ -143,26 +53,16 @@ def data_load(db_mappings: DBMappings = db_mappings, folder: str = "parquets"):
with duckdb.connect() as con:
con.install_extension("postgres")
con.load_extension("postgres")
_ = con.sql(
con.sql(
"ATTACH 'dbname=postgres user=postgres password=postgres host=127.0.0.1' as postgres (TYPE POSTGRES)"
)

sorted_tables = sorted(db_mappings, key=lambda table: db_mappings[table].step)
for table in sorted_tables:
parquet = db_mappings[table].parquet
mapping = db_mappings[table].mapping

for table in db_mappings.tables:
aliases = ", ".join(
[
f"{parquet_col} as {col_data.column}"
for parquet_col, col_data in mapping.items()
]
)
query = (
f"SELECT {aliases} FROM read_parquet('{folder}/{parquet}/*.parquet')"
[f"{column.from_} as {column.to}" for column in table.columns]
)
print(query)
query = f"SELECT {aliases} FROM read_parquet('{folder}/{table.from_}/*.parquet')"
con.sql(query).show()

_ = con.sql(f"CREATE OR REPLACE TABLE postgres.{table} AS {query}")
_ = con.sql(f"SELECT * FROM postgres.{table}")
con.sql(f"CREATE OR REPLACE TABLE postgres.{table.to} AS {query}")
con.sql(f"SELECT * FROM postgres.{table.to}")
10 changes: 10 additions & 0 deletions src/mdbl/models/cli.py
@@ -0,0 +1,10 @@
from enum import Enum


class ValidFileTypes(Enum):
TOML = "TOML"
YAML = "YAML"

@classmethod
def possible_values(cls):
return [variant.value for variant in cls.__members__.values()]
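
`possible_values` backs the `click.Choice` in the `data_load` command; for reference:

```python
from mdbl.models.cli import ValidFileTypes

ValidFileTypes.possible_values()  # -> ["TOML", "YAML"]
ValidFileTypes("TOML")            # -> ValidFileTypes.TOML, as constructed in data_load
```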
38 changes: 38 additions & 0 deletions src/mdbl/models/mappings.py
@@ -0,0 +1,38 @@
from pydantic import BaseModel, Field

# Using `from_` because `from` is a reserved keyword


class ColumnMapping(BaseModel):
from_: str = Field(alias="from")
to: str = Field()


class TableMappings(BaseModel):
from_: str = Field(alias="from")
to: str = Field()
columns: list[ColumnMapping] = []


class DBMappings(BaseModel):
tables: list[TableMappings] = []


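# Illustrative payload showing the shape the models above accept: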
a = {
"tables": [
{
"from": "parquet_obj",
"to": "obj",
"columns": [{"from": "col_tabla", "to": "col_db"}],
},
{
"from": "parquet_det",
"to": "det",
"columns": [
{"from": "col_tabla", "to": "col_db"},
{"from": "col_tabla", "to": "col_db"},
{"from": "col_tabla", "to": "col_db"},
],
},
]
}
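
Since `from` is only reachable through its alias, raw dicts (or parsed TOML/YAML) use the `from` key while Python code reads `from_`. A quick sketch of validating the `a` payload above:

```python
from mdbl.models.mappings import DBMappings

db = DBMappings.model_validate(a)
db.tables[0].from_           # -> "parquet_obj"
db.tables[1].columns[0].to   # -> "col_db"

# Dumping with aliases round-trips the original `from` keys:
db.model_dump(by_alias=True)["tables"][0]["from"]  # -> "parquet_obj"
```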
42 changes: 42 additions & 0 deletions src/mdbl/utils.py
@@ -0,0 +1,42 @@
import math
from pathlib import Path

import polars as pl
from faker import Faker

fake = Faker()

parquets = {
"object_parquet": [
"oid_parquet",
"firstmjd_parquet",
"lastmjd_parquet",
"ndet_parquet",
],
"detections_parquet": [
"candid_parquet",
"oid_parquet",
"mag_parquet",
"other_parquet",
"another_parquet",
],
}


def generate_dummy_parquets(
folder: str = "parquets",
n_parquets: int = 30,
n_rows_per_parquet: int = 1000,
) -> None:
for table, columns in parquets.items():
table_folder = Path(folder) / table
table_folder.mkdir(parents=True, exist_ok=True)
for i in range(1, n_parquets + 1):
dummy_data = {
column: [f"{column}: {fake.md5()}" for _ in range(n_rows_per_parquet)]
for column in columns
}
df = pl.DataFrame(dummy_data)
# Zero-pad the file index so names sort lexicographically, including
# when n_parquets is an exact power of 10.
df.write_parquet(
f"{folder}/{table}/{str(i).zfill(math.floor(math.log10(n_parquets)) + 1)}.parquet"
)
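
A quick way to produce test data in the layout `data_load` expects (the values shown are the function's own defaults):

```python
from mdbl.utils import generate_dummy_parquets

# Writes parquets/object_parquet/01.parquet ... 30.parquet and the same
# layout under parquets/detections_parquet/, filled with fake md5 strings.
generate_dummy_parquets(folder="parquets", n_parquets=30, n_rows_per_parquet=1000)
```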
