-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgroup.py
67 lines (53 loc) · 2.31 KB
/
group.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import logging
from auth_lib.fastapi import UnionAuth
from fastapi import APIRouter, Depends, HTTPException
from fastapi_sqlalchemy import db
from calendar_backend.models import Group
from calendar_backend.routes.models import GetListGroup, GroupGet, GroupPatch, GroupPost
from calendar_backend.settings import get_settings
# Application settings, fetched once when this module is imported.
settings = get_settings()
# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)
# Router for the group endpoints: every route registered on it is served
# under the "/group" prefix and carries the "Group" tag.
router = APIRouter(prefix="/group", tags=["Group"])
@router.get("/{id}", response_model=GroupGet)
async def get_group_by_id(id: int) -> GroupGet:
    """Return the group with the given id.

    Args:
        id: Identifier of the group, taken from the URL path.

    Returns:
        The group serialised as ``GroupGet``.
    """
    # NOTE(review): behaviour for an unknown id is decided by Group.get,
    # which is not visible from this module.
    found = Group.get(id, session=db.session)
    return GroupGet.model_validate(found)
@router.get("/", response_model=GetListGroup)
async def get_groups(query: str = "", limit: int = 10, offset: int = 0) -> GetListGroup:
    """Return a page of groups whose number contains ``query``.

    Args:
        query: Substring to look for in ``Group.number``. The empty default
            matches any number.
        limit: Maximum number of items to return; ``0`` disables the limit.
        offset: Number of matching groups to skip before the returned page.

    Returns:
        The page of groups together with the echoed ``limit`` and ``offset``
        and ``total``, the number of matches counted before paging.
    """
    # autoescape=True makes "%" and "_" typed by the caller match literally
    # instead of acting as LIKE wildcards.
    matching = Group.get_all(session=db.session).filter(Group.number.contains(query, autoescape=True))

    # Count before paging so ``total`` reflects every match, not just this page.
    total = matching.count()

    page = matching.offset(offset)
    if limit:
        page = page.limit(limit)

    return GetListGroup(
        items=page.all(),
        limit=limit,
        offset=offset,
        total=total,
    )
@router.post("/", response_model=GroupGet)
async def create_group(group: GroupPost, _=Depends(UnionAuth(scopes=["timetable.group.create"]))) -> GroupGet:
    """Create a new group unless its number is already taken.

    The route depends on ``UnionAuth`` with the ``timetable.group.create``
    scope.

    Args:
        group: Payload describing the group to create.

    Returns:
        The newly created group serialised as ``GroupGet``.

    Raises:
        HTTPException: With status 423 when a group with the same number
            already exists.
    """
    same_number = db.session.query(Group).filter(Group.number == group.number)
    if same_number.one_or_none():
        raise HTTPException(status_code=423, detail="Already exists")

    created = Group.create(**group.model_dump(), session=db.session)
    db.session.commit()
    return GroupGet.model_validate(created)
@router.patch("/{id}", response_model=GroupGet)
async def patch_group(
    id: int,
    group_inp: GroupPatch,
    _=Depends(UnionAuth(scopes=["timetable.group.update"])),
) -> GroupGet:
    """Apply a partial update to the group with the given id.

    The route depends on ``UnionAuth`` with the ``timetable.group.update``
    scope. Only fields explicitly present in the request body are passed to
    ``Group.update``.

    Args:
        id: Identifier of the group to update, taken from the URL path.
        group_inp: Partial payload; fields that were not set are left untouched.

    Returns:
        The updated group serialised as ``GroupGet``.

    Raises:
        HTTPException: With status 423 when the requested number already
            belongs to a different group.
    """
    changes = group_inp.model_dump(exclude_unset=True)

    # Check uniqueness only when the caller actually sends a number. Otherwise
    # the lookup would run against the field's default value and could reject
    # a patch that never touches the number.
    if "number" in changes:
        holder = Group.get_all(session=db.session).filter(Group.number == group_inp.number).one_or_none()
        # Re-sending the group's own current number is not a conflict.
        if holder and holder.id != id:
            raise HTTPException(status_code=423, detail="Already exists")

    patched = Group.update(id, **changes, session=db.session)
    db.session.commit()
    return GroupGet.model_validate(patched)
@router.delete("/{id}", response_model=None)
async def delete_group(id: int, _=Depends(UnionAuth(scopes=["timetable.group.delete"]))) -> None:
    """Delete the group with the given id and commit the change.

    The route depends on ``UnionAuth`` with the ``timetable.group.delete``
    scope.

    Args:
        id: Identifier of the group to delete, taken from the URL path.
    """
    # NOTE(review): how the row is removed, and what happens for an unknown
    # id, is decided by Group.delete, which is not visible from this module.
    session = db.session
    Group.delete(id, session=session)
    session.commit()