From 22d27fbfccb466909239bf566be8f21afe3a7ee7 Mon Sep 17 00:00:00 2001 From: Mathieu Leplatre Date: Tue, 12 Mar 2024 18:00:06 +0100 Subject: [PATCH 01/10] Turn CSV into SQL files --- suppression-list/README.md | 48 +++++++++ suppression-list/csv2optout.py | 171 +++++++++++++++++++++++++++++++++ suppression-list/example.csv | 4 + 3 files changed, 223 insertions(+) create mode 100644 suppression-list/README.md create mode 100644 suppression-list/csv2optout.py create mode 100644 suppression-list/example.csv diff --git a/suppression-list/README.md b/suppression-list/README.md new file mode 100644 index 00000000..2c62a77f --- /dev/null +++ b/suppression-list/README.md @@ -0,0 +1,48 @@ +# CSV to Opt-Outs + +### Prepare Input Files + +Using a tool like [xsv](https://github.com/BurntSushi/xsv) to prepare the input CSV files and merge them into one that has the following columns: + +``` +email,reason +alice@corp.com,never subscribed +bob@fundation.org, +... +``` + +### Turn Into SQL + +A Python script with turn the specified CSV into SQL files to be executed on the server. + +``` +$ poetry shell +$ python suppression-list/csv2optout.py example.csv +INFO:__main__:2 entries, 1 batches. +INFO:__main__:'example.csv.0.pre.sql' written. +INFO:__main__:'example.csv.1.apply.sql' written. +INFO:__main__:'example.csv.2.post.sql' written. +``` + +Upload all the files to server, including the input CSV. Use `tar` and `scp` for example: + +``` +$ tar -zcvf "$(date '+%Y-%m-%d')-example.csv.tar.gz" example.csv* +$ scp 20240313-example.csv.tar.gz user@server:. +``` + +### Execute + +On the server, execute them in order. Note that the `*apply*` files are idempotent, and can be interrupted if necessary. + +``` +$ psql -U admin -d ctms + +=# \i example.csv.0.pre.sql + +=# \i example.csv.1.apply.sql +=# \i example.csv.2.apply.sql +=# \i example.csv.3.apply.sql + +=# \i example.csv.4.post.sql +``` \ No newline at end of file diff --git a/suppression-list/csv2optout.py b/suppression-list/csv2optout.py new file mode 100644 index 00000000..a6e106ae --- /dev/null +++ b/suppression-list/csv2optout.py @@ -0,0 +1,171 @@ +import csv +from datetime import datetime, timezone +import logging +from pathlib import Path +import sys + +import click + + +logger = logging.getLogger(__name__) + + +SQL_COMMANDS_PRE = """ +CREATE OR REPLACE PROCEDURE raise_notice (s TEXT) LANGUAGE plpgsql AS +$$ +BEGIN + RAISE NOTICE '%', s; +END; +$$; + +CREATE TEMPORARY TABLE IF NOT EXISTS imported_{tmp_suffix} ( + email TEXT, + unsubscribe_reason TEXT +); + +CALL raise_notice('Start CSV import ({csv_rows_count} rows)...'); + +COPY imported_{tmp_suffix}(email, unsubscribe_reason) + FROM '{csv_path}' -- fullpath + DELIMITER '{delimiter}' + {headers}; + +CALL raise_notice('CSV import done.'); + +CALL raise_notice('Join on existing contacts...'); + +CREATE TABLE IF NOT EXISTS optouts_{tmp_suffix} ( + idx SERIAL UNIQUE, + email_id UUID, + unsubscribe_reason TEXT +); + +WITH all_primary_emails AS ( + SELECT email_id, primary_email FROM emails + WHERE has_opted_out_of_email IS NOT true + UNION + SELECT email_id, primary_email FROM fxa +) +INSERT INTO optouts_{tmp_suffix}(email_id, unsubscribe_reason) + SELECT email_id, unsubscribe_reason + FROM imported_{tmp_suffix} + JOIN all_primary_emails + ON primary_email = email; + +CALL raise_notice('Join on existing contacts done.'); + +SELECT COUNT(*) FROM optouts_{tmp_suffix}; +""" + +SQL_COMMANDS_POST = """ +DROP PROCEDURE raise_notice; +DROP optouts_{tmp_suffix} CASCADE; +""" + +SQL_BATCH = """ +BEGIN; + +CALL 
raise_notice('Batch {batch}/{batch_count}'); + +UPDATE emails + SET update_timestamp = now(), + has_opted_out_of_email = true, + unsubscribe_reason = tmp.unsubscribe_reason + FROM optouts_{tmp_suffix} tmp + WHERE tmp.email_id = emails.email_id + AND tmp.idx > {start_idx} AND tmp.idx <= {end_idx} + -- Do not overwrite reason if user opted-out in the mean time + AND has_opted_out_of_email IS NOT true; + +INSERT INTO pending_acoustic(email_id, retry) + SELECT email_id, 0 FROM optouts_{tmp_suffix} +WHERE idx > {start_idx} AND idx <= {end_idx}; + +DELETE FROM optouts_{tmp_suffix} + WHERE idx > {start_idx} AND idx <= {end_idx}; + +COMMIT; + +SELECT pg_sleep({sleep_seconds}); +""" + + +def chunks(l, n): + for i in range(0, len(l), n): + yield l[i : i + n] + + +def writefile(path, content): + with open(path, "w") as f: + f.write(content) + logger.info(f"{path!r} written.") + + +@click.command() +@click.argument('csv_path', type=click.Path(exists=True)) +@click.option('--batch-size', default=10000, help='Number of updates per commit.') +@click.option('--files-count', default=5, help='Number of SQL files') +@click.option('--sleep-seconds', default=0.1, help='Wait between batches') +def main(csv_path, batch_size=10000, files_count=5, sleep_seconds=1) -> int: + now = datetime.now(tz=timezone.utc) + tmp_suffix = now.strftime("%Y%m%dT%H%M") + with open(csv_path) as f: + csv_rows_count = sum(1 for _ in f) + f.seek(0) + delimiter = csv.Sniffer().sniff(f.read(1024)).delimiter + f.seek(0) + has_headers = csv.Sniffer().has_header(f.read(1024)) + if has_headers: + csv_rows_count -= 1 + + batch_count = 1 + csv_rows_count // batch_size + + logger.info(f"{csv_rows_count} entries, {batch_count} batches of {batch_size} updates per commit.") + + batch_commands = [] + for i in range(batch_count): + start_idx = i * batch_size + end_idx = (i + 1) * batch_size + batch_commands.append( + SQL_BATCH.format( + batch=i, + batch_count=batch_count, + start_idx=start_idx, + end_idx=end_idx, + tmp_suffix=tmp_suffix, + sleep_seconds=sleep_seconds, + ) + ) + + csv_filename = Path(csv_path).name + writefile( + f"{csv_filename}.0.pre.sql", + SQL_COMMANDS_PRE.format( + headers="CSV HEADER" if has_headers else "", + csv_rows_count=csv_rows_count, + delimiter=delimiter, + tmp_suffix=tmp_suffix, + csv_path=csv_filename, + ), + ) + + chunk_size = 1 + batch_count // files_count + file_count = 0 + for batch in chunks(batch_commands, chunk_size): + file_count += 1 + writefile(f"{csv_filename}.{file_count}.apply.sql", "".join(batch)) + + logger.info(f"Produced {file_count} files, with {chunk_size} commits ({chunk_size * batch_size} updates).") + + writefile( + f"{csv_filename}.{file_count + 1}.post.sql", + SQL_COMMANDS_POST.format( + tmp_suffix=tmp_suffix, + ), + ) + return 0 + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + sys.exit(main()) diff --git a/suppression-list/example.csv b/suppression-list/example.csv new file mode 100644 index 00000000..b2fea31e --- /dev/null +++ b/suppression-list/example.csv @@ -0,0 +1,4 @@ +email,reason +sam.won@else.com,tired +alice@corp.com,never subscribed +bob@fundation.org, From 4b6fdb56b616dbed6a7b60d3333d52040ed9ced8 Mon Sep 17 00:00:00 2001 From: Mathieu Leplatre Date: Wed, 13 Mar 2024 20:13:30 +0100 Subject: [PATCH 02/10] Document how to create a sandbox database locally --- suppression-list/README.md | 106 +++++++++++++++++++++++++++++++++++-- 1 file changed, 102 insertions(+), 4 deletions(-) diff --git a/suppression-list/README.md b/suppression-list/README.md index 
--- a/suppression-list/README.md
+++ b/suppression-list/README.md
@@ -1,6 +1,6 @@
 # CSV to Opt-Outs
 
-### Prepare Input Files
+## Prepare Input Files
 
 Use a tool like [xsv](https://github.com/BurntSushi/xsv) to prepare the input CSV files and merge them into one with the following columns:
 
@@ -11,7 +11,7 @@ bob@fundation.org,
 ...
 ```
 
-### Turn Into SQL
+## Turn Into SQL
 
 A Python script will turn the specified CSV into SQL files to be executed on the server.
 
@@ -31,18 +31,116 @@ $ tar -zcvf "$(date '+%Y-%m-%d')-example.csv.tar.gz" example.csv*
 $ scp 20240313-example.csv.tar.gz user@server:.
 ```
 
-### Execute
+## Execute
 
 On the server, execute them in order. Note that the `*apply*` files are idempotent, and can be interrupted if necessary.
 
+First, alter the `*pre*` script to put the full path to the CSV. For example, using `sed`:
+
+```
+$ sed -i s/"FROM 'example.csv'"/"FROM '\\/path\\/to\\/example.csv'"/ example.csv.0.pre.sql
+```
+
+And then execute them from `psql`:
+
 ```
 $ psql -U admin -d ctms
 
 =# \i example.csv.0.pre.sql
+...
+psql:example.csv.0.pre.sql:23: NOTICE: Join on existing contacts...
+CALL
+INSERT 0 50430
+...
+psql:example.csv.0.pre.sql:43: NOTICE: Join on existing contacts done.
+CALL
+ count
+-------
+ 50430
+(1 row)
+
+```
+
+Then execute each file with its thousands of batches of commits:
 
+```
 =# \i example.csv.1.apply.sql
 =# \i example.csv.2.apply.sql
 =# \i example.csv.3.apply.sql
+```
 
+And then clean up:
+
+```
 =# \i example.csv.4.post.sql
-```
\ No newline at end of file
+```
+
+## Development
+
+Create a database locally with fake data.
+
+```
+$ pg_dump -d ctms -U postgres -t 'emails' -t 'fxa' -t 'pending_acoustic' --schema-only --no-acl --no-owner > suppression-list/schema.sql
+```
+
+```
+$ psql -U postgres -d postgres
+
+postgres=# CREATE DATABASE optouts;
+CREATE DATABASE
+postgres=#
+postgres=# \c optouts
+You are now connected to database "optouts" as user "postgres".
+optouts=#
+optouts=# SET search_path TO public;
+SET
+optouts=#
+optouts=# \i schema.sql
+...
+optouts=#
+```
+
+Generate ~1M fake emails.
+
+```sql
+INSERT INTO emails(email_id, primary_email)
+  SELECT
+    gen_random_uuid() AS email_id,
+    substr(md5(random()::text), 1, (random() * 15)::int) || '@' || substr(md5(random()::text), 1, (random() * 10)::int) || '.' || substr(md5(random()::text), 1, 3) AS primary_email
+  FROM generate_series(1, 1000000)
+ON CONFLICT DO NOTHING;
+```
+
+Create an FxA account for 1% of them, with half of them using the same primary email.
+
+```sql
+INSERT INTO fxa(email_id, fxa_id, primary_email)
+  SELECT
+    email_id,
+    substr(md5(random()::text), 1, 100) AS fxa_id,
+    CASE WHEN random() < 0.5 THEN primary_email ELSE substr(md5(random()::text), 1, (random() * 15)::int) || '.fxa@' || substr(md5(random()::text), 1, (random() * 10)::int) || '.' || substr(md5(random()::text), 1, 3) END AS primary_email
+  FROM emails TABLESAMPLE BERNOULLI(1)
+ON CONFLICT DO NOTHING;
+```
+
+Mark 1% of them as opted out:
+
+```sql
+UPDATE emails SET has_opted_out_of_email = true
+FROM (SELECT email_id FROM emails TABLESAMPLE BERNOULLI(1)) sub
+WHERE sub.email_id = emails.email_id;
+```
+
+Produce a CSV to opt out some of them:
+
+```
+optouts=# COPY (
+  SELECT email, reason
+  FROM (
+    SELECT primary_email AS email, md5(random()::text) AS reason FROM emails TABLESAMPLE BERNOULLI(5)
+    UNION
+    SELECT primary_email AS email, md5(random()::text) AS reason FROM fxa TABLESAMPLE BERNOULLI(10)
+  ) t
+) TO '/tmp/optouts.csv';
+```

From ee9ba190e5d9a945ea98d9c8643c1e98fbf3e5d2 Mon Sep 17 00:00:00 2001
From: Mathieu Leplatre
Date: Thu, 14 Mar 2024 15:04:09 +0100
Subject: [PATCH 03/10] Pending record as option

---
 suppression-list/csv2optout.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/suppression-list/csv2optout.py b/suppression-list/csv2optout.py
index a6e106ae..5506e622 100644
--- a/suppression-list/csv2optout.py
+++ b/suppression-list/csv2optout.py
@@ -79,7 +79,7 @@
 INSERT INTO pending_acoustic(email_id, retry)
     SELECT email_id, 0 FROM optouts_{tmp_suffix}
-WHERE idx > {start_idx} AND idx <= {end_idx};
+WHERE {schedule_sync} AND idx > {start_idx} AND idx <= {end_idx};
 
 DELETE FROM optouts_{tmp_suffix}
     WHERE idx > {start_idx} AND idx <= {end_idx};
@@ -106,7 +106,8 @@ def writefile(path, content):
 @click.option('--batch-size', default=10000, help='Number of updates per commit.')
 @click.option('--files-count', default=5, help='Number of SQL files')
 @click.option('--sleep-seconds', default=0.1, help='Wait between batches')
-def main(csv_path, batch_size=10000, files_count=5, sleep_seconds=1) -> int:
+@click.option('--schedule-sync', default=False, help='Mark update emails as pending sync')
+def main(csv_path, batch_size, files_count, sleep_seconds, schedule_sync) -> int:
     now = datetime.now(tz=timezone.utc)
     tmp_suffix = now.strftime("%Y%m%dT%H%M")
     with open(csv_path) as f:
@@ -134,6 +135,7 @@ def main(csv_path, batch_size=10000, files_count=5, sleep_seconds=1) -> int:
                 end_idx=end_idx,
                 tmp_suffix=tmp_suffix,
                 sleep_seconds=sleep_seconds,
+                schedule_sync=str(schedule_sync).lower(),
             )
         )

From 8b356cb41ef894b2a5b85fea53929d432e117518 Mon Sep 17 00:00:00 2001
From: Mathieu Leplatre
Date: Thu, 14 Mar 2024 16:04:50 +0100
Subject: [PATCH 04/10] Read date from input CSV

---
 suppression-list/README.md     |  7 +--
 suppression-list/csv2optout.py | 81 ++++++++++++++++++++++----------
 suppression-list/example.csv   |  8 ++--
 3 files changed, 66 insertions(+), 30 deletions(-)

diff --git a/suppression-list/README.md b/suppression-list/README.md
index 57a551fd..c8759dac 100644
--- a/suppression-list/README.md
+++ b/suppression-list/README.md
@@ -5,9 +5,10 @@ Use a tool like [xsv](https://github.com/BurntSushi/xsv) to prepare the input CSV files and merge them into one with the following columns:
 
 ```
-email,reason
-alice@corp.com,never subscribed
-bob@fundation.org,
+"Email","Reason","Date"
+"alice@corp.com","never subscribed","20200310"
+"bob@fundation.org",,"20240501"
+"postmaster@localhost","marked undeliverable","19700101"
 ...
 ```
diff --git a/suppression-list/csv2optout.py b/suppression-list/csv2optout.py
index 5506e622..7bcfc001 100644
--- a/suppression-list/csv2optout.py
+++ b/suppression-list/csv2optout.py
@@ -1,12 +1,12 @@
 import csv
-from datetime import datetime, timezone
 import logging
-from pathlib import Path
+import re
 import sys
+from datetime import datetime, timezone
+from pathlib import Path
 
 import click
 
-
 logger = logging.getLogger(__name__)
@@ -20,15 +20,17 @@
 CREATE TEMPORARY TABLE IF NOT EXISTS imported_{tmp_suffix} (
     email TEXT,
-    unsubscribe_reason TEXT
+    unsubscribe_reason TEXT,
+    tstxt TEXT
 );
 
 CALL raise_notice('Start CSV import ({csv_rows_count} rows)...');
 
-COPY imported_{tmp_suffix}(email, unsubscribe_reason)
+COPY imported_{tmp_suffix}(email, unsubscribe_reason, ts)
     FROM '{csv_path}' -- fullpath
     DELIMITER '{delimiter}'
-    {headers};
+    {headers}
+    QUOTE AS '"';
 
 CALL raise_notice('CSV import done.');
@@ -37,7 +39,8 @@
 CREATE TABLE IF NOT EXISTS optouts_{tmp_suffix} (
     idx SERIAL UNIQUE,
     email_id UUID,
-    unsubscribe_reason TEXT
+    unsubscribe_reason TEXT,
+    ts TIMESTAMP
 );
 
 WITH all_primary_emails AS (
@@ -46,8 +49,11 @@
     UNION
     SELECT email_id, primary_email FROM fxa
 )
-INSERT INTO optouts_{tmp_suffix}(email_id, unsubscribe_reason)
-    SELECT email_id, unsubscribe_reason
+INSERT INTO optouts_{tmp_suffix}(email_id, unsubscribe_reason, ts)
+    SELECT
+        email_id,
+        unsubscribe_reason,
+        to_timestamp(tstxt,'YYYYMMDD')::timestamp AS ts
     FROM imported_{tmp_suffix}
     JOIN all_primary_emails
     ON primary_email = email;
@@ -68,7 +74,7 @@ CALL raise_notice('Batch {batch}/{batch_count}');
 
 UPDATE emails
-    SET update_timestamp = now(),
+    SET update_timestamp = ts,
         has_opted_out_of_email = true,
         unsubscribe_reason = tmp.unsubscribe_reason
     FROM optouts_{tmp_suffix} tmp
@@ -79,7 +85,7 @@
 INSERT INTO pending_acoustic(email_id, retry)
     SELECT email_id, 0 FROM optouts_{tmp_suffix}
-WHERE {schedule_sync} AND idx > {start_idx} AND idx <= {end_idx};
+    WHERE {schedule_sync} AND idx > {start_idx} AND idx <= {end_idx};
 
 DELETE FROM optouts_{tmp_suffix}
     WHERE idx > {start_idx} AND idx <= {end_idx};
@@ -102,27 +108,54 @@ def writefile(path, content):
 
 @click.command()
-@click.argument('csv_path', type=click.Path(exists=True))
-@click.option('--batch-size', default=10000, help='Number of updates per commit.')
-@click.option('--files-count', default=5, help='Number of SQL files')
-@click.option('--sleep-seconds', default=0.1, help='Wait between batches')
-@click.option('--schedule-sync', default=False, help='Mark update emails as pending sync')
-def main(csv_path, batch_size, files_count, sleep_seconds, schedule_sync) -> int:
-    now = datetime.now(tz=timezone.utc)
-    tmp_suffix = now.strftime("%Y%m%dT%H%M")
+@click.argument("csv_path", type=click.Path(exists=True))
+@click.option(
+    "--check-input-rows", default=1000, help="Number of rows to check from input CSV."
+)
+@click.option("--batch-size", default=10000, help="Number of updates per commit.")
+@click.option("--files-count", default=5, help="Number of SQL files")
+@click.option("--sleep-seconds", default=0.1, help="Wait between batches")
+@click.option(
+    "--schedule-sync", default=False, help="Mark update emails as pending sync"
+)
+def main(
+    csv_path, check_input_rows, batch_size, files_count, sleep_seconds, schedule_sync
+) -> int:
+    #
+    # Inspect CSV input.
+    #
     with open(csv_path) as f:
         csv_rows_count = sum(1 for _ in f)
         f.seek(0)
         delimiter = csv.Sniffer().sniff(f.read(1024)).delimiter
         f.seek(0)
         has_headers = csv.Sniffer().has_header(f.read(1024))
+        f.seek(0)
+
+        # Check format of X entries.
+        reader = csv.reader(f)
         if has_headers:
-            csv_rows_count -= 1
+            next(reader)
+        for i, row in enumerate(reader):
+            if i >= check_input_rows:
+                break
+            try:
+                email, reason, date = row
+                assert "@" in email
+                assert re.match(r"\d{8}", date)
+            except (AssertionError, ValueError):
+                raise ValueError(f"Line '{row}' does not look right")
 
     batch_count = 1 + csv_rows_count // batch_size
+    logger.info(
+        f"{csv_rows_count} entries, {batch_count} batches of {batch_size} updates per commit."
+    )
 
-    logger.info(f"{csv_rows_count} entries, {batch_count} batches of {batch_size} updates per commit.")
-
+    #
+    # Prepare SQL files
+    #
+    now = datetime.now(tz=timezone.utc)
+    tmp_suffix = now.strftime("%Y%m%dT%H%M")
     batch_commands = []
     for i in range(batch_count):
         start_idx = i * batch_size
@@ -157,7 +190,9 @@ def main(
         file_count += 1
         writefile(f"{csv_filename}.{file_count}.apply.sql", "".join(batch))
 
-    logger.info(f"Produced {file_count} files, with {chunk_size} commits ({chunk_size * batch_size} updates).")
+    logger.info(
+        f"Produced {file_count} files, with {chunk_size} commits ({chunk_size * batch_size} updates)."
+    )
 
     writefile(
         f"{csv_filename}.{file_count + 1}.post.sql",
diff --git a/suppression-list/example.csv b/suppression-list/example.csv
index b2fea31e..0f4178f7 100644
--- a/suppression-list/example.csv
+++ b/suppression-list/example.csv
@@ -1,4 +1,4 @@
-email,reason
-sam.won@else.com,tired
-alice@corp.com,never subscribed
-bob@fundation.org,
+"Email","Reason","Date"
+"alice@corp.com","never subscribed","20200310"
+"bob@fundation.org",,"20240501"
+"postmaster@localhost","marked undeliverable","19700101"

From d2ab9e9115fc942c84ea9b40cb431505cf684676 Mon Sep 17 00:00:00 2001
From: Mathieu Leplatre
Date: Thu, 14 Mar 2024 17:42:25 +0100
Subject: [PATCH 05/10] Adjust to Acoustic export date format

---
 suppression-list/README.md     |  8 ++++----
 suppression-list/csv2optout.py | 14 +++++++-------
 suppression-list/example.csv   |  8 ++++----
 3 files changed, 15 insertions(+), 15 deletions(-)

diff --git a/suppression-list/README.md b/suppression-list/README.md
index c8759dac..1d513b7d 100644
--- a/suppression-list/README.md
+++ b/suppression-list/README.md
@@ -5,10 +5,10 @@ Use a tool like [xsv](https://github.com/BurntSushi/xsv) to prepare the input CSV files and merge them into one with the following columns:
 
 ```
-"Email","Reason","Date"
-"alice@corp.com","never subscribed","20200310"
-"bob@fundation.org",,"20240501"
-"postmaster@localhost","marked undeliverable","19700101"
+"Email","Date","Reason"
+"alice@corp.com","2024-03-12 05:17 PM","Opted out from list id 1364939"
+"bob@fundation.org","2024-03-12 05:29 PM",
+"postmaster@localhost","2024-03-12 10:11 AM","marked undeliverable"
 ...
 ```
diff --git a/suppression-list/csv2optout.py b/suppression-list/csv2optout.py
index 7bcfc001..0b9a9f4e 100644
--- a/suppression-list/csv2optout.py
+++ b/suppression-list/csv2optout.py
@@ -20,13 +20,13 @@
 CREATE TEMPORARY TABLE IF NOT EXISTS imported_{tmp_suffix} (
     email TEXT,
-    unsubscribe_reason TEXT,
-    tstxt TEXT
+    tstxt TEXT,
+    unsubscribe_reason TEXT
 );
 
 CALL raise_notice('Start CSV import ({csv_rows_count} rows)...');
 
-COPY imported_{tmp_suffix}(email, unsubscribe_reason, ts)
+COPY imported_{tmp_suffix}(email, tstxt, unsubscribe_reason)
     FROM '{csv_path}' -- fullpath
     DELIMITER '{delimiter}'
     {headers}
@@ -53,7 +53,7 @@
     SELECT
         email_id,
         unsubscribe_reason,
-        to_timestamp(tstxt,'YYYYMMDD')::timestamp AS ts
+        to_timestamp(tstxt,'YYYY-MM-DD HH12:MI AM')::timestamp AS ts
     FROM imported_{tmp_suffix}
     JOIN all_primary_emails
     ON primary_email = email;
@@ -74,7 +74,7 @@ CALL raise_notice('Batch {batch}/{batch_count}');
 
 UPDATE emails
-    SET update_timestamp = ts,
+    SET update_timestamp = tmp.ts,
         has_opted_out_of_email = true,
         unsubscribe_reason = tmp.unsubscribe_reason
     FROM optouts_{tmp_suffix} tmp
@@ -140,9 +140,9 @@ def main(
             if i >= check_input_rows:
                 break
             try:
-                email, reason, date = row
+                email, date, reason = row
                 assert "@" in email
-                assert re.match(r"\d{8}", date)
+                assert re.match(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2} (AM|PM)", date)
             except (AssertionError, ValueError):
                 raise ValueError(f"Line '{row}' does not look right")
diff --git a/suppression-list/example.csv b/suppression-list/example.csv
index 0f4178f7..902286a8 100644
--- a/suppression-list/example.csv
+++ b/suppression-list/example.csv
@@ -1,4 +1,4 @@
-"Email","Reason","Date"
-"alice@corp.com","never subscribed","20200310"
-"bob@fundation.org",,"20240501"
-"postmaster@localhost","marked undeliverable","19700101"
+"Email","Date","Reason"
+"alice@corp.com","2024-03-12 05:17 PM","Opted out from list id 1364939"
+"bob@fundation.org","2024-03-12 05:29 PM",
+"postmaster@localhost","2024-03-12 10:11 AM","marked undeliverable"

From ad1b7dda1e7d893098af606233b3b7ed95783410 Mon Sep 17 00:00:00 2001
From: Mathieu Leplatre
Date: Fri, 15 Mar 2024 15:48:06 +0100
Subject: [PATCH 06/10] Fix processing of dates and improve logging

---
 suppression-list/README.md     | 10 ++--
 suppression-list/csv2optout.py | 97 ++++++++++++++++++++++------------
 2 files changed, 69 insertions(+), 38 deletions(-)

diff --git a/suppression-list/README.md b/suppression-list/README.md
index 1d513b7d..bd75fff9 100644
--- a/suppression-list/README.md
+++ b/suppression-list/README.md
@@ -120,7 +120,7 @@ INSERT INTO fxa(email_id, fxa_id, primary_email)
   SELECT
     email_id,
     substr(md5(random()::text), 1, 100) AS fxa_id,
-    CASE WHEN random() < 0.5 THEN primary_email ELSE substr(md5(random()::text), 1, (random() * 15)::int) || '.fxa@' || substr(md5(random()::text), 1, (random() * 10)::int) || '.' || substr(md5(random()::text), 1, 3) END AS primary_email
+    CASE WHEN random() < 0.5 THEN primary_email ELSE substr(md5(random()::text), 1, (random() * 15)::int + 1) || '.fxa@' || substr(md5(random()::text), 1, (random() * 10)::int) || '.' || substr(md5(random()::text), 1, 3) END AS primary_email
   FROM emails TABLESAMPLE BERNOULLI(1)
 ON CONFLICT DO NOTHING;
 ```
@@ -137,11 +137,11 @@ Produce a CSV to opt out some of them:
 
 ```
 optouts=# COPY (
-  SELECT email, reason
+  SELECT email, to_char(date, 'YYYY-MM-DD HH12:MI AM') AS date, reason
   FROM (
-    SELECT primary_email AS email, md5(random()::text) AS reason FROM emails TABLESAMPLE BERNOULLI(5)
+    SELECT primary_email AS email, NOW() - (random() * (INTERVAL '1 year')) AS date, md5(random()::text) AS reason FROM emails TABLESAMPLE BERNOULLI(5)
     UNION
-    SELECT primary_email AS email, md5(random()::text) AS reason FROM fxa TABLESAMPLE BERNOULLI(10)
+    SELECT primary_email AS email, NOW() - (random() * (INTERVAL '1 year')) AS date, md5(random()::text) AS reason FROM fxa TABLESAMPLE BERNOULLI(10)
   ) t
-) TO '/tmp/optouts.csv';
+) TO '/tmp/optouts.csv' WITH CSV HEADER DELIMITER ',' FORCE QUOTE *;
 ```
diff --git a/suppression-list/csv2optout.py b/suppression-list/csv2optout.py
index 0b9a9f4e..64b7bce6 100644
--- a/suppression-list/csv2optout.py
+++ b/suppression-list/csv2optout.py
@@ -1,5 +1,6 @@
 import csv
 import logging
+import os
 import re
 import sys
 from datetime import datetime, timezone
 from pathlib import Path
@@ -18,7 +19,8 @@
 END;
 $$;
 
-CREATE TEMPORARY TABLE IF NOT EXISTS imported_{tmp_suffix} (
+CREATE TEMPORARY TABLE IF NOT EXISTS csv_import (
+    idx SERIAL UNIQUE,
     email TEXT,
     tstxt TEXT,
     unsubscribe_reason TEXT
 );
@@ -26,7 +28,7 @@
 CALL raise_notice('Start CSV import ({csv_rows_count} rows)...');
 
-COPY imported_{tmp_suffix}(email, tstxt, unsubscribe_reason)
+COPY csv_import(email, tstxt, unsubscribe_reason)
     FROM '{csv_path}' -- fullpath
     DELIMITER '{delimiter}'
     {headers}
@@ -36,8 +38,6 @@
 CALL raise_notice('CSV import done.');
 
-CALL raise_notice('Join on existing contacts...');
-
 CREATE TABLE IF NOT EXISTS optouts_{tmp_suffix} (
     idx SERIAL UNIQUE,
     email_id UUID,
@@ -45,24 +45,36 @@
     ts TIMESTAMP
 );
 
+CALL raise_notice('Join on existing contacts...');
+{join_batches}
+CALL raise_notice('Join on existing contacts done.');
+
+SELECT COUNT(*) FROM optouts_{tmp_suffix};
+"""
+
+SQL_JOIN_BATCH = """
+BEGIN;
+
+CALL raise_notice('Update batch {batch}/{batch_count}');
+
 WITH all_primary_emails AS (
     SELECT email_id, primary_email FROM emails
     WHERE has_opted_out_of_email IS NOT true
     UNION
     SELECT email_id, primary_email FROM fxa
 )
-INSERT INTO optouts_{tmp_suffix}(email_id, unsubscribe_reason, ts)
+INSERT INTO optouts_{tmp_suffix}(idx, email_id, unsubscribe_reason, ts)
     SELECT
+        idx,
         email_id,
         unsubscribe_reason,
         to_timestamp(tstxt,'YYYY-MM-DD HH12:MI AM')::timestamp AS ts
-    FROM imported_{tmp_suffix}
+    FROM csv_import
     JOIN all_primary_emails
-    ON primary_email = email;
-
-CALL raise_notice('Join on existing contacts done.');
+    ON primary_email = email
+    WHERE idx > {start_idx} AND idx <= {end_idx};
 
-SELECT COUNT(*) FROM optouts_{tmp_suffix};
+COMMIT;
 """
 
 SQL_COMMANDS_POST = """
@@ -68,10 +80,10 @@
 DROP PROCEDURE raise_notice;
 DROP optouts_{tmp_suffix} CASCADE;
 """
 
-SQL_BATCH = """
+SQL_UPDATE_BATCH = """
 BEGIN;
 
-CALL raise_notice('Batch {batch}/{batch_count}');
+CALL raise_notice('Join batch {batch}/{batch_count}');
 
 UPDATE emails
     SET update_timestamp = tmp.ts,
         has_opted_out_of_email = true,
         unsubscribe_reason = tmp.unsubscribe_reason
     FROM optouts_{tmp_suffix} tmp
@@ -108,8 +120,17 @@
 @click.option(
     "--schedule-sync", default=False, help="Mark update emails as pending sync"
 )
+@click.option(
+    "--csv-path-server", default=".", help="Absolute path where to load the CSV from"
+)
-def main(
-    csv_path, check_input_rows, batch_size, files_count, sleep_seconds, schedule_sync
-) -> int:
+def main(
+    csv_path,
+    check_input_rows,
+    batch_size,
+    files_count,
+    sleep_seconds,
+    schedule_sync,
+    csv_path_server,
+) -> int:
     #
     # Inspect CSV input.
@@ -147,28 +168,35 @@
             except (AssertionError, ValueError):
                 raise ValueError(f"Line '{row}' does not look right")
 
     batch_count = 1 + csv_rows_count // batch_size
-    logger.info(
-        f"{csv_rows_count} entries, {batch_count} batches of {batch_size} updates per commit."
-    )
-
+    chunk_size = 1 + batch_count // files_count
+    logger.info(f"{csv_rows_count} entries")
+    logger.info(f"{batch_size} updates per commit")
+    logger.info(f"{batch_count} batches")
+    logger.info(f"{files_count} files")
+    logger.info(f"~{chunk_size} commits per file")
     #
     # Prepare SQL files
     #
     now = datetime.now(tz=timezone.utc)
     tmp_suffix = now.strftime("%Y%m%dT%H%M")
-    batch_commands = []
+    join_batches = []
+    update_batches = []
     for i in range(batch_count):
         start_idx = i * batch_size
         end_idx = (i + 1) * batch_size
-        batch_commands.append(
-            SQL_BATCH.format(
-                batch=i,
-                batch_count=batch_count,
-                start_idx=start_idx,
-                end_idx=end_idx,
-                tmp_suffix=tmp_suffix,
-                sleep_seconds=sleep_seconds,
+        params = dict(
+            batch=i + 1,
+            batch_count=batch_count,
+            start_idx=start_idx,
+            end_idx=end_idx,
+            tmp_suffix=tmp_suffix,
+            sleep_seconds=sleep_seconds,
+        )
+        join_batches.append(SQL_JOIN_BATCH.format(**params))
+        update_batches.append(
+            SQL_UPDATE_BATCH.format(
                 schedule_sync=str(schedule_sync).lower(),
+                **params,
             )
         )
 
@@ -177,25 +205,28 @@
     csv_filename = Path(csv_path).name
     writefile(
         f"{csv_filename}.0.pre.sql",
         SQL_COMMANDS_PRE.format(
             headers="CSV HEADER" if has_headers else "",
+            join_batches="".join(join_batches),
             csv_rows_count=csv_rows_count,
             delimiter=delimiter,
             tmp_suffix=tmp_suffix,
-            csv_path=csv_filename,
+            csv_path=os.path.join(csv_path_server, csv_filename),
         ),
     )
 
-    chunk_size = 1 + batch_count // files_count
-    file_count = 0
-    for batch in chunks(batch_commands, chunk_size):
-        file_count += 1
-        writefile(f"{csv_filename}.{file_count}.apply.sql", "".join(batch))
+    chunked = list(chunks(update_batches, chunk_size))
+    file_count = len(chunked)
+    for i, batch in enumerate(chunked):
+        writefile(
+            f"{csv_filename}.{i+1}.apply.sql",
+            "".join(batch) + f"CALL raise_notice('File {i+1}/{file_count} done.');",
+        )
 
     logger.info(
         f"Produced {file_count} files, with {chunk_size} commits ({chunk_size * batch_size} updates)."
     )
 
     writefile(
-        f"{csv_filename}.{file_count + 1}.post.sql",
+        f"{csv_filename}.{file_count+1}.post.sql",
         SQL_COMMANDS_POST.format(
             tmp_suffix=tmp_suffix,
         ),

From 3bac7477063c6fa14eb6daf3fb15770012f17b36 Mon Sep 17 00:00:00 2001
From: Mathieu Leplatre
Date: Tue, 26 Mar 2024 16:07:33 +0100
Subject: [PATCH 07/10] Build all primary emails first

---
 suppression-list/csv2optout.py | 41 +++++++++++++++++++++++++---------
 1 file changed, 30 insertions(+), 11 deletions(-)

diff --git a/suppression-list/csv2optout.py b/suppression-list/csv2optout.py
index 64b7bce6..84e418b1 100644
--- a/suppression-list/csv2optout.py
+++ b/suppression-list/csv2optout.py
@@ -25,6 +25,7 @@
     tstxt TEXT,
     unsubscribe_reason TEXT
 );
+DELETE FROM csv_import;
 
 CALL raise_notice('Start CSV import ({csv_rows_count} rows)...');
@@ -36,6 +37,28 @@
 CALL raise_notice('CSV import done.');
 
+CALL raise_notice('Index emails by opt-out column...');
+CREATE INDEX IF NOT EXISTS idx_emails_has_opted_out_of_email ON emails USING btree(has_opted_out_of_email);
+CALL raise_notice('Index done.');
+
+CALL raise_notice('Build a table with all primary emails...');
+CREATE TABLE IF NOT EXISTS all_primary_emails_{tmp_suffix} (
+    email_id UUID,
+    primary_email TEXT UNIQUE
+);
+
+CALL raise_notice('Primary emails 1/2');
+INSERT INTO all_primary_emails_{tmp_suffix}(email_id, primary_email)
+    SELECT email_id, lower(primary_email) FROM emails
+    WHERE has_opted_out_of_email IS NOT true;
+
+CALL raise_notice('Primary emails 2/2');
+INSERT INTO all_primary_emails_{tmp_suffix}(email_id, primary_email)
+    SELECT email_id, lower(primary_email) FROM fxa
+    ON CONFLICT (primary_email) DO NOTHING;
+
+CALL raise_notice('Primary emails table done.');
+
 CREATE TABLE IF NOT EXISTS optouts_{tmp_suffix} (
     idx SERIAL UNIQUE,
     email_id UUID,
@@ -53,14 +76,8 @@
 SQL_JOIN_BATCH = """
 BEGIN;
 
-CALL raise_notice('Update batch {batch}/{batch_count}');
+CALL raise_notice('Join batch {batch}/{batch_count}');
 
-WITH all_primary_emails AS (
-    SELECT email_id, primary_email FROM emails
-    WHERE has_opted_out_of_email IS NOT true
-    UNION
-    SELECT email_id, primary_email FROM fxa
-)
 INSERT INTO optouts_{tmp_suffix}(idx, email_id, unsubscribe_reason, ts)
     SELECT
         idx,
@@ -68,8 +85,8 @@
         unsubscribe_reason,
         to_timestamp(tstxt,'YYYY-MM-DD HH12:MI AM')::timestamp AS ts
     FROM csv_import
-    JOIN all_primary_emails
-    ON primary_email = email
+    JOIN all_primary_emails_{tmp_suffix}
+    ON primary_email = lower(email)
     WHERE idx > {start_idx} AND idx <= {end_idx};
 
 COMMIT;
@@ -77,13 +94,15 @@
 SQL_COMMANDS_POST = """
 DROP PROCEDURE raise_notice;
-DROP optouts_{tmp_suffix} CASCADE;
+DROP INDEX idx_emails_has_opted_out_of_email;
+DROP TABLE optouts_{tmp_suffix} CASCADE;
+DROP TABLE all_primary_emails_{tmp_suffix} CASCADE;
 """
 
 SQL_UPDATE_BATCH = """
 BEGIN;
 
-CALL raise_notice('Join batch {batch}/{batch_count}');
+CALL raise_notice('Update batch {batch}/{batch_count}');
 
 UPDATE emails
     SET update_timestamp = tmp.ts,

From 7cefb1fa6da6d3f362c905c5532197c09e3802de Mon Sep 17 00:00:00 2001
From: Mathieu Leplatre
Date: Wed, 27 Mar 2024 16:19:48 +0100
Subject: [PATCH 08/10] Improve idempotency

---
 suppression-list/csv2optout.py | 54 ++++++++++++++++------------------
 1 file changed, 25 insertions(+), 29 deletions(-)

diff --git a/suppression-list/csv2optout.py b/suppression-list/csv2optout.py
index 84e418b1..970af854 100644
--- a/suppression-list/csv2optout.py
+++ b/suppression-list/csv2optout.py
@@ -19,24 +19,6 @@
 END;
 $$;
 
-CREATE TEMPORARY TABLE IF NOT EXISTS csv_import (
-    idx SERIAL UNIQUE,
-    email TEXT,
-    tstxt TEXT,
-    unsubscribe_reason TEXT
-);
-DELETE FROM csv_import;
-
-CALL raise_notice('Start CSV import ({csv_rows_count} rows)...');
-
-COPY csv_import(email, tstxt, unsubscribe_reason)
-    FROM '{csv_path}' -- fullpath
-    DELIMITER '{delimiter}'
-    {headers}
-    QUOTE AS '"';
-
-CALL raise_notice('CSV import done.');
-
 CALL raise_notice('Index emails by opt-out column...');
 CREATE INDEX IF NOT EXISTS idx_emails_has_opted_out_of_email ON emails USING btree(has_opted_out_of_email);
 CALL raise_notice('Index done.');
@@ -47,6 +29,7 @@
     unsubscribe_reason TEXT,
     ts TIMESTAMP
 );
+ALTER TABLE optouts_{tmp_suffix} REPLICA IDENTITY FULL;
 
 CALL raise_notice('Join on existing contacts...');
 {join_batches}
@@ -62,15 +45,21 @@
 CALL raise_notice('Join batch {batch}/{batch_count}');
 
 INSERT INTO optouts_{tmp_suffix}(idx, email_id, unsubscribe_reason, ts)
+    (
     SELECT
         idx,
         email_id,
         unsubscribe_reason,
         to_timestamp(tstxt,'YYYY-MM-DD HH12:MI AM')::timestamp AS ts
     FROM csv_import
     JOIN all_primary_emails_{tmp_suffix}
     ON primary_email = lower(email)
     WHERE idx > {start_idx} AND idx <= {end_idx}
+    )
+    ON CONFLICT(idx) DO NOTHING;
+
+DELETE FROM all_primary_emails_{tmp_suffix}
+    WHERE idx > {start_idx} AND idx <= {end_idx};
 
 COMMIT;
 """
@@ -86,6 +75,7 @@
 DROP INDEX idx_emails_has_opted_out_of_email;
 DROP TABLE optouts_{tmp_suffix} CASCADE;
 DROP TABLE all_primary_emails_{tmp_suffix} CASCADE;
+DROP TABLE csv_import CASCADE;
 """
 
 SQL_UPDATE_BATCH = """
@@ -144,7 +134,7 @@
     "--check-input-rows", default=1000, help="Number of rows to check from input CSV."
 )
 @click.option("--batch-size", default=10000, help="Number of updates per commit.")
-@click.option("--files-count", default=5, help="Number of SQL files")
+@click.option("--files-count", default=3, help="Number of SQL files")
 @click.option("--sleep-seconds", default=0.1, help="Wait between batches")
 @click.option(
     "--schedule-sync", default=False, help="Mark update emails as pending sync"
@@ -152,6 +142,11 @@
 @click.option(
     "--csv-path-server", default=".", help="Absolute path where to load the CSV from"
 )
+@click.option(
+    "--table-suffix",
+    default=None,
+    help="Specify table suffix instead of using current time",
+)
 def main(
     csv_path,
     check_input_rows,
     batch_size,
     files_count,
     sleep_seconds,
     schedule_sync,
     csv_path_server,
+    table_suffix,
 ) -> int:
     #
     # Inspect CSV input.
@@ -197,7 +193,7 @@ def main(
     #
     # Prepare SQL files
     #
     now = datetime.now(tz=timezone.utc)
-    tmp_suffix = now.strftime("%Y%m%dT%H%M")
+    tmp_suffix = table_suffix or now.strftime("%Y%m%dT%H%M")
     join_batches = []
     update_batches = []
     for i in range(batch_count):

From 59ff826170932061903562e8a346c98c6bde14ed Mon Sep 17 00:00:00 2001
From: Mathieu Leplatre
Date: Wed, 27 Mar 2024 16:44:51 +0100
Subject: [PATCH 09/10] Add notes about STAGE

---
 suppression-list/README.md     | 108 +++++++++++++++++++++++++++++++++
 suppression-list/csv2optout.py |   1 +
 2 files changed, 109 insertions(+)

diff --git a/suppression-list/README.md b/suppression-list/README.md
index bd75fff9..ddaf18c0 100644
--- a/suppression-list/README.md
+++ b/suppression-list/README.md
@@ -145,3 +145,111 @@ optouts=# COPY (
   ) t
 ) TO '/tmp/optouts.csv' WITH CSV HEADER DELIMITER ',' FORCE QUOTE *;
 ```
+
+## Full Cookbook on STAGE
+
+### Preparation of SQL files
+
+This section does not have to be done again for PROD.
+
+
+Using these input files:
+
+* `TAFTI_Unengaged.CSV`
+
+```
+$ head suppression-list/TAFTI_Unengaged.CSV
+Email,Reason
+xxxx@web.de,TAFTI Behaviorally Unengaged 2024
+```
+
+* `Acoustic_Main_Suppression_List_20240314.CSV`
+
+```
+$ head suppression-list/Acoustic_Main_Suppression_List_20240314.CSV
+"Email","Opt In Date","Opt In Details"
+"xxxxx@yahoo.com","2021-01-13 03:00 PM","User Name: sftpdrop_moco@mozilla.com. IP Address: 0.0.0.0"
+```
+
+I created a single CSV file `20240318-suppression-list.csv`:
+
+```
+$ head 20240318-suppression-list.csv
+"Email","Date","Reason"
+"xxxx@web.de","2024-03-18 05:32 PM","TAFTI Behaviorally Unengaged 2024"
+```
+
+I then turned it into SQL files to be executed on the server, using this command:
+
+```
+$ python csv2optout.py 20240318-suppression-list.csv --csv-path-server=`pwd`
+```
+
+
+### Import the CSV First
+
+Since the `COPY` SQL command requires `superuser` privileges, we will do this first manually using `\copy`.
+
+```
+ERROR: must be superuser or a member of the pg_read_server_files role to COPY from a file
+HINT: Anyone can COPY to stdout or from stdin. psql's \copy command also works for anyone.
+```
+
+Create the table manually:
+
+```
+CREATE TABLE csv_import (
+    idx SERIAL UNIQUE,
+    email TEXT,
+    tstxt TEXT,
+    unsubscribe_reason TEXT
+);
+```
+
+And import the `20240318-suppression-list.csv` file:
+
+```
+ctms=> \copy csv_import(email, tstxt, unsubscribe_reason) FROM '20240318-suppression-list.csv' WITH DELIMITER ',' CSV HEADER QUOTE AS '"';
+```
+
+
+### Execute the SQL files
+
+Enable logging of elapsed time and abort on error.
+
+```
+ctms=> \timing
+ctms=> \set ON_ERROR_STOP on
+```
+
+The first SQL file will join the imported CSV with the emails. This does not alter any existing table.
+
+This took around 3 hours on STAGE.
+
+```
+ctms=> \i 20240318-suppression-list.csv.0.pre.sql
+```
+
+> Note: This script is idempotent, and can be (re)executed several times if necessary.
+
+
+The following SQL files will perform the updates on the `emails` table.
+
+They should take around 2 hours each.
+
+```
+ctms=> \i 20240318-suppression-list.csv.1.apply.sql
+```
+
+```
+ctms=> \i 20240318-suppression-list.csv.2.apply.sql
+```
+
+```
+ctms=> \i 20240318-suppression-list.csv.3.apply.sql
+```
+
+
+### Cleanup
+
+```
+ctms=> \i 20240318-suppression-list.csv.4.post.sql
+```
diff --git a/suppression-list/csv2optout.py b/suppression-list/csv2optout.py
index 970af854..c319c8a8 100644
--- a/suppression-list/csv2optout.py
+++ b/suppression-list/csv2optout.py
@@ -28,6 +28,7 @@
     email_id UUID,
     primary_email TEXT UNIQUE
 );
+ALTER TABLE all_primary_emails_{tmp_suffix} REPLICA IDENTITY FULL;
 
 CALL raise_notice('Primary emails 1/2');
 INSERT INTO all_primary_emails_{tmp_suffix}(email_id, primary_email)

From ab9414874ac65ff8d575593f1682938124bc1f39 Mon Sep 17 00:00:00 2001
From: Mathieu Leplatre
Date: Fri, 5 Apr 2024 15:40:55 +0200
Subject: [PATCH 10/10] Fix script pre

---
 suppression-list/csv2optout.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/suppression-list/csv2optout.py b/suppression-list/csv2optout.py
index c319c8a8..da16aba6 100644
--- a/suppression-list/csv2optout.py
+++ b/suppression-list/csv2optout.py
@@ -76,7 +76,7 @@
     )
     ON CONFLICT(idx) DO NOTHING;
 
-DELETE FROM all_primary_emails_{tmp_suffix}
+DELETE FROM csv_import
     WHERE idx > {start_idx} AND idx <= {end_idx};
 
 COMMIT;