
Turn suppression list CSV into a set of SQL files to be run on the server #892

Merged · 10 commits · Apr 9, 2024
suppression-list/README.md (147 additions)
# CSV to Opt-Outs

## Prepare Input Files

Use a tool like [xsv](https://github.com/BurntSushi/xsv) to prepare the input CSV files and merge them into one with the following columns:

```
"Email","Date","Reason"
"alice@corp.com","2024-03-12 05:17 PM","Opted out from list id 1364939"
"bob@fundation.org","2024-03-12 05:29 PM",
"postmaster@localhost","2024-03-12 10:11 AM","marked undeliverable"
...
```
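
For example, given two hypothetical exports `list1.csv` and `list2.csv` that share these columns, the merge could look like this (a sketch, assuming `xsv` is installed and both files carry headers):

```
$ xsv cat rows list1.csv list2.csv | xsv select Email,Date,Reason > example.csv
```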

## Turn Into SQL

A Python script turns the specified CSV into SQL files to be executed on the server.

```
$ poetry shell
$ python suppression-list/csv2optout.py example.csv
INFO:__main__:2 entries, 1 batches.
INFO:__main__:'example.csv.0.pre.sql' written.
INFO:__main__:'example.csv.1.apply.sql' written.
INFO:__main__:'example.csv.2.post.sql' written.
```
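
The script accepts a few options to tune the batching (see `--help`), for instance:

```
$ python suppression-list/csv2optout.py example.csv --batch-size 5000 --files-count 3 --sleep-seconds 0.5
```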

Upload all the files to the server, including the input CSV, for example with `tar` and `scp`:

```
$ tar -zcvf "$(date '+%Y-%m-%d')-example.csv.tar.gz" example.csv*
$ scp 20240313-example.csv.tar.gz user@server:.
```

## Execute

On the server, execute them in order. Note that the `*apply*` files are idempotent, since each batch deletes the staging rows it has processed, and can therefore be interrupted and resumed if necessary.

First, alter the `*pre*` script to point at the full path of the CSV, for example using `sed`:

```
$ sed -i s/"FROM 'example.csv'"/"FROM '\\/path\\/to\\/example.csv'"/ example.csv.0.pre.sql
```
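
The `COPY` statement in the `*pre*` file should then reference the absolute path (the `imported_*` suffix below is just an example timestamp):

```sql
COPY imported_20240313T1200(email, tstxt, unsubscribe_reason)
FROM '/path/to/example.csv' -- fullpath
DELIMITER ','
CSV HEADER
QUOTE AS '"';
```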

And then execute them from `psql`:

```
$ psql -U admin -d ctms

=# \i example.csv.0.pre.sql

...
psql:example.csv.0.pre.sql:23: NOTICE: Join on existing contacts...
CALL
INSERT 0 50430
...
psql:example.csv.0.pre.sql:43: NOTICE: Join on existing contacts done.
CALL
 count
-------
 50430
(1 row)

```

Then execute each `*apply*` file; each one contains thousands of batched commits and reports its progress (`Batch i/n`) as it runs:

```
=# \i example.csv.1.apply.sql
=# \i example.csv.2.apply.sql
=# \i example.csv.3.apply.sql
```

And then clean up, which drops the `raise_notice` procedure and the staging table:

```
=# \i example.csv.4.post.sql
```

## Development

Create a database locally with fake data. First, dump the schema of the relevant tables from an existing `ctms` database:

```
$ pg_dump -d ctms -U postgres -t 'emails' -t 'fxa' -t 'pending_acoustic' --schema-only --no-acl --no-owner > suppression-list/schema.sql
```

```
$ psql -U postgres -d postgres

postgres=# CREATE DATABASE optouts;
CREATE DATABASE
postgres=#
postgres=# \c optouts
You are now connected to database "optouts" as user "postgres".
optouts=#
optouts=# SET search_path TO public;
SET
optouts=#
optouts=# \i schema.sql
...
optouts=#
```

Generate ~1M fake emails.

```sql
INSERT INTO emails(email_id, primary_email)
SELECT
gen_random_uuid() AS email_id,
substr(md5(random()::text), 1, (random() * 15)::int) || '@' || substr(md5(random()::text), 1, (random() * 10)::int) || '.' || substr(md5(random()::text), 1, 3) AS primary_email
FROM generate_series(1, 1000000)
ON CONFLICT DO NOTHING;
```

Create an FxA account for 1% of them, half of which reuse the contact's primary email:

```sql
INSERT INTO fxa(email_id, fxa_id, primary_email)
SELECT
email_id,
substr(md5(random()::text), 1, 100) AS fxa_id,
CASE WHEN random() < 0.5 THEN primary_email ELSE substr(md5(random()::text), 1, (random() * 15)::int) || '.fxa@' || substr(md5(random()::text), 1, (random() * 10)::int) || '.' || substr(md5(random()::text), 1, 3) END AS primary_email
FROM emails TABLESAMPLE BERNOULLI(1)
ON CONFLICT DO NOTHING;
```

Mark 1% of them as opted out:

```sql
UPDATE emails SET has_opted_out_of_email = true
FROM (SELECT email_id FROM emails TABLESAMPLE BERNOULLI(1)) sub
WHERE sub.email_id = emails.email_id;
```
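
Optionally, sanity-check the proportion that was flagged (a quick query, not part of the workflow):

```sql
SELECT count(*) FILTER (WHERE has_opted_out_of_email) * 100.0 / count(*) AS percent_opted_out
FROM emails;
```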

Produce a CSV to opt out some of them. The script expects three columns (email, date, reason), so a timestamp column is included here:

```
optouts=# COPY (
    SELECT email, to_char(now(), 'YYYY-MM-DD HH12:MI AM') AS date, reason
    FROM (
        SELECT primary_email AS email, md5(random()::text) AS reason FROM emails TABLESAMPLE BERNOULLI(5)
        UNION
        SELECT primary_email AS email, md5(random()::text) AS reason FROM fxa TABLESAMPLE BERNOULLI(10)
    ) t
) TO '/tmp/optouts.csv' WITH (FORMAT csv);
```
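
The generated file can then be fed to the script to exercise the whole pipeline locally, after pointing the `COPY ... FROM` in the `*pre*` file at the CSV's absolute path as described above:

```
$ python suppression-list/csv2optout.py /tmp/optouts.csv
$ psql -U postgres -d optouts
optouts=# \i optouts.csv.0.pre.sql
optouts=# \i optouts.csv.1.apply.sql
```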
suppression-list/csv2optout.py (208 additions)
import csv
import logging
import re
import sys
from datetime import datetime, timezone
from pathlib import Path

import click

logger = logging.getLogger(__name__)


SQL_COMMANDS_PRE = """
CREATE OR REPLACE PROCEDURE raise_notice (s TEXT) LANGUAGE plpgsql AS
$$
BEGIN
RAISE NOTICE '%', s;
END;
$$;

CREATE TEMPORARY TABLE IF NOT EXISTS imported_{tmp_suffix} (
    -- Note from review: the suffix is not strictly necessary on a temporary
    -- table, since it is dropped at the end of the session by default (or at
    -- the end of the transaction with ON COMMIT DROP).
    email TEXT,
    tstxt TEXT,
    unsubscribe_reason TEXT
);

CALL raise_notice('Start CSV import ({csv_rows_count} rows)...');

COPY imported_{tmp_suffix}(email, tstxt, unsubscribe_reason)
FROM '{csv_path}' -- fullpath
DELIMITER '{delimiter}'
{headers}
QUOTE AS '"';

CALL raise_notice('CSV import done.');

CALL raise_notice('Join on existing contacts...');

CREATE TABLE IF NOT EXISTS optouts_{tmp_suffix} (
    idx SERIAL UNIQUE,
    email_id UUID,
    unsubscribe_reason TEXT,
    ts TIMESTAMP
);

WITH all_primary_emails AS (
SELECT email_id, primary_email FROM emails
WHERE has_opted_out_of_email IS NOT true
UNION
SELECT email_id, primary_email FROM fxa
)
INSERT INTO optouts_{tmp_suffix}(email_id, unsubscribe_reason, ts)
SELECT
email_id,
unsubscribe_reason,
to_timestamp(tstxt,'YYYY-MM-DD HH12:MI AM')::timestamp AS ts
FROM imported_{tmp_suffix}
JOIN all_primary_emails
ON primary_email = email;

CALL raise_notice('Join on existing contacts done.');

SELECT COUNT(*) FROM optouts_{tmp_suffix};
"""

SQL_COMMANDS_POST = """
DROP PROCEDURE raise_notice;
DROP TABLE optouts_{tmp_suffix} CASCADE;
"""

SQL_BATCH = """
BEGIN;

CALL raise_notice('Batch {batch}/{batch_count}');

UPDATE emails
SET update_timestamp = tmp.ts,
has_opted_out_of_email = true,
unsubscribe_reason = tmp.unsubscribe_reason
FROM optouts_{tmp_suffix} tmp
WHERE tmp.email_id = emails.email_id
AND tmp.idx > {start_idx} AND tmp.idx <= {end_idx}
-- Do not overwrite the reason if the user opted out in the meantime
AND has_opted_out_of_email IS NOT true;

INSERT INTO pending_acoustic(email_id, retry)
SELECT email_id, 0 FROM optouts_{tmp_suffix}
WHERE {schedule_sync} AND idx > {start_idx} AND idx <= {end_idx};

DELETE FROM optouts_{tmp_suffix}
WHERE idx > {start_idx} AND idx <= {end_idx};

COMMIT;

SELECT pg_sleep({sleep_seconds});
"""


def chunks(l, n):
    """Yield successive n-sized chunks from list l."""
    for i in range(0, len(l), n):
        yield l[i : i + n]


def writefile(path, content):
    """Write content to path and log the written file name."""
    with open(path, "w") as f:
        f.write(content)
    logger.info(f"{path!r} written.")


@click.command()
@click.argument("csv_path", type=click.Path(exists=True))
@click.option(
    "--check-input-rows", default=1000, help="Number of rows to check from input CSV."
)
@click.option("--batch-size", default=10000, help="Number of updates per commit.")
@click.option("--files-count", default=5, help="Number of SQL files.")
@click.option("--sleep-seconds", default=0.1, help="Wait between batches.")
@click.option(
    "--schedule-sync", default=False, help="Mark updated emails as pending sync."
)
def main(
    csv_path, check_input_rows, batch_size, files_count, sleep_seconds, schedule_sync
) -> int:
    #
    # Inspect CSV input.
    #
    with open(csv_path) as f:
        csv_rows_count = sum(1 for _ in f)
        f.seek(0)
        delimiter = csv.Sniffer().sniff(f.read(1024)).delimiter
        f.seek(0)
        has_headers = csv.Sniffer().has_header(f.read(1024))
        f.seek(0)

        # Check the format of the first `check_input_rows` entries.
        reader = csv.reader(f)
        if has_headers:
            next(reader)
        for i, row in enumerate(reader):
            if i >= check_input_rows:
                break
            try:
                email, date, reason = row
                assert "@" in email
                assert re.match(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2} (AM|PM)", date)
            except (AssertionError, ValueError):
                raise ValueError(
                    f"Row {row!r} does not match the expected (email, date, reason) format"
                )

    batch_count = 1 + csv_rows_count // batch_size
    logger.info(
        f"{csv_rows_count} entries, {batch_count} batches of {batch_size} updates per commit."
    )

    #
    # Prepare SQL files
    #
    now = datetime.now(tz=timezone.utc)
    tmp_suffix = now.strftime("%Y%m%dT%H%M")
    batch_commands = []
    for i in range(batch_count):
        start_idx = i * batch_size
        end_idx = (i + 1) * batch_size
        batch_commands.append(
            SQL_BATCH.format(
                batch=i,
                batch_count=batch_count,
                start_idx=start_idx,
                end_idx=end_idx,
                tmp_suffix=tmp_suffix,
                sleep_seconds=sleep_seconds,
                schedule_sync=str(schedule_sync).lower(),
            )
        )

    csv_filename = Path(csv_path).name
    writefile(
        f"{csv_filename}.0.pre.sql",
        SQL_COMMANDS_PRE.format(
            # COPY ... QUOTE is only allowed in CSV mode, so always emit CSV.
            headers="CSV HEADER" if has_headers else "CSV",
            csv_rows_count=csv_rows_count,
            delimiter=delimiter,
            tmp_suffix=tmp_suffix,
            csv_path=csv_filename,
        ),
    )

    chunk_size = 1 + batch_count // files_count
    file_count = 0
    for batch in chunks(batch_commands, chunk_size):
        file_count += 1
        writefile(f"{csv_filename}.{file_count}.apply.sql", "".join(batch))

    logger.info(
        f"Produced {file_count} files of up to {chunk_size} commits ({chunk_size * batch_size} updates) each."
    )

    writefile(
        f"{csv_filename}.{file_count + 1}.post.sql",
        SQL_COMMANDS_POST.format(
            tmp_suffix=tmp_suffix,
        ),
    )
    return 0


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    sys.exit(main())
suppression-list/example.csv (4 additions)
"Email","Date","Reason"
"alice@corp.com","2024-03-12 05:17 PM","Opted out from list id 1364939"
"bob@fundation.org","2024-03-12 05:29 PM",
"postmaster@localhost","2024-03-12 10:11 AM","marked undeliverable"