Skip to content

Commit

Permalink
⚡️ close opened sql connections
Browse files Browse the repository at this point in the history
  • Loading branch information
PierrickBrun committed Jun 4, 2024
1 parent d23ff44 commit 7c586bc
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 53 deletions.
17 changes: 8 additions & 9 deletions src/plombery/database/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,16 @@ class PipelineRun(Base):


def _mark_cancelled_runs():
db = SessionLocal()

db.query(PipelineRun).filter(
PipelineRun.status == PipelineRunStatus.RUNNING.value
).update(
dict(
status=PipelineRunStatus.CANCELLED.value,
with SessionLocal() as db:
db.query(PipelineRun).filter(
PipelineRun.status == PipelineRunStatus.RUNNING.value
).update(
dict(
status=PipelineRunStatus.CANCELLED.value,
)
)
)

db.commit()
db.commit()


_mark_cancelled_runs()
85 changes: 41 additions & 44 deletions src/plombery/database/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,82 +9,79 @@


def create_pipeline_run(data: PipelineRunCreate):
db = SessionLocal()
db.expire_on_commit = False
with SessionLocal() as db:
db.expire_on_commit = False

created_model = models.PipelineRun(**data.model_dump())
db.add(created_model)
db.commit()
db.refresh(created_model)
created_model = models.PipelineRun(**data.model_dump())
db.add(created_model)
db.commit()
db.refresh(created_model)
return created_model


def update_pipeline_run(
pipeline_run: models.PipelineRun, end_time: datetime, status: PipelineRunStatus
):
db = SessionLocal()

pipeline_run.duration = (end_time - pipeline_run.start_time).total_seconds() * 1000
pipeline_run.status = status.value

db.query(models.PipelineRun).filter(
models.PipelineRun.id == pipeline_run.id
).update(
dict(
duration=pipeline_run.duration,
status=pipeline_run.status,
tasks_run=pipeline_run.tasks_run,
with SessionLocal() as db:

db.query(models.PipelineRun).filter(
models.PipelineRun.id == pipeline_run.id
).update(
dict(
duration=pipeline_run.duration,
status=pipeline_run.status,
tasks_run=pipeline_run.tasks_run,
)
)
)

db.commit()
db.commit()


def list_pipeline_runs(
pipeline_id: Optional[str] = None, trigger_id: Optional[str] = None
):
db = SessionLocal()
db.expire_on_commit = False

filters = []

if pipeline_id:
filters.append(models.PipelineRun.pipeline_id == pipeline_id)
if trigger_id:
filters.append(models.PipelineRun.trigger_id == trigger_id)

pipeline_runs: List[models.PipelineRun] = (
db.query(models.PipelineRun)
.filter(*filters)
.order_by(models.PipelineRun.id.desc())
.limit(30)
.all()
)
with SessionLocal() as db:
db.expire_on_commit = False

pipeline_runs: List[models.PipelineRun] = (
db.query(models.PipelineRun)
.filter(*filters)
.order_by(models.PipelineRun.id.desc())
.limit(30)
.all()
)

return pipeline_runs


def get_pipeline_run(pipeline_run_id: int) -> Optional[models.PipelineRun]:
db = SessionLocal()
with SessionLocal() as db:

pipeline_run: Optional[models.PipelineRun] = db.query(models.PipelineRun).get(
pipeline_run_id
)
pipeline_run: Optional[models.PipelineRun] = db.query(models.PipelineRun).get(
pipeline_run_id
)

return pipeline_run


def get_latest_pipeline_run(pipeline_id, trigger_id):
db = SessionLocal()

pipeline_run: models.PipelineRun = (
db.query(models.PipelineRun)
.filter(
models.PipelineRun.pipeline_id == pipeline_id,
models.PipelineRun.trigger_id == trigger_id,
with SessionLocal() as db:

pipeline_run: models.PipelineRun = (
db.query(models.PipelineRun)
.filter(
models.PipelineRun.pipeline_id == pipeline_id,
models.PipelineRun.trigger_id == trigger_id,
)
.order_by(models.PipelineRun.id.desc())
.first()
)
.order_by(models.PipelineRun.id.desc())
.first()
)

return pipeline_run

0 comments on commit 7c586bc

Please sign in to comment.