Skip to content

Commit

Permalink
Fix the ability to create identical events
Browse files Browse the repository at this point in the history
  • Loading branch information
DaymasS committed Sep 15, 2024
1 parent 5b553c0 commit 8355775
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 24 deletions.
69 changes: 45 additions & 24 deletions calendar_backend/routes/event/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,23 @@ async def _get_timetable(start: date, end: date, group_id, lecturer_id, room_id,
return GetListEvent(items=events, limit=limit, offset=offset, total=cnt).model_dump(exclude=fmt)


def _is_unique_event(event_dict):
existing_events_query = (
Event.get_all(session=db.session)
.filter(Event.name == event_dict["name"])
.filter(Event.start_ts == event_dict["start_ts"])
.filter(Event.end_ts == event_dict["end_ts"])
)
for existing_event in existing_events_query.all():
if (
[column.id for column in existing_event.group] == event_dict["group_id"]
and [column.id for column in existing_event.room] == event_dict["room_id"]
and [column.id for column in existing_event.lecturer] == event_dict["lecturer_id"]
):
return False
return True


@router.get("/", response_model=GetListEvent | None)
async def get_events(
start: date | None = Query(default=None, description="Default: Today"),
Expand All @@ -84,21 +101,22 @@ async def get_events(
return await fmt_cases[format]()


@router.post("/", response_model=EventGet)
@router.post("/", response_model=EventGet | None)
async def create_event(event: EventPost, _=Depends(UnionAuth(scopes=["timetable.event.create"]))) -> EventGet:
event_dict = event.model_dump()
rooms = [Room.get(room_id, session=db.session) for room_id in event_dict.pop("room_id", [])]
lecturers = [Lecturer.get(lecturer_id, session=db.session) for lecturer_id in event_dict.pop("lecturer_id", [])]
groups = [Group.get(group_id, session=db.session) for group_id in event_dict.pop("group_id", [])]
event_get = Event.create(
**event_dict,
room=rooms,
lecturer=lecturers,
group=groups,
session=db.session,
)
db.session.commit()
return EventGet.model_validate(event_get)
if _is_unique_event(event_dict):
rooms = [Room.get(room_id, session=db.session) for room_id in event_dict.pop("room_id", [])]
lecturers = [Lecturer.get(lecturer_id, session=db.session) for lecturer_id in event_dict.pop("lecturer_id", [])]
groups = [Group.get(group_id, session=db.session) for group_id in event_dict.pop("group_id", [])]
event_get = Event.create(
**event_dict,
room=rooms,
lecturer=lecturers,
group=groups,
session=db.session,
)
db.session.commit()
return EventGet.model_validate(event_get)


@router.post("/bulk", response_model=list[EventGet])
Expand All @@ -108,18 +126,21 @@ async def create_events(
result = []
for event in events:
event_dict = event.model_dump()
rooms = [Room.get(room_id, session=db.session) for room_id in event_dict.pop("room_id", [])]
lecturers = [Lecturer.get(lecturer_id, session=db.session) for lecturer_id in event_dict.pop("lecturer_id", [])]
groups = [Group.get(group_id, session=db.session) for group_id in event_dict.pop("group_id", [])]
result.append(
Event.create(
**event_dict,
room=rooms,
lecturer=lecturers,
group=groups,
session=db.session,
if _is_unique_event(event_dict):
rooms = [Room.get(room_id, session=db.session) for room_id in event_dict.pop("room_id", [])]
lecturers = [
Lecturer.get(lecturer_id, session=db.session) for lecturer_id in event_dict.pop("lecturer_id", [])
]
groups = [Group.get(group_id, session=db.session) for group_id in event_dict.pop("group_id", [])]
result.append(
Event.create(
**event_dict,
room=rooms,
lecturer=lecturers,
group=groups,
session=db.session,
)
)
)
db.session.commit()
adapter = TypeAdapter(list[EventGet])
return adapter.validate_python(result)
Expand Down
69 changes: 69 additions & 0 deletions tests/event/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,75 @@ def test_create_many(client_auth: TestClient, dbsession: Session, room_factory,
assert [row.id for row in response_model.group] == request_obj[1]["group_id"]


def test_create_clone(client_auth: TestClient, dbsession: Session, room_path, group_path, lecturer_path):
room_id = int(room_path.split("/")[-1])
group_id = int(group_path.split("/")[-1])
lecturer_id = int(lecturer_path.split("/")[-1])
time_stamp = datetime.datetime.now()
name = f"name_{time_stamp}"
request_obj = {
"name": name,
"room_id": [room_id],
"group_id": [group_id],
"lecturer_id": [lecturer_id],
"start_ts": "2022-08-26T22:32:38.575Z",
"end_ts": "2022-08-26T22:32:38.575Z",
}
response = client_auth.post(RESOURCE, json=request_obj)
assert response.status_code == status.HTTP_200_OK, response.json()
response = client_auth.post(RESOURCE, json=request_obj)
assert response.status_code == status.HTTP_200_OK
assert response.json() is None
events = dbsession.query(Event).filter(Event.name == name).all()
assert len(events) == 1


def test_create_many_clones(client_auth: TestClient, dbsession: Session, room_factory, group_factory, lecturer_factory):
time_stamp = datetime.datetime.now()
name1 = f"name1_{time_stamp}"
room_path1 = room_factory(client_auth)
group_path1 = group_factory(client_auth)
lecturer_path1 = lecturer_factory(client_auth)
name2 = f"name2_{time_stamp}"
room_path2 = room_factory(client_auth)
group_path2 = group_factory(client_auth)
lecturer_path2 = lecturer_factory(client_auth)
room_id1 = int(room_path1.split("/")[-1])
group_id1 = int(group_path1.split("/")[-1])
lecturer_id1 = int(lecturer_path1.split("/")[-1])
room_id2 = int(room_path2.split("/")[-1])
group_id2 = int(group_path2.split("/")[-1])
lecturer_id2 = int(lecturer_path2.split("/")[-1])
request_obj = [
{
"name": name1,
"room_id": [room_id1],
"group_id": [group_id1],
"lecturer_id": [lecturer_id1],
"start_ts": "2022-08-26T22:32:38.575Z",
"end_ts": "2022-08-26T22:32:38.575Z",
},
{
"name": name2,
"room_id": [room_id2],
"group_id": [group_id2],
"lecturer_id": [lecturer_id2],
"start_ts": "2022-08-26T22:32:38.575Z",
"end_ts": "2022-08-26T22:32:38.575Z",
},
]
response = client_auth.post(f"{RESOURCE}bulk", json=request_obj)
assert response.status_code == status.HTTP_200_OK, response.json()
assert response.json()[0]["name"] == request_obj[0]["name"]
response = client_auth.post(f"{RESOURCE}bulk", json=request_obj)
assert response.status_code == status.HTTP_200_OK
assert len(response.json()) == 0
events = dbsession.query(Event).filter(Event.name == name1).all()
assert len(events) == 1
events = dbsession.query(Event).filter(Event.name == name2).all()
assert len(events) == 1


def test_delete(client_auth: TestClient, dbsession: Session, room_path, lecturer_path, group_path):
room_id = int(room_path.split("/")[-1])
group_id = int(group_path.split("/")[-1])
Expand Down

0 comments on commit 8355775

Please sign in to comment.