Skip to content

Commit

Permalink
feat: migrate old AD user from Tunnistamo to Keycloak
Browse files Browse the repository at this point in the history
Migration happens one user at a time upon login.

Feature can be configured using the following settings.
- `HELUSERS_AD_MIGRATE_ENABLED` enable the feature.
  Defaults to `False`.
- `HELUSERS_AD_MIGRATE_EMAIL_DOMAINS` whitelisted email domains
  for migration. Defaults to `["@hel.fi"]`.
- `HELUSERS_AD_MIGRATE_AMR` which authentication methods are
  used for migration. Defaults to `["helsinkiad"]`.

Migration logic is only run on certain conditions:
- Correct authentication method is used (AMR-claim)
- Email domain is correct
- User with the new UUID doesn't exist yet
- Old user is found by first name, last name and email
- Old user has username generated by helusers.utils.uuid_to_username

Instead of allowing a new user to be created the migration is done by
replacing the old user UUID with the one from the incoming token
payload. Logic which is run later should take care of updating other
user related fields.

Primary key is separate from the user UUID, so the user UUID can be
changed. This migration should therefore retain all the data related
to the user.

Migration logic only supports authentication methods from this package
and Python Social Auth pipeline helusers.defaults.SOCIAL_AUTH_PIPELINE.
This doesn't support migrating users which are using e.g. a different
pipeline for Python Social Auth (e.g. the default pipeline).

Refs: HP-2429
  • Loading branch information
charn committed Jun 19, 2024
1 parent 454cf51 commit 61e9838
Show file tree
Hide file tree
Showing 3 changed files with 211 additions and 0 deletions.
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,28 @@ If you're not allowing users to log in with passwords, you may disable the
username/password form from Django admin login page by setting `HELUSERS_PASSWORD_LOGIN_DISABLED`
to `True`.


#### Migrating old AD user from Tunnistamo to Keycloak

When transitioning from one AD authentication to another, it is possible to migrate
the old user data for the new user with a different UUID. Migration is done by finding
the old user instance and replacing its UUID with the new one from the token
payload. Migration happens one user at a time upon login.

Feature can be configured using the following settings.
- `HELUSERS_AD_MIGRATE_ENABLED`: Enable the feature. Defaults to `False`.
- `HELUSERS_AD_MIGRATE_EMAIL_DOMAINS`: Whitelisted email domains for migration.
Defaults to `["@hel.fi"]`.
- `HELUSERS_AD_MIGRATE_AMR` which authentication methods are used for migration.
Defaults to `["helsinkiad"]`.

Migration logic is only run on certain conditions:
- Correct authentication method is used (AMR-claim)
- Email domain is correct
- User with the new UUID doesn't exist yet
- Old user is found by first name, last name and email
- Username has been generated by helusers.utils.uuid_to_username for the old user

# Development

Virtual Python environment can be used. For example:
Expand Down
112 changes: 112 additions & 0 deletions helusers/tests/test_migrate_ad_user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import uuid

import pytest
from django.contrib.auth import get_user_model

from helusers.user_utils import get_or_create_user, migrate_ad_user
from helusers.utils import uuid_to_username


@pytest.fixture(autouse=True)
def setup_migrate(settings):
settings.HELUSERS_AD_MIGRATE_ENABLED = True
settings.HELUSERS_AD_MIGRATE_AMR = ["a", "b"]
settings.HELUSERS_AD_MIGRATE_EMAIL_DOMAINS = ["@example.com", "@example.org"]


@pytest.mark.parametrize(
"migrate_enabled, amr, email, good_username, expect_migration",
[
pytest.param(True, ["a"], "auser@example.org", True, True, id="migrate"),
pytest.param(
True, ["a"], "auser@example.org", False, False, id="wrong_username"
),
pytest.param(True, ["c"], "auser@example.org", True, False, id="wrong_amr"),
pytest.param(True, ["a"], "auser@example.net", True, False, id="wrong_domain"),
pytest.param(False, ["a"], "auser@example.org", True, False, id="disabled"),
],
)
@pytest.mark.django_db
def test_migrate_ad_user(
settings, migrate_enabled, amr, email, good_username, expect_migration
):
settings.HELUSERS_AD_MIGRATE_ENABLED = migrate_enabled
old_uuid = uuid.uuid4()
new_uuid = uuid.uuid4()
old_username = uuid_to_username(old_uuid) if good_username else str(old_uuid)
user_model = get_user_model()
user = user_model.objects.create(
uuid=old_uuid,
username=old_username,
first_name="A",
last_name="User",
email=email,
)

payload = {
"sub": str(new_uuid),
"amr": amr,
"email": user.email,
"first_name": user.first_name,
"last_name": user.last_name,
}

migrate_ad_user(user_id=str(new_uuid), payload=payload)

if expect_migration:
user.refresh_from_db()
assert user.uuid == new_uuid
assert user.username == uuid_to_username(new_uuid)
else:
user.refresh_from_db()
assert user.uuid == old_uuid
assert user.username == old_username


@pytest.mark.parametrize(
"migrate_enabled, amr, email, good_username, expect_migration",
[
pytest.param(True, ["a"], "auser@example.org", True, True, id="migrate"),
pytest.param(
True, ["a"], "auser@example.org", False, False, id="wrong_username"
),
pytest.param(True, ["c"], "auser@example.org", True, False, id="wrong_amr"),
pytest.param(True, ["a"], "auser@example.net", True, False, id="wrong_domain"),
pytest.param(False, ["a"], "auser@example.org", True, False, id="disabled"),
],
)
@pytest.mark.django_db
def test_get_or_create_user_migrate_ad_user(
settings, migrate_enabled, amr, email, good_username, expect_migration
):
settings.HELUSERS_AD_MIGRATE_ENABLED = migrate_enabled
old_uuid = uuid.uuid4()
new_uuid = uuid.uuid4()
user_model = get_user_model()
old_user = user_model.objects.create(
uuid=old_uuid,
username=uuid_to_username(old_uuid) if good_username else str(old_uuid),
first_name="A",
last_name="User",
email=email,
)

payload = {
"sub": str(new_uuid),
"amr": amr,
"email": old_user.email,
"first_name": old_user.first_name,
"last_name": old_user.last_name,
}

user = get_or_create_user(payload)

if expect_migration:
assert user_model.objects.count() == 1
assert user.uuid == new_uuid
assert user.username == uuid_to_username(new_uuid)
else:
assert user_model.objects.count() == 2
assert user_model.objects.filter(uuid=old_uuid).exists()
assert user.uuid == new_uuid
assert user.username == uuid_to_username(new_uuid)
77 changes: 77 additions & 0 deletions helusers/user_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from django.db import IntegrityError, transaction
from django.utils.translation import gettext as _

from helusers.utils import uuid_to_username

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -111,7 +113,80 @@ def convert_to_uuid(convertable, namespace=None):
return generated_uuid


def migrate_ad_user(user_id: str, payload: dict, oidc: bool = False):
"""Migrate old AD user from Tunnistamo to Keycloak.
Migration logic is only run on certain conditions:
- Correct authentication method is used (AMR-claim)
- Email domain is correct
- User with the new UUID doesn't exist yet
- Old user is found by first name, last name and email
- Old user has username generated by helusers.utils.uuid_to_username
Instead of allowing a new user to be created the migration is done by
replacing the old user UUID with the one from the incoming token
payload. Logic which is run later should take care of updating other
user related fields.
Primary key is separate from the user UUID, so the user UUID can be
changed. This migration should therefore retain all the data related
to the user.
Migration logic only supports authentication methods from this package
and Python Social Auth pipeline helusers.defaults.SOCIAL_AUTH_PIPELINE.
This doesn't support migrating users which are using e.g. a different
pipeline for Python Social Auth (e.g. the default pipeline).
"""
amr_to_migrate = getattr(settings, "HELUSERS_AD_MIGRATE_AMR", ["helsinkiad"])
domains_to_migrate = getattr(
settings, "HELUSERS_AD_MIGRATE_EMAIL_DOMAINS", ["@hel.fi"]
)

if not getattr(settings, "HELUSERS_AD_MIGRATE_ENABLED", False):
return

# If not using the expected authentication method, don't migrate
if not (
(amr := payload.get("amr"))
and isinstance(amr, list)
and any(value in amr_to_migrate for value in amr)
):
return

if oidc:
payload = oidc_to_user_data(payload)

uid = UUID(user_id)
email = payload.get("email")
first_name = payload.get("first_name")
last_name = payload.get("last_name")

if not email or not any([email.endswith(domain) for domain in domains_to_migrate]):
return

user_model = get_user_model()

if user_model.objects.filter(uuid=uid).exists():
logger.debug(f"User {user_id} exist, not trying to migrate.")
return

user = user_model.objects.filter(
first_name=first_name,
last_name=last_name,
email=email,
username__startswith="u-",
).first()

if user:
logger.info(f"Migrating user {user.uuid} to {user_id}.")

user.uuid = uid
user.username = uuid_to_username(uid)
user.save()


def get_or_create_user(payload, oidc=False):
"""Get, create or update a user based on the payload."""
user_id = payload.get("sub")
if not user_id:
msg = _("Invalid payload. sub missing")
Expand All @@ -126,6 +201,8 @@ def get_or_create_user(payload, oidc=False):
namespace = payload.get("tid")
user_id = convert_to_uuid(user_id, namespace)

migrate_ad_user(user_id, payload, oidc=oidc)

try_again = False
try:
user = _try_create_or_update(user_id, payload, oidc)
Expand Down

0 comments on commit 61e9838

Please sign in to comment.