Skip to content

Commit

Permalink
Merge pull request #5741 from nyaruka/new_msg_counts_pt4
Browse files Browse the repository at this point in the history
Stop writing and squashing old message counts
  • Loading branch information
rowanseymour authored Dec 11, 2024
2 parents 8c898a1 + 042155f commit fd02ba6
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 175 deletions.
4 changes: 2 additions & 2 deletions temba/msgs/migrations/0279_backfill_new_counts.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from django.db.models import Sum


def backfill_folder_counts(apps, schema_editor):
def backfill_folder_counts(apps, schema_editor): # pragma: no cover
Org = apps.get_model("orgs", "Org")

org_ids = list(Org.objects.filter(is_active=True).order_by("id").values_list("id", flat=True))
Expand All @@ -23,7 +23,7 @@ def backfill_folder_counts(apps, schema_editor):
print(f"> updated counts for {num_backfilled}/{len(org_ids)} orgs")


def backfill_for_org(apps, org) -> int:
def backfill_for_org(apps, org) -> int: # pragma: no cover
ItemCount = apps.get_model("orgs", "ItemCount")

with transaction.atomic():
Expand Down
185 changes: 185 additions & 0 deletions temba/msgs/migrations/0280_update_triggers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
# Generated by Django 5.1.4 on 2024-12-11 15:34

from django.db import migrations

SQL = """
----------------------------------------------------------------------
-- Handles INSERT statements on msg table
----------------------------------------------------------------------
CREATE OR REPLACE FUNCTION temba_msg_on_insert() RETURNS TRIGGER AS $$
BEGIN
-- add broadcast counts for all new broadcast values
INSERT INTO msgs_broadcastmsgcount("broadcast_id", "count", "is_squashed")
SELECT broadcast_id, count(*), FALSE FROM newtab WHERE broadcast_id IS NOT NULL GROUP BY broadcast_id;
-- add positive item counts for all rows which belong to a folder
INSERT INTO orgs_itemcount("org_id", "scope", "count", "is_squashed")
SELECT org_id, temba_msg_countscope(newtab), count(*), FALSE FROM newtab
WHERE temba_msg_countscope(newtab) IS NOT NULL
GROUP BY 1, 2;
-- add channel counts for all messages with a channel
INSERT INTO channels_channelcount("channel_id", "count_type", "day", "count", "is_squashed")
SELECT channel_id, temba_msg_determine_channel_count_code(newtab), created_on::date, count(*), FALSE FROM newtab
WHERE channel_id IS NOT NULL GROUP BY channel_id, temba_msg_determine_channel_count_code(newtab), created_on::date;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
----------------------------------------------------------------------
-- Handles DELETE statements on msg table
----------------------------------------------------------------------
CREATE OR REPLACE FUNCTION temba_msg_on_delete() RETURNS TRIGGER AS $$
BEGIN
-- add negative item counts for all rows that belonged to a folder
INSERT INTO orgs_itemcount("org_id", "scope", "count", "is_squashed")
SELECT org_id, temba_msg_countscope(oldtab), -count(*), FALSE FROM oldtab
WHERE temba_msg_countscope(oldtab) IS NOT NULL
GROUP BY 1, 2;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
----------------------------------------------------------------------
-- Handles UPDATE statements on msg table
----------------------------------------------------------------------
CREATE OR REPLACE FUNCTION temba_msg_on_update() RETURNS TRIGGER AS $$
BEGIN
-- add negative item counts for all rows that belonged to a folder they no longer belong to
INSERT INTO orgs_itemcount("org_id", "scope", "count", "is_squashed")
SELECT o.org_id, temba_msg_countscope(o), -count(*), FALSE FROM oldtab o
INNER JOIN newtab n ON n.id = o.id
WHERE temba_msg_countscope(o) IS DISTINCT FROM temba_msg_countscope(n) AND temba_msg_countscope(o) IS NOT NULL
GROUP BY 1, 2;
-- add positive item counts for all rows that now belong to a folder they didn't belong to
INSERT INTO orgs_itemcount("org_id", "scope", "count", "is_squashed")
SELECT n.org_id, temba_msg_countscope(n), count(*), FALSE FROM newtab n
INNER JOIN oldtab o ON o.id = n.id
WHERE temba_msg_countscope(o) IS DISTINCT FROM temba_msg_countscope(n) AND temba_msg_countscope(n) IS NOT NULL
GROUP BY 1, 2;
-- add negative old-state label counts for all messages being archived/restored
INSERT INTO msgs_labelcount("label_id", "is_archived", "count", "is_squashed")
SELECT ml.label_id, o.visibility != 'V', -count(*), FALSE FROM oldtab o
INNER JOIN newtab n ON n.id = o.id
INNER JOIN msgs_msg_labels ml ON ml.msg_id = o.id
WHERE (o.visibility = 'V' AND n.visibility != 'V') or (o.visibility != 'V' AND n.visibility = 'V')
GROUP BY 1, 2;
-- add new-state label counts for all messages being archived/restored
INSERT INTO msgs_labelcount("label_id", "is_archived", "count", "is_squashed")
SELECT ml.label_id, n.visibility != 'V', count(*), FALSE FROM newtab n
INNER JOIN oldtab o ON o.id = n.id
INNER JOIN msgs_msg_labels ml ON ml.msg_id = n.id
WHERE (o.visibility = 'V' AND n.visibility != 'V') or (o.visibility != 'V' AND n.visibility = 'V')
GROUP BY 1, 2;
-- add new flow activity counts for incoming messages now marked as handled by a flow
INSERT INTO flows_flowactivitycount("flow_id", "scope", "count", "is_squashed")
SELECT s.flow_id, unnest(ARRAY[
format('msgsin:hour:%s', extract(hour FROM NOW())),
format('msgsin:dow:%s', extract(isodow FROM NOW())),
format('msgsin:date:%s', NOW()::date)
]), s.msgs, FALSE
FROM (
SELECT n.flow_id, count(*) AS msgs FROM newtab n INNER JOIN oldtab o ON o.id = n.id
WHERE n.direction = 'I' AND o.flow_id IS NULL AND n.flow_id IS NOT NULL
GROUP BY 1
) s;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
----------------------------------------------------------------------
-- Handles INSERT statements on broadcast table
----------------------------------------------------------------------
CREATE OR REPLACE FUNCTION temba_broadcast_on_insert() RETURNS TRIGGER AS $$
BEGIN
-- add positive item counts for all rows which belong to a folder
INSERT INTO orgs_itemcount("org_id", "scope", "count", "is_squashed")
SELECT org_id, temba_broadcast_countscope(newtab), count(*), FALSE FROM newtab
WHERE temba_broadcast_countscope(newtab) IS NOT NULL
GROUP BY 1, 2;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
----------------------------------------------------------------------
-- Handles DELETE statements on broadcast table
----------------------------------------------------------------------
CREATE OR REPLACE FUNCTION temba_broadcast_on_delete() RETURNS TRIGGER AS $$
BEGIN
-- add negative item counts for all rows that belonged to a folder
INSERT INTO orgs_itemcount("org_id", "scope", "count", "is_squashed")
SELECT org_id, temba_broadcast_countscope(oldtab), -count(*), FALSE FROM oldtab
WHERE temba_broadcast_countscope(oldtab) IS NOT NULL
GROUP BY 1, 2;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
----------------------------------------------------------------------
-- Handles UPDATE statements on broadcast table
----------------------------------------------------------------------
CREATE OR REPLACE FUNCTION temba_broadcast_on_update() RETURNS TRIGGER AS $$
BEGIN
-- add negative counts for all old non-null item count scopes that don't match the new ones
INSERT INTO orgs_itemcount("org_id", "scope", "count", "is_squashed")
SELECT o.org_id, temba_broadcast_countscope(o), -count(*), FALSE FROM oldtab o
INNER JOIN newtab n ON n.id = o.id
WHERE temba_broadcast_countscope(o) IS DISTINCT FROM temba_broadcast_countscope(n) AND temba_broadcast_countscope(o) IS NOT NULL
GROUP BY 1, 2;
-- add positive counts for all new non-null item counts that don't match the old ones
INSERT INTO orgs_itemcount("org_id", "scope", "count", "is_squashed")
SELECT n.org_id, temba_broadcast_countscope(n), count(*), FALSE FROM newtab n
INNER JOIN oldtab o ON o.id = n.id
WHERE temba_broadcast_countscope(o) IS DISTINCT FROM temba_broadcast_countscope(n) AND temba_broadcast_countscope(n) IS NOT NULL
GROUP BY 1, 2;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
----------------------------------------------------------------------
-- Handles INSERT statements on ivr_call table
----------------------------------------------------------------------
CREATE OR REPLACE FUNCTION temba_ivrcall_on_insert() RETURNS TRIGGER AS $$
BEGIN
-- add positive item counts for all rows being inserted
INSERT INTO orgs_itemcount("org_id", "scope", "count", "is_squashed")
SELECT org_id, 'msgs:folder:C', count(*), FALSE FROM newtab GROUP BY 1;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
----------------------------------------------------------------------
-- Handles DELETE statements on ivr_call table
----------------------------------------------------------------------
CREATE OR REPLACE FUNCTION temba_ivrcall_on_delete() RETURNS TRIGGER AS $$
BEGIN
-- add negative item counts for all rows being deleted
INSERT INTO orgs_itemcount("org_id", "scope", "count", "is_squashed")
SELECT org_id, 'msgs:folder:C', -count(*), FALSE FROM oldtab GROUP BY 1;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
DROP FUNCTION temba_broadcast_determine_system_label(msgs_broadcast);
DROP FUNCTION temba_msg_determine_system_label(msgs_msg);
"""


class Migration(migrations.Migration):

dependencies = [("msgs", "0279_backfill_new_counts")]

operations = [migrations.RunSQL(SQL)]
3 changes: 1 addition & 2 deletions temba/msgs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from temba.utils.crons import cron_task

from .models import BroadcastMsgCount, LabelCount, Media, Msg, SystemLabelCount
from .models import BroadcastMsgCount, LabelCount, Media, Msg

logger = logging.getLogger(__name__)

Expand All @@ -32,7 +32,6 @@ def fail_old_android_messages():

@cron_task(lock_timeout=7200)
def squash_msg_counts():
SystemLabelCount.squash()
LabelCount.squash()
BroadcastMsgCount.squash()

Expand Down
59 changes: 0 additions & 59 deletions temba/msgs/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from temba.schedules.models import Schedule
from temba.templates.models import TemplateTranslation
from temba.tests import CRUDLTestMixin, TembaTest, mock_mailroom, mock_uuids
from temba.tests.base import MigrationTest
from temba.tests.engine import MockSessionWriter
from temba.tickets.models import Ticket
from temba.utils import s3
Expand Down Expand Up @@ -3148,61 +3147,3 @@ def upload(user, path):
self.login(self.customer_support, choose_org=self.org)
response = self.client.get(list_url)
self.assertEqual([media2, media1], list(response.context["object_list"]))


class BackfillNewCountsTest(MigrationTest):
app = "msgs"
migrate_from = "0278_update_triggers"
migrate_to = "0279_backfill_new_counts"

def setUpBeforeMigration(self, apps):
org1_flow = self.create_flow("Flow")
org2_flow = self.create_flow("Flow", org=self.org2)
org1_contact = self.create_contact("Contact 1", phone="+1234567890")
org2_contact = self.create_contact("Contact 1", phone="+1234567890", org=self.org2)
self.create_channel("A", "Relayer", "1234", org=self.org2)

for x in range(5):
self.create_incoming_msg(org1_contact, str(x))
for x in range(4):
self.create_incoming_msg(org1_contact, str(x), flow=org1_flow)
for x in range(3):
self.create_outgoing_msg(org1_contact, str(x), status=Msg.STATUS_SENT)

for x in range(2):
self.create_incoming_msg(org2_contact, str(x))
self.create_incoming_msg(org2_contact, str(x), flow=org2_flow)

self.org2.counts.all().delete()

def test_migration(self):
def assert_counts(org, expected: dict):
self.assertEqual(SystemLabel.get_counts(org), expected)

assert_counts(
self.org,
{
SystemLabel.TYPE_INBOX: 5,
SystemLabel.TYPE_FLOWS: 4,
SystemLabel.TYPE_ARCHIVED: 0,
SystemLabel.TYPE_OUTBOX: 0,
SystemLabel.TYPE_SENT: 3,
SystemLabel.TYPE_FAILED: 0,
SystemLabel.TYPE_SCHEDULED: 0,
SystemLabel.TYPE_CALLS: 0,
},
)

assert_counts(
self.org2,
{
SystemLabel.TYPE_INBOX: 2,
SystemLabel.TYPE_FLOWS: 1,
SystemLabel.TYPE_ARCHIVED: 0,
SystemLabel.TYPE_OUTBOX: 0,
SystemLabel.TYPE_SENT: 0,
SystemLabel.TYPE_FAILED: 0,
SystemLabel.TYPE_SCHEDULED: 0,
SystemLabel.TYPE_CALLS: 0,
},
)
Loading

0 comments on commit fd02ba6

Please sign in to comment.