Skip to content

Commit

Permalink
Delete user #352
Browse files Browse the repository at this point in the history
  • Loading branch information
alanwilter committed May 26, 2021
1 parent f0d04fa commit ca06f7b
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 17 deletions.
42 changes: 33 additions & 9 deletions tests/test_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ def test_get_user(_admin):
assert status == 200
user_dict = response.json
assert isinstance(user_dict, dict)
assert user_dict.get("user") == "Admin", "user_dict={}".format(user_dict)
assert user_dict.get("argon_password") is None, "user_dict={}".format(user_dict)
assert user_dict.get("user") == "Admin", f"user_dict={user_dict}"
assert user_dict.get("argon_password") is None, f"user_dict={user_dict}"
individual_ids = user_dict.get("individuals")
assert isinstance(individual_ids, list), "user_dict={}".format(user_dict)
assert len(individual_ids) > 0, "user_dict={}".format(user_dict)
assert isinstance(individual_ids, list), f"user_dict={user_dict}"
assert len(individual_ids) > 0, f"user_dict={user_dict}"


def test_get_non_existing_user(_admin):
Expand All @@ -71,8 +71,8 @@ def test_get_users(_admin):
response, status = get_users()
assert status == 200
users = response.json
assert isinstance(users, list), "users={}".format(users)
assert len(users) >= 2, "users={}".format(users)
assert isinstance(users, list), f"users={users}"
assert len(users) >= 2, f"users={users}"
assert "Admin" in users
assert "demo" in users

Expand Down Expand Up @@ -147,7 +147,7 @@ def test_create_and_confirm_user(_not_logged_in_client):
_assert_create_user(db_session, _not_logged_in_client, user)
# confirms the user
confirmation_token = generate_confirmation_token(user.email)
response = _not_logged_in_client.get("/user/confirm/{}".format(confirmation_token))
response = _not_logged_in_client.get(f"/user/confirm/{confirmation_token}")
assert response.status_code == 200
observed_user = db_session.query(User).filter(User.user == user.user).first()
assert observed_user.user == user.user
Expand All @@ -162,7 +162,7 @@ def test_create_and_confirm_user(_not_logged_in_client):
def test_confirm_user_with_token_with_unexisting_email(_not_logged_in_client):
# tries to confirm an email not in the database
confirmation_token = generate_confirmation_token("nottherightemail@phenopolis.org")
response = _not_logged_in_client.get("/user/confirm/{}".format(confirmation_token))
response = _not_logged_in_client.get(f"/user/confirm/{confirmation_token}")
assert response.status_code == 404


Expand All @@ -174,7 +174,7 @@ def test_confirm_user_with_bad_token(_not_logged_in_client):
def test_confirm_user_already_confirmed(_not_logged_in_client):
# tries to confirm an email not in the database
confirmation_token = generate_confirmation_token("demo@phenopolis.org")
response = _not_logged_in_client.get("/user/confirm/{}".format(confirmation_token))
response = _not_logged_in_client.get(f"/user/confirm/{confirmation_token}")
assert response.status_code == 200


Expand Down Expand Up @@ -301,6 +301,30 @@ def test_change_password(_nondemo_client):
assert argon2.verify(old_password, observed_user.argon_password)


def test_delete_user(_admin_client):
user_name = "test_register6"
with session_scope() as db_session:
user = User()
user.user = user_name
user.argon_password = "blabla"
user.email = "test_register6@phenopolis.org"
user.enabled = True
user.confirmed = True
_assert_create_user(db_session, _admin_client, user)

# deletes user
response = _admin_client.delete(f"/user/{user_name}", content_type="application/json")
assert response.status_code == 200

# confirms it does not exist
o_user = db_session.query(User).filter(User.user == user_name).first()
assert o_user is None, "Deletion was not successful"

# try to delete non-existent user
response = _admin_client.delete("/user/not_me", content_type="application/json")
assert response.status_code == 404


def _assert_create_user(db_session: Session, _client, user):
payload = user.as_dict()
payload["confirmation_url"] = "http://phenopolis.org/confirm/"
Expand Down
44 changes: 36 additions & 8 deletions views/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@ def enable_user(user_id, status):
if user_id == ADMIN_USER:
raise PhenopolisException("Cannot change the status of Admin user!", 400)
user = _get_user_by_id(db_session, user_id)
if not user:
return jsonify(message="User not found"), 404
user.enabled = _parse_boolean_parameter(status)
enabled_flag = user.enabled
except PhenopolisException as e:
return jsonify(success=False, message=str(e)), e.http_status
return jsonify(success=True, message="User enabled flag set to {}".format(enabled_flag)), 200
return jsonify(success=True, message=f"User enabled flag set to {enabled_flag}"), 200


@application.route("/user", methods=["POST"])
Expand Down Expand Up @@ -106,6 +108,8 @@ def get_user(user_id):
try:
with session_scope() as db_session:
user = _get_user_by_id(db_session, user_id)
if not user:
return jsonify(message="User not found"), 404
user_individuals = db_session.query(UserIndividual).filter(UserIndividual.user == user.user).all()
user_dict = user.as_dict()
# removes the password hash from the endpoint we don't want/need this around
Expand Down Expand Up @@ -147,6 +151,34 @@ def confirm_user(token):
return response


@application.route("/user/<user_id>", methods=["DELETE"])
@requires_auth
def delete_user(user_id):
with session_scope() as db_session:
user = _get_user_by_id(db_session, user_id)
request_ok = True
http_status = 200
message = f"User {user_id} has been deleted."
if user:
try:
# user_individuals = db_session.query(UserIndividual).filter(UserIndividual.user == user_id).all()
# for ui in user_individuals:
# db_session.delete(ui)
db_session.query(UserIndividual).filter(UserIndividual.user == user_id).delete()
db_session.query(UserConfig).filter(UserConfig.user_name == user_id).delete()
db_session.delete(user)
except Exception as e:
application.logger.exception(e)
request_ok = False
message = str(e)
http_status = e.http_status
else:
request_ok = False
message = f"User {user_id} does not exist."
http_status = 404
return jsonify(success=request_ok, message=message), http_status


def _check_user_valid(new_user: User):
if new_user.user is None or new_user.user == "":
raise PhenopolisException("Missing user name", 400)
Expand All @@ -167,16 +199,12 @@ def _add_config_from_admin(db_session, new_user):


def _get_user_by_id(db_session, user_id: str) -> User:
users = db_session.query(User).filter(User.user == user_id).all()
if len(users) == 0:
raise PhenopolisException(message="The user does not exist", http_status=404)
return users[0]
return db_session.query(User).filter(User.user == user_id).first()


def _send_confirmation_email(user: User, confirmation_url: str):
confirmation_token = generate_confirmation_token(user.email)
m = Message("Confirm your registration into Phenopolis", sender=MAIL_USERNAME, recipients=[user.email],)
m.body = "Welcome to Phenopolis {user}, confirm your registration in the following link {url_base}/{token}".format(
user=user.user, url_base=confirmation_url, token=confirmation_token
)
m.body = f"""Welcome to Phenopolis {user.user}, confirm your registration in the following link:\n
{confirmation_url}/{confirmation_token}"""
mail.send(m)

0 comments on commit ca06f7b

Please sign in to comment.