Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix duplicate orders for user notification policies #2278

Merged
merged 27 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker-compose-developer.yml
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ services:
container_name: mysql
labels: *oncall-labels
image: mysql:8.0.32
command: --default-authentication-plugin=mysql_native_password --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci
command: --default-authentication-plugin=mysql_native_password --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci --max_connections=1024
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Increasing max_connections for local concurrency tests, which open more connections than the default limit (151).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you think about adding a ./dev/mysql.ini file?

We have four containers (two each in this file and docker-compose-mysql-rabbitmq.yml) that run the mysql image. We could de-dupe this config and just volume-mount this new file into all of these containers.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All 4 mysql containers share only these two settings: --character-set-server=utf8mb4 and --collation-server=utf8mb4_unicode_ci. I'm not sure deduping those is worth the extra complexity added by volume-mounting the ini file. I'll take a closer look at this, but I think it should be outside the scope of this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that it's outside the scope of this PR 👍 just a thought that came to mind 😄

restart: always
environment:
MYSQL_ROOT_PASSWORD: empty
Expand Down
30 changes: 5 additions & 25 deletions engine/apps/api/serializers/user_notification_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from apps.base.models.user_notification_policy import NotificationChannelAPIOptions
from apps.user_management.models import User
from common.api_helpers.custom_fields import OrganizationFilteredPrimaryKeyRelatedField
from common.api_helpers.exceptions import BadRequest, Forbidden
from common.api_helpers.exceptions import Forbidden
from common.api_helpers.mixins import EagerLoadingMixin


Expand All @@ -34,6 +34,7 @@ class UserNotificationPolicyBaseSerializer(EagerLoadingMixin, serializers.ModelS
class Meta:
model = UserNotificationPolicy
fields = ["id", "step", "order", "notify_by", "wait_delay", "important", "user"]
read_only_fields = ["order"]

def to_internal_value(self, data):
if data.get("wait_delay", None):
Expand Down Expand Up @@ -67,7 +68,6 @@ def _notify_by_to_representation(self, instance, result):


class UserNotificationPolicySerializer(UserNotificationPolicyBaseSerializer):
prev_step = serializers.CharField(required=False, write_only=True, allow_null=True)
user = OrganizationFilteredPrimaryKeyRelatedField(
queryset=User.objects,
required=False,
Expand All @@ -80,36 +80,16 @@ class UserNotificationPolicySerializer(UserNotificationPolicyBaseSerializer):
default=NotificationChannelAPIOptions.DEFAULT_NOTIFICATION_CHANNEL,
)

class Meta(UserNotificationPolicyBaseSerializer.Meta):
fields = [*UserNotificationPolicyBaseSerializer.Meta.fields, "prev_step"]
read_only_fields = ("order",)

def create(self, validated_data):
prev_step = validated_data.pop("prev_step", None)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The prev_step param seems to be deprecated; I couldn't find any usages.


user = validated_data.get("user")
user = validated_data.get("user") or self.context["request"].user
organization = self.context["request"].auth.organization

if not user:
user = self.context["request"].user

self_or_admin = user.self_or_admin(user_to_check=self.context["request"].user, organization=organization)
if not self_or_admin:
raise Forbidden()

if prev_step is not None:
try:
prev_step = UserNotificationPolicy.objects.get(public_primary_key=prev_step)
except UserNotificationPolicy.DoesNotExist:
raise BadRequest(detail="Prev step does not exist")
if prev_step.user != user or prev_step.important != validated_data.get("important", False):
raise BadRequest(detail="UserNotificationPolicy can be created only with the same user and importance")
instance = UserNotificationPolicy.objects.create(**validated_data)
instance.to(prev_step.order + 1)
return instance
else:
instance = UserNotificationPolicy.objects.create(**validated_data)
return instance
instance = UserNotificationPolicy.objects.create(**validated_data)
return instance


class UserNotificationPolicyUpdateSerializer(UserNotificationPolicyBaseSerializer):
Expand Down
25 changes: 9 additions & 16 deletions engine/apps/api/tests/test_user_notification_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def test_user_cant_create_notification_policy_for_user(


@pytest.mark.django_db
def test_create_notification_policy_from_step(
def test_create_notification_policy_order_is_ignored(
user_notification_policy_internal_api_setup,
make_user_auth_headers,
):
Expand All @@ -121,7 +121,7 @@ def test_create_notification_policy_from_step(
url = reverse("api-internal:notification_policy-list")

data = {
"prev_step": wait_notification_step.public_primary_key,
"position": 2023,
"step": UserNotificationPolicy.Step.NOTIFY,
"notify_by": UserNotificationPolicy.NotificationChannel.SLACK,
"wait_delay": None,
Expand All @@ -130,26 +130,19 @@ def test_create_notification_policy_from_step(
}
response = client.post(url, data, format="json", **make_user_auth_headers(admin, token))
assert response.status_code == status.HTTP_201_CREATED
assert response.data["order"] == 1
assert response.data["order"] == 2


@pytest.mark.django_db
def test_create_invalid_notification_policy(user_notification_policy_internal_api_setup, make_user_auth_headers):
def test_move_to_position_position_error(user_notification_policy_internal_api_setup, make_user_auth_headers):
token, steps, users = user_notification_policy_internal_api_setup
wait_notification_step, _, _, _ = steps
admin, _ = users
step = steps[0]
client = APIClient()
url = reverse("api-internal:notification_policy-list")
url = reverse("api-internal:notification_policy-move-to-position", kwargs={"pk": step.public_primary_key})

data = {
"prev_step": wait_notification_step.public_primary_key,
"step": UserNotificationPolicy.Step.NOTIFY,
"notify_by": UserNotificationPolicy.NotificationChannel.SLACK,
"wait_delay": None,
"important": True,
"user": admin.public_primary_key,
}
response = client.post(url, data, format="json", **make_user_auth_headers(admin, token))
# position can only be 0 or 1 in this test setup, because there are only 2 steps
response = client.put(f"{url}?position=2", content_type="application/json", **make_user_auth_headers(admin, token))
assert response.status_code == status.HTTP_400_BAD_REQUEST


Expand Down Expand Up @@ -221,7 +214,7 @@ def test_admin_can_move_user_step(user_notification_policy_internal_api_setup, m
"api-internal:notification_policy-move-to-position", kwargs={"pk": second_user_step.public_primary_key}
)

response = client.put(f"{url}?position=1", content_type="application/json", **make_user_auth_headers(admin, token))
response = client.put(f"{url}?position=0", content_type="application/json", **make_user_auth_headers(admin, token))
assert response.status_code == status.HTTP_200_OK


Expand Down
7 changes: 6 additions & 1 deletion engine/apps/api/views/user_notification_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,12 @@ def perform_destroy(self, instance):
def move_to_position(self, request, pk):
instance = self.get_object()
position = get_move_to_position_param(request)
instance.to(position)

try:
instance.to_index(position)
except IndexError:
raise BadRequest(detail="Invalid position")

return Response(status=status.HTTP_200_OK)

@action(detail=False, methods=["get"])
Expand Down
48 changes: 48 additions & 0 deletions engine/apps/base/migrations/0004_auto_20230616_1510.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Generated by Django 3.2.19 on 2023-06-16 15:10

from django.db import migrations, models
from django.db.models import Count

from common.database import get_random_readonly_database_key_if_present_otherwise_default


def fix_duplicate_order_user_notification_policy(apps, schema_editor):
    """
    Data migration: renumber notification policies whose order values collide.

    Every (user_id, important) group that contains at least one duplicated order gets all of its
    policies renumbered sequentially from 0, sorted by their current order and then by id.
    """
    UserNotificationPolicy = apps.get_model('base', 'UserNotificationPolicy')

    # duplicates are pretty infrequent, so looking them up on a readonly database should be safe
    read_db = get_random_readonly_database_key_if_present_otherwise_default()

    # (user_id, important, order) combinations that occur more than once, i.e. the duplicates;
    # the empty order_by() resets any existing ordering
    duplicated = (
        UserNotificationPolicy.objects.using(read_db)
        .values("user_id", "important", "order")
        .annotate(count=Count("order"))
        .order_by()
        .filter(count__gt=1)
    )

    # collapse to unique (user_id, important) pairs so that no group is fixed more than once
    groups = {(row["user_id"], row["important"]) for row in duplicated}

    for user_id, important in groups:
        policies = UserNotificationPolicy.objects.filter(user_id=user_id, important=important).order_by("order", "id")
        # hand out sequential orders starting from 0
        for new_order, policy in enumerate(policies):
            policy.order = new_order
        UserNotificationPolicy.objects.bulk_update(policies, fields=["order"])


# Schema + data migration for UserNotificationPolicy.order.
# The operations run in the listed sequence: the field is altered, the duplicated orders are
# renumbered, and only then is the unique constraint added.
class Migration(migrations.Migration):

dependencies = [
('base', '0003_delete_organizationlogrecord'),
]

operations = [
# 1. Redefine `order` as a nullable, indexed, non-editable PositiveIntegerField.
migrations.AlterField(
model_name='usernotificationpolicy',
name='order',
field=models.PositiveIntegerField(db_index=True, editable=False, null=True),
),
# 2. Renumber policies that share an order value; the reverse operation is a no-op.
migrations.RunPython(fix_duplicate_order_user_notification_policy, migrations.RunPython.noop),
# 3. Enforce uniqueness of (user_id, important, order) at the database level.
migrations.AddConstraint(
model_name='usernotificationpolicy',
constraint=models.UniqueConstraint(fields=('user_id', 'important', 'order'), name='unique_user_notification_policy_order'),
),
]
157 changes: 157 additions & 0 deletions engine/apps/base/models/ordered_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import random
import time
import typing
from functools import wraps

from django.db import IntegrityError, OperationalError, connection, models, transaction

# Update object's order to NULL and shift other objects' orders accordingly in a single SQL query.
#
# `t2` is the row being moved (matched by %(pk)s). `t1` ranges over the rows selected by
# {ordering_condition} whose order lies between t2's current order and the target %(order)s, inclusive.
# The moved row itself gets order NULL; rows with a smaller order than the moved row are shifted by +1,
# the remaining rows by -1. Nothing is updated when the row already has the target order.
# {db_table}, {pk_name} and {ordering_condition} are filled in with str.format(); %(pk)s, %(order)s and
# the placeholders inside {ordering_condition} are passed as query parameters.
# NOTE(review): the ORDER BY presumably controls the sequence in which rows are updated so that the unique
# constraint is not violated mid-statement. The MySQL reference manual states that ORDER BY cannot be used
# with the multiple-table UPDATE syntax -- confirm this statement runs on the targeted MySQL version.
SQL_TO = """
UPDATE `{db_table}` `t1`
JOIN `{db_table}` `t2` ON `t2`.`{pk_name}` = %(pk)s
SET `t1`.`order` = IF(`t1`.`{pk_name}` = `t2`.`{pk_name}`, null, IF(`t1`.`order` < `t2`.`order`, `t1`.`order` + 1, `t1`.`order` - 1))
WHERE {ordering_condition}
AND `t2`.`order` != %(order)s
AND `t1`.`order` >= IF(`t2`.`order` > %(order)s, %(order)s, `t2`.`order`)
AND `t1`.`order` <= IF(`t2`.`order` > %(order)s, `t2`.`order`, %(order)s)
ORDER BY IF(`t1`.`order` <= `t2`.`order`, `t1`.`order`, null) DESC, IF(`t1`.`order` >= `t2`.`order`, `t1`.`order`, null) ASC
"""

# Update object's order to NULL and set the other object's order to specified value in a single SQL query.
#
# `t2` is the row being moved (matched by %(pk)s). Only two rows can match: the moved row itself, which
# gets order NULL, and the row currently holding the target %(order)s, which receives the moved row's
# old order. Nothing is updated when the row already has the target order.
# The ORDER BY expression sorts the moved row ahead of the other row.
# NOTE(review): presumably this frees the moved row's order value before it is handed to the other row;
# the same caveat about ORDER BY in a multiple-table UPDATE on MySQL applies here -- confirm.
SQL_SWAP = """
UPDATE `{db_table}` `t1`
JOIN `{db_table}` `t2` ON `t2`.`{pk_name}` = %(pk)s
SET `t1`.`order` = IF(`t1`.`{pk_name}` = `t2`.`{pk_name}`, null, `t2`.`order`)
WHERE {ordering_condition}
AND `t2`.`order` != %(order)s
AND (`t1`.`{pk_name}` = `t2`.`{pk_name}` OR `t1`.`order` = %(order)s)
ORDER BY IF(`t1`.`{pk_name}` = `t2`.`{pk_name}`, 0, 1) ASC
"""


def _retry(exc: typing.Type[Exception] | tuple[typing.Type[Exception], ...], max_attempts: int = 15) -> typing.Callable:
def _retry_with_params(f):
@wraps(f)
def wrapper(*args, **kwargs):
attempts = 0
while attempts < max_attempts:
try:
return f(*args, **kwargs)
except exc:
if attempts == max_attempts - 1:
raise
attempts += 1
time.sleep(random.random())

return wrapper

return _retry_with_params


class OrderedModel(models.Model):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Main focus of the PR

"""
This class is intended to be used as a mixin for models that need to be ordered.
It's similar to django-ordered-model: https://github.com/django-ordered-model/django-ordered-model.
The key difference of this implementation is that it allows orders to be unique at the database level and
is designed to work correctly under concurrent load.

Notable differences compared to django-ordered-model:
- order can be unique at the database level;
- order can temporarily be set to NULL while performing moving operations;
- instance.delete() only deletes the instance, and doesn't shift other instances' orders;
- some methods are not implemented because they're not used in the codebase;

Example usage:
class Step(OrderedModel):
user = models.ForeignKey(User, on_delete=models.CASCADE)
order_with_respect_to = ["user_id"] # steps are ordered per user

class Meta:
ordering = ["order"] # to make queryset ordering correct and consistent
unique_together = ["user_id", "order"] # orders are unique per user at the database level

It's possible for orders to be non-sequential, e.g. order sequence [100, 150, 400] is totally possible and valid.
"""

order = models.PositiveIntegerField(editable=False, db_index=True, null=True)
order_with_respect_to: list[str] = []

class Meta:
    abstract = True
    ordering = ["order"]
    constraints = [
        # Meta of an abstract base is inherited verbatim by subclasses that extend it, so a fixed
        # constraint name would be repeated for every inheriting model. The %(app_label)s and %(class)s
        # placeholders are substituted per concrete model and keep the constraint names unique.
        models.UniqueConstraint(fields=["order"], name="%(app_label)s_%(class)s_unique_order"),
    ]

def save(self, *args, **kwargs) -> None:
    """
    Save the object, assigning the next free order first when ``order`` is None.

    All positional and keyword arguments (``using``, ``update_fields``, ``force_insert``, ...) are
    forwarded to ``models.Model.save()`` instead of being dropped.
    """
    if self.order is None:
        self._save_no_order_provided(*args, **kwargs)
    else:
        super().save(*args, **kwargs)

@_retry(OperationalError)
def delete(self, *args, **kwargs) -> None:
    """Delete the object, retrying on OperationalError. Orders of other objects are left untouched."""
    super().delete(*args, **kwargs)

@_retry((IntegrityError, OperationalError))
def _save_no_order_provided(self, *args, **kwargs) -> None:
    """
    Set ``order`` to one past the current maximum of the ordering group (0 for an empty group) and save.

    The whole method is retried on IntegrityError / OperationalError, so the maximum is recomputed
    on every attempt.
    """
    max_order = self.max_order()
    self.order = max_order + 1 if max_order is not None else 0

    # When the caller restricts the saved columns, make sure the freshly assigned order is
    # persisted as well, otherwise it would only exist on the in-memory instance.
    update_fields = kwargs.get("update_fields")
    if update_fields is not None:
        kwargs["update_fields"] = {"order", *update_fields}

    super().save(*args, **kwargs)

@_retry((IntegrityError, OperationalError))
def to(self, order: int) -> None:
    """
    Move the object to ``order``, shifting the objects between the old and the new order by one.

    :raises ValueError: if ``order`` is None or negative.
    """
    self._reorder(SQL_TO, order)

def to_index(self, index: int) -> None:
    """
    Move the object to the order currently held by the object at position ``index`` of its ordering group.

    :raises IndexError: if ``index`` is negative or past the end of the ordering group.
    """
    # Django querysets reject negative indexes with an AssertionError / ValueError (depending on the
    # Django version) instead of an IndexError; report every out-of-range index the same way.
    if index < 0:
        raise IndexError("Index must be a non-negative integer.")

    order = self._get_ordering_queryset().values_list("order", flat=True)[index]
    self.to(order)

@_retry((IntegrityError, OperationalError))
def swap(self, order: int) -> None:
    """
    Move the object to ``order`` and give the object currently holding ``order`` this object's old order.

    :raises ValueError: if ``order`` is None or negative.
    """
    self._reorder(SQL_SWAP, order)

def _reorder(self, sql_template: str, order: int) -> None:
    """Run one of the reordering SQL templates for this object, then store ``order`` on it."""
    if order is None or order < 0:
        raise ValueError("Order must be a positive integer.")

    sql = sql_template.format(
        db_table=self._meta.db_table, pk_name=self._meta.pk.name, ordering_condition=self._ordering_condition_sql
    )
    params = {"pk": self.pk, "order": order, **self._ordering_params}

    # Both statements run in one transaction: the raw query sets this object's order to NULL while
    # adjusting the affected objects, and the follow-up update gives this object its final order.
    with transaction.atomic():
        with connection.cursor() as cursor:
            cursor.execute(sql, params)
        self._meta.model.objects.filter(pk=self.pk).update(order=order)

    self.refresh_from_db(fields=["order"])

def next(self) -> models.Model | None:
    """Return the first object of the ordering group with a greater order than this one, or None."""
    following = self._get_ordering_queryset().filter(order__gt=self.order)
    return following.first()

def max_order(self) -> int | None:
    """Return the highest order within this object's ordering group (None when the aggregate is empty)."""
    aggregated = self._get_ordering_queryset().aggregate(models.Max("order"))
    return aggregated["order__max"]

def _get_ordering_queryset(self) -> models.QuerySet:
    """Return the queryset of objects that share this object's ``order_with_respect_to`` values."""
    manager = self._meta.model.objects
    return manager.filter(**self._ordering_params)

@property
def _ordering_params(self) -> dict[str, typing.Any]:
    """Map every field named in ``order_with_respect_to`` to this object's value for that field."""
    params = {}
    for field_name in self.order_with_respect_to:
        params[field_name] = getattr(self, field_name)
    return params

@property
def _ordering_condition_sql(self) -> str:
    """
    Build the SQL predicate that restricts `t1` to this object's ordering group.

    One "`t1`.`<field>` = %(<field>)s" comparison is produced per ``order_with_respect_to`` field
    and the comparisons are joined with AND.
    """
    # This doesn't insert actual values into the query, but rather uses placeholders to avoid SQL injections.
    ordering_parts = ["`t1`.`{0}` = %({0})s".format(field) for field in self.order_with_respect_to]

    # With no grouping fields there is nothing to compare. Fall back to an always-true predicate so
    # that a template of the form "WHERE {ordering_condition} AND ..." stays syntactically valid.
    return " AND ".join(ordering_parts) or "1 = 1"
9 changes: 7 additions & 2 deletions engine/apps/base/models/user_notification_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
from django.core.validators import MinLengthValidator
from django.db import models
from django.db.models import Q, QuerySet
from ordered_model.models import OrderedModel

from apps.base.messaging import get_messaging_backends
from apps.base.models.ordered_model import OrderedModel
from apps.user_management.models import User
from common.exceptions import UserNotificationPolicyCouldNotBeDeleted
from common.public_primary_keys import generate_public_primary_key, increase_public_primary_key_length
Expand Down Expand Up @@ -103,7 +103,7 @@ def create_important_policies_for_user(self, user: User) -> "QuerySet[UserNotifi

class UserNotificationPolicy(OrderedModel):
objects = UserNotificationPolicyQuerySet.as_manager()
order_with_respect_to = ("user", "important")
order_with_respect_to = ("user_id", "important")

public_primary_key = models.CharField(
max_length=20,
Expand Down Expand Up @@ -145,6 +145,11 @@ class Step(models.IntegerChoices):

class Meta:
ordering = ("order",)
constraints = [
models.UniqueConstraint(
fields=["user_id", "important", "order"], name="unique_user_notification_policy_order"
)
]

def __str__(self):
return f"{self.pk}: {self.short_verbal}"
Expand Down
Loading