-
Notifications
You must be signed in to change notification settings - Fork 6
/
pg_fact_loader--1.4.sql
4704 lines (4228 loc) · 170 KB
/
pg_fact_loader--1.4.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/* pg_fact_loader--1.0.sql */
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION pg_fact_loader" to load this file. \quit
CREATE FUNCTION fact_loader._launch_worker(oid)
RETURNS pg_catalog.INT4 STRICT
AS 'MODULE_PATHNAME', 'pg_fact_loader_worker'
LANGUAGE C;
CREATE FUNCTION fact_loader.launch_worker()
RETURNS pg_catalog.INT4 STRICT
AS 'SELECT fact_loader._launch_worker(oid) FROM pg_database WHERE datname = current_database();'
LANGUAGE SQL;
CREATE TABLE fact_loader.fact_tables (
fact_table_id SERIAL PRIMARY KEY,
fact_table_relid REGCLASS NOT NULL,
fact_table_agg_proid REGPROC NULL, --This may only be used to generate a merge function but is not used in automation
enabled BOOLEAN NOT NULL DEFAULT FALSE,
priority INT,
attempt_number INT,
retries_allowed INT DEFAULT 0,
force_worker_priority BOOLEAN NOT NULL DEFAULT FALSE,
last_refresh_source_cutoff TIMESTAMPTZ,
last_refresh_attempted_at TIMESTAMPTZ,
--TODO - answer if we want the worker to bail or record messages on ERROR (or both)
last_refresh_succeeded BOOLEAN,
row_created_at TIMESTAMPTZ DEFAULT NOW(),
row_updated_at TIMESTAMPTZ,
CONSTRAINT unique_fact_tables UNIQUE (fact_table_relid)
);
SELECT pg_catalog.pg_extension_config_dump('fact_loader.fact_tables', '');
CREATE TABLE fact_loader.fact_table_deps (
fact_table_dep_id SERIAL PRIMARY KEY,
parent_id INT NOT NULL REFERENCES fact_loader.fact_tables (fact_table_id),
child_id INT NOT NULL REFERENCES fact_loader.fact_tables (fact_table_id),
/*****
In very many cases, you will use the same procs for insert, update, and delete
even with multiple dependencies. This is why you must give defaults here which
will be used to auto-populate fact_loader.fact_table_dep_queue_table_deps which
can be overridden if necessary for each queue table.
After you configure all of your fact tables and queue tables, run the function
refresh_fact_table_dep_queue_table_deps manually to populate fact_table_dep_queue_table_deps,
then make any changes as necessary.
You can see an example of this in the test suite
"seeds" file. You can also see an override example with order_emails_fact having a
different proc for orders and reorders delete cases.
*/
default_insert_merge_proid REGPROC NOT NULL,
default_update_merge_proid REGPROC NOT NULL,
default_delete_merge_proid REGPROC NOT NULL,
row_created_at TIMESTAMPTZ DEFAULT NOW(),
row_updated_at TIMESTAMPTZ,
CONSTRAINT unique_fact_deps UNIQUE (parent_id, child_id)
);
SELECT pg_catalog.pg_extension_config_dump('fact_loader.fact_table_deps', '');
CREATE TABLE fact_loader.queue_tables (
queue_table_id SERIAL PRIMARY KEY,
queue_table_relid REGCLASS NOT NULL,
queue_of_base_table_relid REGCLASS NOT NULL,
/****
NOTE - the reason for this config existing here is that we have no built-in way
in pglogical to know which tables belong to which pglogical node. Therefore, we
need to configure that. We hope that some time down the road, this will change,
and we can derive this information.
*/
pglogical_node_if_id INT NOT NULL,
--This is the timezone for the changed_at column - if null, we assume it is timestamptz (we could check that actually)
queue_table_tz TEXT,
row_created_at TIMESTAMPTZ DEFAULT NOW(),
row_updated_at TIMESTAMPTZ,
CONSTRAINT unique_queue_table UNIQUE (queue_table_relid),
CONSTRAINT unique_base_table UNIQUE (queue_of_base_table_relid)
);
COMMENT ON COLUMN fact_loader.queue_tables.pglogical_node_if_id IS $$The reason for this config existing here is that we have no built-in way
in pglogical to know which tables belong to which pglogical node. Therefore, we
need to configure that. We hope that some time down the road, this will change,
and we can derive this information.$$;
SELECT pg_catalog.pg_extension_config_dump('fact_loader.queue_tables', '');
CREATE TABLE fact_loader.queue_table_deps (
queue_table_dep_id SERIAL PRIMARY KEY,
fact_table_id INT NOT NULL REFERENCES fact_loader.fact_tables (fact_table_id),
queue_table_id INT NOT NULL REFERENCES fact_loader.queue_tables (queue_table_id),
relevant_change_columns NAME[],
last_cutoff_id BIGINT,
last_cutoff_source_time TIMESTAMPTZ,
insert_merge_proid REGPROC NOT NULL,
update_merge_proid REGPROC NOT NULL,
delete_merge_proid REGPROC NOT NULL,
row_created_at TIMESTAMPTZ DEFAULT NOW(),
row_updated_at TIMESTAMPTZ,
CONSTRAINT unique_queue_deps UNIQUE (fact_table_id, queue_table_id)
);
SELECT pg_catalog.pg_extension_config_dump('fact_loader.queue_table_deps', '');
CREATE TABLE fact_loader.key_retrieval_sequences (
key_retrieval_sequence_id SERIAL PRIMARY KEY,
queue_table_dep_id INT NOT NULL REFERENCES fact_loader.queue_table_deps (queue_table_dep_id),
/****
In almost all cases, we only need to write one way to retrieve keys. The only exception is, for
example, when in a delete case, you need to pass a different field (customer_id instead of order_id)
to the delete_merge_proid function. You then need a different key_retrieval_sequence to handle
a different field name for this delete case.
By default this is NULL, meaning there is no filter, meaning the sequence applies to all events I, U, D.
Otherwise, you can add scopes in which case you must have one for each of 'I','U','D'.
*/
filter_scope CHAR(1) NULL,
level INT NOT NULL,
return_columns NAME[] NOT NULL,
is_fact_key BOOLEAN NOT NULL,
join_to_relation REGCLASS NULL,
join_to_column NAME NULL,
return_columns_from_join NAME[] NULL,
join_return_is_fact_key BOOLEAN NULL,
CONSTRAINT unique_retrievals UNIQUE (queue_table_dep_id, filter_scope, level),
CONSTRAINT valid_scopes CHECK (filter_scope IN ('I','U','D'))
);
SELECT pg_catalog.pg_extension_config_dump('fact_loader.key_retrieval_sequences', '');
CREATE TABLE fact_loader.fact_table_dep_queue_table_deps
(
fact_table_dep_queue_table_dep_id SERIAL PRIMARY KEY,
fact_table_dep_id INT REFERENCES fact_loader.fact_table_deps (fact_table_dep_id),
queue_table_dep_id INT REFERENCES fact_loader.queue_table_deps (queue_table_dep_id),
last_cutoff_id BIGINT,
last_cutoff_source_time TIMESTAMPTZ,
insert_merge_proid REGPROC NOT NULL,
update_merge_proid REGPROC NOT NULL,
delete_merge_proid REGPROC NOT NULL,
row_created_at TIMESTAMPTZ DEFAULT NOW(),
row_updated_at TIMESTAMPTZ,
CONSTRAINT unique_cutoffs UNIQUE (fact_table_dep_id, queue_table_dep_id)
);
CREATE OR REPLACE FUNCTION fact_loader.unique_scopes()
RETURNS TRIGGER AS
$BODY$
BEGIN
IF (NEW.filter_scope IS NULL AND EXISTS (
SELECT 1
FROM fact_loader.key_retrieval_sequences
WHERE queue_table_dep_id <> NEW.queue_table_dep_id
AND NEW.filter_scope IS NOT NULL
)) OR
(NEW.filter_scope IS NOT NULL AND EXISTS (
SELECT 1
FROM fact_loader.key_retrieval_sequences
WHERE queue_table_dep_id <> NEW.queue_table_dep_id
AND NEW.filter_scope IS NULL
))
THEN
RAISE EXCEPTION $$You must either use a NULL filter_scope to cover all 3 events I, U, D
or you must specify all 3 events separately I, U, D (For queue_table_dep_id %).
$$, NEW.queue_table_dep_id;
END IF;
RETURN NEW;
END;
$BODY$
LANGUAGE plpgsql;
CREATE TRIGGER unique_scopes
BEFORE INSERT OR UPDATE ON fact_loader.key_retrieval_sequences
FOR EACH ROW EXECUTE PROCEDURE fact_loader.unique_scopes();
/***
This table is unlogged because it only has data mid-transaction and should always be empty
*/
CREATE UNLOGGED TABLE fact_loader.process_queue (
process_queue_id BIGSERIAL PRIMARY KEY,
fact_table_id INT NOT NULL REFERENCES fact_loader.fact_tables (fact_table_id),
proid REGPROC NOT NULL,
key_value TEXT NOT NULL,
row_created_at TIMESTAMPTZ DEFAULT NOW(),
row_updated_at TIMESTAMPTZ
);
CREATE OR REPLACE FUNCTION fact_loader.set_row_updated_at_to_now()
RETURNS TRIGGER AS
$BODY$
BEGIN
NEW.row_updated_at = now();
RETURN NEW;
END;
$BODY$
LANGUAGE plpgsql;
CREATE TRIGGER set_row_updated_at_to_now
BEFORE INSERT OR UPDATE ON fact_loader.fact_tables
FOR EACH ROW
WHEN (NEW.row_updated_at IS DISTINCT FROM now())
EXECUTE PROCEDURE fact_loader.set_row_updated_at_to_now();
CREATE TRIGGER set_row_updated_at_to_now
BEFORE INSERT OR UPDATE ON fact_loader.fact_table_deps
FOR EACH ROW
WHEN (NEW.row_updated_at IS DISTINCT FROM now())
EXECUTE PROCEDURE fact_loader.set_row_updated_at_to_now();
CREATE TRIGGER set_row_updated_at_to_now
BEFORE INSERT OR UPDATE ON fact_loader.queue_tables
FOR EACH ROW
WHEN (NEW.row_updated_at IS DISTINCT FROM now())
EXECUTE PROCEDURE fact_loader.set_row_updated_at_to_now();
CREATE TRIGGER set_row_updated_at_to_now
BEFORE INSERT OR UPDATE ON fact_loader.queue_table_deps
FOR EACH ROW
WHEN (NEW.row_updated_at IS DISTINCT FROM now())
EXECUTE PROCEDURE fact_loader.set_row_updated_at_to_now();
CREATE TRIGGER set_row_updated_at_to_now
BEFORE INSERT OR UPDATE ON fact_loader.fact_table_dep_queue_table_deps
FOR EACH ROW
WHEN (NEW.row_updated_at IS DISTINCT FROM now())
EXECUTE PROCEDURE fact_loader.set_row_updated_at_to_now();
CREATE TRIGGER set_row_updated_at_to_now
BEFORE INSERT OR UPDATE ON fact_loader.process_queue
FOR EACH ROW
WHEN (NEW.row_updated_at IS DISTINCT FROM now())
EXECUTE PROCEDURE fact_loader.set_row_updated_at_to_now();
CREATE TYPE fact_loader.table_load_type AS ENUM('delta','full_refresh');
CREATE OR REPLACE FUNCTION fact_loader.create_table_loader_function
(p_source_proc REGPROC,
p_destination_relation REGCLASS,
p_ignore_diff_for_columns TEXT[])
RETURNS REGPROC AS
$BODY$
DECLARE
v_new_proc TEXT;
v_sql TEXT;
BEGIN
/****
Find the primary key for the destination table. This is required.
If the destination table does not have a primary key, it should.
This is partly for simplicity, and partly to encourage good practice
that we build and refresh tables based on chosen primary key to match
records 1 for 1, which is basic DB design 101.
*/
SELECT function_name, function_sql
INTO v_new_proc, v_sql
FROM fact_loader.table_loader_function(p_source_proc, p_destination_relation, p_ignore_diff_for_columns);
EXECUTE v_sql;
RETURN v_new_proc::REGPROC;
END;
$BODY$
LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION fact_loader.execute_queue(p_fact_table_id INT)
RETURNS TABLE (sql TEXT) AS
$BODY$
BEGIN
RETURN QUERY
WITH ordered_process_queue AS
(SELECT process_queue_id, proid, key_value,
--TODO - either infer the data type of the function args, which is not super easy with postgres,
--or add configuration fields for the name and data type of these. This will suffice for now
--because we only have integer args for all functions
'integer' AS queue_of_base_table_key_type
FROM fact_loader.process_queue pq
WHERE pq.fact_table_id = p_fact_table_id
ORDER BY process_queue_id)
, with_rank AS
(SELECT format('%s(%s::%s)', proid::TEXT, 'key_value', queue_of_base_table_key_type) AS function_call,
process_queue_id,
RANK() OVER (PARTITION BY proid) AS execution_group
FROM ordered_process_queue
)
, execute_sql_groups AS
(
SELECT execution_group,
format($$
WITH newly_processed AS (
SELECT process_queue_id, %s
FROM (
/****
Must wrap this to execute in order of ids
***/
SELECT *
FROM fact_loader.process_queue
WHERE process_queue_id BETWEEN %s AND %s
AND fact_table_id = %s
ORDER BY process_queue_id) q
)
DELETE FROM fact_loader.process_queue pq USING newly_processed np
WHERE np.process_queue_id = pq.process_queue_id;
$$, function_call, MIN(process_queue_id), MAX(process_queue_id), p_fact_table_id) AS execute_sql
FROM with_rank
GROUP BY execution_group, function_call
ORDER BY execution_group
)
SELECT COALESCE(string_agg(execute_sql,''),'SELECT NULL') AS final_execute_sql
FROM execute_sql_groups;
END;
$BODY$
LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION fact_loader.execute_table_loader
(p_source_relation REGCLASS,
p_destination_relation REGCLASS,
p_ignore_diff_for_columns TEXT[],
p_load_type fact_loader.table_load_type,
p_ignore_unmapped_columns BOOLEAN = FALSE)
RETURNS TABLE (upserted INT, deleted INT, truncated BOOLEAN, pct_dest NUMERIC(8,2)) AS
$BODY$
/***
The SQL executed within this container is the actual
load to the destination table, and assumes that 'prepare'
phase has already been run, which is supposed to have gathered
the actual minimal delta and determine what to do here.
*/
DECLARE
v_sql TEXT;
v_unmapped_src_columns TEXT[];
v_unmapped_dest_columns TEXT[];
BEGIN
SELECT execute_sql, unmapped_src_columns, unmapped_dest_columns INTO v_sql, v_unmapped_src_columns, v_unmapped_dest_columns
FROM fact_loader.table_loader(
p_source_relation,
p_destination_relation,
p_ignore_diff_for_columns,
p_load_type);
PERFORM fact_loader.table_loader_validator(p_source_relation,
p_destination_relation,
v_unmapped_src_columns,
v_unmapped_dest_columns,
p_ignore_unmapped_columns);
RAISE LOG 'Executing SQL: %', v_sql;
EXECUTE v_sql;
RETURN QUERY
SELECT * FROM count_tracker;
END;
$BODY$
LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION fact_loader.load(p_fact_table_id INT)
RETURNS VOID AS
$BODY$
DECLARE
v_insert_to_process_queue_sql text;
v_execute_sql text;
v_metadata_update_sql text;
v_debug_rec record;
v_debug_text text = '';
BEGIN
/***
There are 3 basic steps to this load:
1. Gather all queue table changes and insert them into a consolidated process_queue
2. Using the process_queue data, execute the delta load of the fact table
3. Update the metadata indicating the last records updated for both the queue tables and fact table
*/
/****
Get SQL to insert new data into the consolidated process_queue,
and SQL to update metadata for last_cutoffs.
*/
SELECT insert_to_process_queue_sql, metadata_update_sql
INTO v_insert_to_process_queue_sql, v_metadata_update_sql
FROM fact_loader.sql_builder(p_fact_table_id);
/****
Populate the consolidated queue
*/
RAISE LOG 'Populating Queue for fact_table_id %: %', p_fact_table_id, v_insert_to_process_queue_sql;
EXECUTE COALESCE(v_insert_to_process_queue_sql, $$SELECT 'No queue data' AS result$$);
/****
For DEBUG purposes only to view the actual process_queue. Requires setting log_min_messages to DEBUG.
*/
IF current_setting('log_min_messages') LIKE 'debug%' THEN
FOR v_debug_rec IN
SELECT * FROM fact_loader.process_queue
LOOP
v_debug_text = v_debug_text||E'\n'||format('%s', v_debug_rec.process_queue_id||chr(9)||v_debug_rec.fact_table_id||chr(9)||v_debug_rec.proid||chr(9)||v_debug_rec.key_value);
END LOOP;
IF v_debug_text <> '' THEN
v_debug_text = E'\n'||format('%s',
(SELECT string_agg(column_name,chr(9))
FROM information_schema.columns
WHERE table_name = 'process_queue'
AND table_schema = 'fact_loader'
AND column_name NOT LIKE 'row_%_at'))
||v_debug_text;
RAISE DEBUG '%', v_debug_text;
END IF;
END IF;
/****
With data now in the process_queue, the execute_queue function builds the SQL to execute.
Save this SQL in a variable and execute it.
If there is no data to execute, this is a no-op select statement.
*/
SELECT sql INTO v_execute_sql FROM fact_loader.execute_queue(p_fact_table_id);
RAISE LOG 'Executing Queue for fact_table_id %: %', p_fact_table_id, v_execute_sql;
EXECUTE COALESCE(v_execute_sql, $$SELECT 'No queue data to execute' AS result$$);
/****
With everything finished, we now update the metadata for the fact_table.
Even if no data was processed, we will still move forward last_refresh_attempted_at.
last_refresh_succeeded will be marked true always for now. It could in the future
be used to indicate a failure in case of a caught error.
*/
RAISE LOG 'Updating metadata for fact_table_id %: %', p_fact_table_id, v_metadata_update_sql;
EXECUTE COALESCE(v_metadata_update_sql,
format(
$$UPDATE fact_loader.fact_tables ft
SET last_refresh_attempted_at = now(),
last_refresh_succeeded = TRUE
WHERE fact_table_id = %s;
$$, p_fact_table_id));
END;
$BODY$
LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION fact_loader.prepare_table_loader
(p_source_relation REGCLASS,
p_destination_relation REGCLASS,
p_ignore_diff_for_columns TEXT[],
p_load_type fact_loader.table_load_type,
p_ignore_unmapped_columns BOOLEAN = FALSE)
RETURNS TABLE (upserted INT, deleted INT, truncated BOOLEAN, pct_dest NUMERIC(8,2)) AS
$BODY$
/***
The SQL executed within this container is not going
to lock any of the destination table for writing, which
is precisely why it is separated from the 'execute' phase
which actually writes to the table in the shortest transaction
possible.
*/
DECLARE
v_sql TEXT;
v_unmapped_src_columns TEXT[];
v_unmapped_dest_columns TEXT[];
BEGIN
SELECT prepare_sql, unmapped_src_columns, unmapped_dest_columns INTO v_sql, v_unmapped_src_columns, v_unmapped_dest_columns
FROM fact_loader.table_loader(
p_source_relation,
p_destination_relation,
p_ignore_diff_for_columns,
p_load_type);
PERFORM fact_loader.table_loader_validator(p_source_relation,
p_destination_relation,
v_unmapped_src_columns,
v_unmapped_dest_columns,
p_ignore_unmapped_columns);
RAISE LOG 'Executing SQL: %', v_sql;
EXECUTE v_sql;
RETURN QUERY
SELECT * FROM count_tracker;
END;
$BODY$
LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION fact_loader.purge_queues
(p_add_interval INTERVAL = '1 hour')
RETURNS VOID AS
$BODY$
/*****
The interval overlap is only important for delete cases in which you may need to join
to another audit table in order to get a deleted row's data. 1 hour is somewhat arbitrary,
but in the delete case, any related deleted rows would seem to normally appear very close to
another relation's deleted rows. 1 hour is probably generous but also safe.
*/
DECLARE
v_sql TEXT;
BEGIN
WITH eligible_queue_tables_for_purge AS
(SELECT
/****
This logic should handle dependent fact tables as well,
because they share the same queue tables but they have separately
logged last_cutoffs.
*/
qt.queue_table_relid
, qt.queue_table_id_field
, queue_table_timestamp
, queue_table_tz
, MIN(last_cutoff_id) AS min_cutoff_id
, MIN(last_cutoff_source_time) AS min_source_time
FROM fact_loader.queue_deps_all qt
WHERE qt.last_cutoff_id IS NOT NULL
/***
There must be no other fact tables using the same queue
which have not yet been processed at all
*/
AND NOT EXISTS
(SELECT 1
FROM fact_loader.queue_deps_all qtdx
WHERE qtdx.queue_table_id = qt.queue_table_id
AND qtdx.last_cutoff_id IS NULL)
GROUP BY qt.queue_table_relid
, qt.queue_table_id_field
, queue_table_timestamp
, queue_table_tz)
SELECT
string_agg(
format($$
DELETE FROM %s
WHERE %s <= %s
AND %s %s < (%s::TIMESTAMPTZ - interval %s);
$$,
queue_table_relid,
queue_table_id_field,
min_cutoff_id,
quote_ident(queue_table_timestamp),
CASE WHEN queue_table_tz IS NULL THEN '' ELSE 'AT TIME ZONE '||quote_literal(queue_table_tz) END,
quote_literal(min_source_time),
quote_literal(p_add_interval::TEXT)
)
, E'\n\n')
INTO v_sql
FROM eligible_queue_tables_for_purge;
IF v_sql IS NOT NULL THEN
RAISE LOG 'Purging Queue: %', v_sql;
EXECUTE v_sql;
END IF;
END;
$BODY$
LANGUAGE plpgsql;
CREATE FUNCTION fact_loader.refresh_fact_table_dep_queue_table_deps()
RETURNS VOID AS
$BODY$
BEGIN
/****
This function will be used to refresh the fact_table_dep_queue_table_deps table.
The purpose of this table is to easily figure out queue data for fact tables that depend on other fact tables.
This will be run with every call of load().
This may not be the most efficient method, but it is certainly reliable and fast.
*/
/****
Recursively find all fact table deps including nested ones (fact tables that depend on other fact tables)
to build the fact_table_dep_queue_table_deps table.
*/
WITH RECURSIVE all_fact_table_deps AS (
SELECT
qtd.queue_table_dep_id
, ftd.fact_table_dep_id
, parent_id AS parent_fact_table_id
, child_id AS fact_table_id
, qtd.queue_table_id
, qt.queue_table_relid
, ftp.fact_table_relid AS parent_fact_table
, ftc.fact_table_relid AS child_fact_table
, ftd.default_insert_merge_proid
, ftd.default_update_merge_proid
, ftd.default_delete_merge_proid
FROM fact_loader.queue_table_deps qtd
INNER JOIN fact_loader.queue_tables qt ON qtd.queue_table_id = qt.queue_table_id
INNER JOIN fact_loader.fact_table_deps ftd ON ftd.parent_id = qtd.fact_table_id
INNER JOIN fact_loader.fact_tables ftp USING (fact_table_id)
INNER JOIN fact_loader.fact_tables ftc ON ftc.fact_table_id = ftd.child_id
UNION ALL
SELECT
qtd.queue_table_dep_id
, ftd.fact_table_dep_id
, parent_id AS parent_fact_table_id
, child_id AS fact_table_id
, qtd.queue_table_id
, qt.queue_table_relid
, ftp.fact_table_relid AS parent_fact_table
, ft.fact_table_relid AS child_fact_table
, ftd.default_insert_merge_proid
, ftd.default_update_merge_proid
, ftd.default_delete_merge_proid
FROM all_fact_table_deps qtd
INNER JOIN fact_loader.queue_tables qt ON qtd.queue_table_id = qt.queue_table_id
INNER JOIN fact_loader.fact_table_deps ftd ON ftd.parent_id = qtd.fact_table_id
INNER JOIN fact_loader.fact_tables ftp ON ftp.fact_table_id = ftd.parent_id
INNER JOIN fact_loader.fact_tables ft ON ft.fact_table_id = ftd.child_id
)
/****
Remove fact_table_dep_queue_table_deps that no longer exist if applicable
*/
, removed AS (
DELETE FROM fact_loader.fact_table_dep_queue_table_deps ftdqc
WHERE NOT EXISTS(SELECT 1
FROM all_fact_table_deps aftd
WHERE aftd.fact_table_dep_id = ftdqc.fact_table_dep_id
AND aftd.queue_table_dep_id = ftdqc.queue_table_dep_id)
)
/****
Add any new keys or ignore if they already exist
*/
INSERT INTO fact_loader.fact_table_dep_queue_table_deps
(fact_table_dep_id, queue_table_dep_id, insert_merge_proid, update_merge_proid, delete_merge_proid)
SELECT fact_table_dep_id, queue_table_dep_id, default_insert_merge_proid, default_update_merge_proid, default_delete_merge_proid
FROM all_fact_table_deps
ON CONFLICT (fact_table_dep_id, queue_table_dep_id)
DO NOTHING;
END;
$BODY$
LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION fact_loader.table_loader
(p_source_relation REGCLASS,
p_destination_relation REGCLASS,
p_ignore_diff_for_columns TEXT[],
p_load_type fact_loader.table_load_type)
RETURNS TABLE (prepare_sql text, execute_sql text, unmapped_src_columns text[], unmapped_dest_columns text[]) AS
$BODY$
DECLARE
v_pkey_fields TEXT[];
BEGIN
/****
Find the primary key for the destination table. This is required.
If the destination table does not have a primary key, it should.
This is partly for simplicity, and partly to encourage good practice
that we build and refresh tables based on chosen primary key to match
records 1 for 1, which is basic DB design 101.
*/
SELECT array_agg(a.attname ORDER BY pk.rn) INTO v_pkey_fields
FROM (SELECT
i.indrelid
, unnest(indkey) AS ik
, row_number()
OVER () AS rn
FROM pg_index i
WHERE i.indrelid = p_destination_relation AND i.indisprimary) pk
INNER JOIN pg_attribute a
ON a.attrelid = pk.indrelid AND a.attnum = pk.ik;
RETURN QUERY
WITH source_columns AS
(
SELECT column_name, ordinal_position, CASE WHEN column_name = ANY(v_pkey_fields) THEN TRUE ELSE FALSE END AS pkey_field
FROM information_schema.columns c
INNER JOIN pg_class pc ON pc.relname = c.table_name AND pc.oid = p_source_relation
INNER JOIN pg_namespace n ON n.oid = pc.relnamespace AND c.table_schema = n.nspname
ORDER BY ordinal_position
)
, destination_columns AS
(
SELECT column_name, ordinal_position, CASE WHEN column_name = ANY(v_pkey_fields) THEN TRUE ELSE FALSE END AS pkey_field
FROM information_schema.columns c
INNER JOIN pg_class pc ON pc.relname = c.table_name AND pc.oid = p_destination_relation
INNER JOIN pg_namespace n ON n.oid = pc.relnamespace AND c.table_schema = n.nspname
ORDER BY ordinal_position
)
, unmapped_source_columns AS
(
SELECT array_agg(s.column_name::text) AS unmapped_columns_src
FROM source_columns s
WHERE NOT EXISTS
(SELECT 1 FROM destination_columns d WHERE d.column_name = s.column_name)
)
, unmapped_dest_columns AS
(
SELECT array_agg(d.column_name::text) AS unmapped_columns_dest
FROM destination_columns d
WHERE NOT EXISTS
(SELECT 1 FROM source_columns s WHERE d.column_name = s.column_name)
)
, pkeys AS
(
SELECT
string_agg(quote_ident(pkey_field),E'\n, ') AS pkey_fields,
string_agg($$s.$$||quote_ident(pkey_field)||$$ = d.$$||quote_ident(pkey_field),E'\nAND ') AS pkey_join
FROM
(SELECT unnest AS pkey_field
FROM unnest(v_pkey_fields)) pk
)
, info AS
(
SELECT
string_agg(
CASE
WHEN sc.column_name IS NOT NULL
THEN dc.column_name
ELSE
NULL
END, E'\n , '
ORDER BY dc.ordinal_position
)
AS matching_column_list
, string_agg(
CASE
WHEN sc.column_name IS NOT NULL
AND (p_ignore_diff_for_columns IS NULL
OR sc.column_name != ALL (p_ignore_diff_for_columns)
)
THEN dc.column_name
ELSE
NULL
END, E'\n , '
ORDER BY dc.ordinal_position
)
AS matching_column_list_without_ignored
, string_agg(
CASE
WHEN sc.column_name IS NOT NULL
AND NOT dc.pkey_field
THEN dc.column_name || ' = EXCLUDED.' || dc.column_name
ELSE
NULL
END, E'\n , '
ORDER BY dc.ordinal_position
)
AS upsert_list
, pkeys.pkey_fields
, pkeys.pkey_join
FROM destination_columns dc
CROSS JOIN pkeys
LEFT JOIN source_columns sc ON dc.column_name = sc.column_name
GROUP BY pkeys.pkey_fields,
pkeys.pkey_join
)
, sql_snippets AS
(
SELECT
$$
DROP TABLE IF EXISTS count_tracker;
CREATE TEMP TABLE count_tracker (upserted INT, deleted INT, truncated BOOLEAN, pct_dest NUMERIC(8,2));
INSERT INTO count_tracker VALUES (NULL, NULL, FALSE, NULL);
$$::TEXT
AS count_tracker_sql
, $$
DROP TABLE IF EXISTS actual_delta;
CREATE TEMP TABLE actual_delta AS
WITH final_diff AS (
SELECT $$||pkey_fields||$$
FROM
(SELECT $$||matching_column_list_without_ignored||$$
FROM $$||p_source_relation::TEXT||$$
EXCEPT
SELECT $$||matching_column_list_without_ignored||$$
FROM $$||p_destination_relation::TEXT||$$ d $$
AS actual_delta_sql
, $$
DROP TABLE IF EXISTS removed_keys;
CREATE TEMP TABLE removed_keys AS
SELECT $$||pkey_fields||$$
FROM $$||p_destination_relation::TEXT||$$ d
WHERE NOT EXISTS (SELECT 1 FROM $$||p_source_relation::TEXT||$$ s WHERE $$||pkey_join||$$);
$$
AS removed_keys_sql
, $$
) full_diff)
--This extra step is necessarily precisely because we may want to not except every column, like load_dttm
SELECT *
FROM $$||p_source_relation::TEXT||$$ s
WHERE EXISTS (
SELECT 1
FROM final_diff d
WHERE $$||pkey_join||$$
);
$$
AS except_join_to_source_sql
, $$
/***
We add the exists here because we are only looking for column-level differences
for the given keys that have changed. This may be a very small portion of the
table. Without the exists clause, this second part of EXCEPT would do a full
table scan unnecessarily.
***/
WHERE EXISTS (SELECT 1 FROM $$||p_source_relation::TEXT||$$ s WHERE $$||pkey_join||$$)$$
AS key_join_exists_sql
, $$
/***
We add a primary key to the actual_delta table to ensure there are no duplicate keys.
***/
ALTER TABLE actual_delta ADD PRIMARY KEY ($$||pkey_fields||$$);
$$
AS add_delta_pkey_sql
, $$
/****
This part is not implemented yet, but partially complete.
If we decide we want to figure out that >50% of the table will be updated, we could decide
to truncate. But then we have to balance the desire for that with more read queries to
figure it out.
To implement, add the type full_refresh_truncate to fact_loader.table_load_type, and uncomment code.
We would also have to add the logic to find actual keys added, then subtract it from actual_delta
to get the net updates expected. If this is over 50%, we should truncate and re-insert all data.
***/
DROP TABLE IF EXISTS percent_of_destination;
CREATE TEMP TABLE percent_of_destination AS
SELECT
(((SELECT COUNT(1) FROM actual_delta) - (SELECT COUNT(1) FROM added_keys))::NUMERIC /
(SELECT COUNT(1) FROM $$||p_destination_relation::TEXT||$$)::NUMERIC)::NUMERIC(8,2) AS pct;
UPDATE count_tracker SET pct_dest = (SELECT pct FROM percent_of_destination);
$$
AS percent_change_sql
,$$
DO $LOCK_SAFE_DDL$
BEGIN
SET lock_timeout TO '10ms';
IF (SELECT pct FROM percent_of_destination) >= 0.5 THEN
LOOP
BEGIN
TRUNCATE $$||p_destination_relation::TEXT||$$;
UPDATE count_tracker SET truncated = true;
EXIT;
EXCEPTION
WHEN lock_not_available
THEN RAISE WARNING 'Could not obtain immediate lock for SQL %, retrying', p_sql;
PERFORM pg_sleep(3);
WHEN OTHERS THEN
RAISE;
END;
END LOOP;
END IF;
RESET lock_timeout;
END
$LOCK_SAFE_DDL$
;
$$
AS lock_safe_truncate_sql
,$$
--Delete keys that are no longer in your new version
DELETE FROM $$||p_destination_relation::TEXT||$$ d
WHERE EXISTS
(SELECT 1 FROM removed_keys s WHERE $$||pkey_join||$$);
GET DIAGNOSTICS v_row_count = ROW_COUNT;
UPDATE count_tracker SET deleted = v_row_count;
$$
AS delete_sql
,$$
INSERT INTO $$||p_destination_relation::TEXT||$$ AS t ($$||
matching_column_list||$$)
SELECT $$||matching_column_list||
$$ FROM actual_delta
ON CONFLICT ($$||pkey_fields||$$)
DO UPDATE
SET $$||upsert_list||$$
;
GET DIAGNOSTICS v_row_count = ROW_COUNT;
UPDATE count_tracker SET upserted = v_row_count;
$$
AS upsert_sql
FROM info
)
SELECT
count_tracker_sql||
CASE
/*** not implemented truncate pattern
WHEN p_load_type IN('full_refresh','full_refresh_truncate') THEN
***/
WHEN p_load_type = 'full_refresh' THEN
removed_keys_sql||actual_delta_sql||except_join_to_source_sql||add_delta_pkey_sql||$$;$$
WHEN p_load_type = 'delta' THEN
actual_delta_sql||key_join_exists_sql||except_join_to_source_sql||add_delta_pkey_sql||$$;$$
END||$$
$$||
/*** not implemented truncate pattern
CASE
WHEN p_load_type = 'full_refresh_truncate' THEN
percent_change_sql
ELSE
''
END
***/
''
AS prepare_sql
, $$
--THIS SHOULD BE RUN IN A TRANSACTION
DO $SCRIPT$
DECLARE
v_row_count INT;
v_results RECORD;
BEGIN
$$||
CASE
/*** not implemented truncate pattern
WHEN p_load_type = 'full_refresh_truncate' THEN
lock_safe_truncate_sql||delete_sql||upsert_sql
***/
WHEN p_load_type = 'full_refresh' THEN
delete_sql||upsert_sql
WHEN p_load_type = 'delta' THEN
upsert_sql
END||$$
FOR v_results IN SELECT * FROM count_tracker LOOP
RAISE LOG 'upserted: %, deleted: %, truncated: %, pct_dest: %',
v_results.upserted, v_results.deleted, v_results.truncated, v_results.pct_dest;
END LOOP;
END
$SCRIPT$;
$$ AS execute_sql
, (SELECT unmapped_columns_src FROM unmapped_source_columns) AS unmapped_src_columns
, (SELECT unmapped_columns_dest FROM unmapped_dest_columns) AS unmapped_dest_columns
FROM sql_snippets;
END;
$BODY$
LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION fact_loader.table_loader_function
(p_source_proc REGPROC,
p_destination_relation REGCLASS,
p_ignore_diff_for_columns TEXT[])
RETURNS TABLE (function_name text, function_sql text) AS
$BODY$
BEGIN
/****
Find the primary key for the destination table. This is required.
If the destination table does not have a primary key, it should.
This is partly for simplicity, and partly to encourage good practice
that we build and refresh tables based on chosen primary key to match
records 1 for 1, which is basic DB design 101.
*/
RETURN QUERY
WITH get_pkey_fields AS (
SELECT
a.attname,
format_type(a.atttypid, a.atttypmod) AS atttype,
pk.rn
FROM (SELECT
i.indrelid
, unnest(indkey) AS ik
, row_number()
OVER () AS rn
FROM pg_index i
WHERE i.indrelid = p_destination_relation AND i.indisprimary) pk
INNER JOIN pg_attribute a
ON a.attrelid = pk.indrelid AND a.attnum = pk.ik)
, pkey_fields_sorted AS
(SELECT array_agg(attname ORDER BY rn) AS pkey_fields FROM get_pkey_fields)
, function_args AS
(SELECT regexp_matches(pg_get_function_identity_arguments(p_source_proc),'(?:^|, )(\w+)','g') AS arg)
, function_schema AS
(SELECT string_agg(arg[1],', ') AS arg_params,
pg_get_function_identity_arguments(p_source_proc) AS arg_defs
FROM function_args)
, destination_columns AS
(
SELECT c.table_schema, c.table_name, column_name, ordinal_position, CASE WHEN gpf.attname IS NOT NULL THEN TRUE ELSE FALSE END AS pkey_field