diff --git a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestBatchCopyFrom.java b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestBatchCopyFrom.java index ae986601f7c9..c0092365ab60 100644 --- a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestBatchCopyFrom.java +++ b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestBatchCopyFrom.java @@ -248,7 +248,8 @@ public void testStatementLevelTriggersWithBatchCopyFailure() throws Exception { "COPY %s FROM \'%s\' WITH (FORMAT CSV, HEADER, ROWS_PER_TRANSACTION %s)", tableName, absFilePath, batchSize), INVALID_COPY_INPUT_ERROR_MSG); - assertOneRow(statement, "SELECT COUNT(*) FROM " + tableName, totalValidLines); + // The copy will happen in one batch, hence none of the rows will be copied + assertOneRow(statement, "SELECT COUNT(*) FROM " + tableName, 0); assertOneRow(statement, "SELECT COUNT(*) FROM " + dummyTableName, 0); // test before-statement trigger @@ -262,9 +263,9 @@ public void testStatementLevelTriggersWithBatchCopyFailure() throws Exception { "COPY %s FROM \'%s\' WITH (FORMAT CSV, HEADER, ROWS_PER_TRANSACTION %s)", tableName, absFilePath, batchSize), INVALID_COPY_INPUT_ERROR_MSG); - assertOneRow(statement, "SELECT COUNT(*) FROM " + tableName, totalValidLines * 2); - // note, before statement-level trigger will execute even if COPY FROM fails - assertOneRow(statement, "SELECT COUNT(*) FROM " + dummyTableName, 1); + // The copy will happen in one batch, hence none of the rows will be copied + assertOneRow(statement, "SELECT COUNT(*) FROM " + tableName, 0); + assertOneRow(statement, "SELECT COUNT(*) FROM " + dummyTableName, 0); } } @@ -583,4 +584,154 @@ public void testBatchedCopyForPartitionedTables() throws Exception { } } } + + @Test + public void testBatchedCopyValidForeignKeyCheck() throws Exception { + String absFilePath = getAbsFilePath("fk-copyfrom.txt"); + String refTableName = "reftable_ok"; + String tableName = "maintable_ok"; + + int totalLines = 100; + int batchSize = totalLines; + + createFileInTmpDir(absFilePath, totalLines); + + try (Statement statement = connection.createStatement()) { + // Both reference table and main table have the same key set from 0 to totalLines - 1. + statement.execute(String.format("CREATE TABLE %s (a INT PRIMARY KEY)", refTableName)); + statement.execute(String.format( + "INSERT INTO %s (a) SELECT s * 4 FROM GENERATE_SERIES (0, %d) AS s", + refTableName, totalLines - 1)); + + statement.execute( + String.format("CREATE TABLE %s (a INT REFERENCES %s, b INT, c INT, d INT)", + tableName, refTableName)); + statement.execute( + String.format("COPY %s FROM \'%s\' WITH (FORMAT CSV, HEADER, ROWS_PER_TRANSACTION %d)", + tableName, absFilePath, batchSize)); + + assertOneRow(statement, "SELECT COUNT(*) FROM " + tableName, totalLines); + } + } + + @Test + public void testBatchedCopyFailedOnForeignKeyCheck() throws Exception { + String absFilePath = getAbsFilePath("fk-copyfrom-all-failure.txt"); + String refTableName = "reftable_failed"; + String tableName = "maintable_failed"; + + int totalLines = 100; + int batchSize = totalLines; + String referenceKey = "a_fkey"; + + createFileInTmpDir(absFilePath, totalLines); + + String INVALID_FOREIGN_KEY_CHECK_ERROR_MSG = + String.format("insert or update on table \"%s\" violates foreign key constraint \"%s_%s\"", + tableName, tableName, referenceKey); + + try (Statement statement = connection.createStatement()) { + statement.execute(String.format("CREATE TABLE %s (a INT PRIMARY KEY)", refTableName)); + // Create reference table without the (a = 0) line. + statement.execute(String.format( + "INSERT INTO %s (a) SELECT s * 4 FROM GENERATE_SERIES (1, %d) AS s", + refTableName, totalLines - 1)); + + statement.execute( + String.format("CREATE TABLE %s (a INT REFERENCES %s, b INT, c INT, d INT)", + tableName, refTableName)); + + // The execution will fail since the (a = 0) key is not present in the reference table. + runInvalidQuery(statement, + String.format("COPY %s FROM \'%s\' WITH (FORMAT CSV, HEADER, ROWS_PER_TRANSACTION %d)", + tableName, absFilePath, batchSize), + INVALID_FOREIGN_KEY_CHECK_ERROR_MSG); + + // No rows should be copied. + assertOneRow(statement, "SELECT COUNT(*) FROM " + tableName, 0); + } + } + + @Test + public void testBatchedCopyPartialFailureOnForeignKeyCheck() throws Exception { + String absFilePath = getAbsFilePath("fk-copyfrom-partial-failure.txt"); + String refTableName = "reftable_partial"; + String tableName = "maintable_partial"; + + int totalLines = 100; + int batchSize = 1; + String referenceKey = "a_fkey"; + + createFileInTmpDir(absFilePath, totalLines); + + String INVALID_FOREIGN_KEY_CHECK_ERROR_MSG = + String.format("insert or update on table \"%s\" violates foreign key constraint \"%s_%s\"", + tableName, tableName, referenceKey); + + try (Statement statement = connection.createStatement()) { + // Create reference table with half of the lines. + statement.execute(String.format("CREATE TABLE %s (a INT PRIMARY KEY)", refTableName)); + statement.execute(String.format( + "INSERT INTO %s (a) SELECT s * 4 FROM GENERATE_SERIES (0, %d) AS s", + refTableName, totalLines/2 - 1)); + + statement.execute( + String.format("CREATE TABLE %s (a INT REFERENCES %s, b INT, c INT, d INT)", + tableName, refTableName)); + + // The execution will throw error since the later half is not present in the reference table. + runInvalidQuery(statement, + String.format("COPY %s FROM \'%s\' WITH (FORMAT CSV, HEADER, ROWS_PER_TRANSACTION %d)", + tableName, absFilePath, batchSize), + INVALID_FOREIGN_KEY_CHECK_ERROR_MSG); + + // However, we should be able to copy up to totalLines / 2 lines + // that are present in the reference table. + assertOneRow(statement, "SELECT COUNT(*) FROM " + tableName, totalLines / 2); + } + } + + @Test + public void testBatchedCopyManualTrigger() throws Exception { + String absFilePath = getAbsFilePath("manual-trigger.txt"); + String tableName = "manual_trigger_table"; + + int totalLines = 100; + + // The batch size will be ignored, since there is a manually created trigger. + int batchSize = 5; + String INVALID_PRIMARY_KEY_TRIGGER_ERROR_MSG = "Primary key too large"; + + createFileInTmpDir(absFilePath, totalLines); + + try (Statement statement = connection.createStatement()) { + statement.execute( + String.format("CREATE TABLE %s (a INT PRIMARY KEY, b INT, c INT, d INT)", tableName)); + + // This trigger will fire since the row will eventually exceed the limit. + statement.execute( + String.format( + "CREATE FUNCTION log_a() RETURNS TRIGGER AS $$ " + + "BEGIN " + + "IF (NEW.a > %d) THEN RAISE EXCEPTION '%s'; " + + "END IF; " + + "RETURN NEW; " + + "END; " + + "$$ LANGUAGE plpgsql;", totalLines / 2, INVALID_PRIMARY_KEY_TRIGGER_ERROR_MSG)); + + statement.execute( + String.format( + "CREATE TRIGGER mytrigger BEFORE INSERT OR UPDATE ON %s " + + "FOR EACH ROW EXECUTE PROCEDURE log_a()", tableName)); + + // The execution will throw error on the primary key trigger. + runInvalidQuery(statement, + String.format("COPY %s FROM \'%s\' WITH (FORMAT CSV, HEADER, ROWS_PER_TRANSACTION %d)", + tableName, absFilePath, batchSize), + INVALID_PRIMARY_KEY_TRIGGER_ERROR_MSG); + + // We should roll back all the changes. + assertOneRow(statement, "SELECT COUNT(*) FROM " + tableName, 0); + } + } } diff --git a/src/postgres/src/backend/commands/copy.c b/src/postgres/src/backend/commands/copy.c index ed6c98b488c3..33b962e201f1 100644 --- a/src/postgres/src/backend/commands/copy.c +++ b/src/postgres/src/backend/commands/copy.c @@ -2394,7 +2394,6 @@ CopyFrom(CopyState cstate) int nBufferedTuples = 0; int prev_leaf_part_index = -1; bool useNonTxnInsert; - bool isBatchTxnCopy; /* * If the batch size is not explicitly set in the query by the user, @@ -2404,7 +2403,6 @@ CopyFrom(CopyState cstate) { cstate->batch_size = yb_default_copy_from_rows_per_transaction; } - isBatchTxnCopy = cstate->batch_size > 0; #define MAX_BUFFERED_TUPLES 1000 HeapTuple *bufferedTuples = NULL; /* initialize to silence warning */ @@ -2636,7 +2634,7 @@ CopyFrom(CopyState cstate) ExecSetupChildParentMapForLeaf(proute); } - if (isBatchTxnCopy) + if (cstate->batch_size > 0) { /* * Batched copy is not supported @@ -2665,11 +2663,16 @@ CopyFrom(CopyState cstate) errhint("Either run this COPY outside of a transaction block or set " "rows_per_transaction option to `0` to disable batching and " "remove this warning."))); - - else - { + else if (HasNonRITrigger(cstate->rel->trigdesc)) + ereport(WARNING, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("Batched COPY is not supported on table with non RI trigger. " + "Defaulting to using one transaction for the entire copy."), + errhint("Set rows_per_transaction option to `0` to disable batching " + "and remove this warning."))); + else batch_size = cstate->batch_size; - } + cstate->batch_size = batch_size; } @@ -2750,7 +2753,7 @@ CopyFrom(CopyState cstate) bool has_more_tuples = true; while (has_more_tuples) - { + { /* * When batch size is not provided from the query option, * default behavior is to read each line from the file @@ -3073,15 +3076,25 @@ CopyFrom(CopyState cstate) if (IsYBRelation(cstate->rel)) ResetPerTupleExprContext(estate); } - /* - * Commit transaction per batch. - * When CopyFrom method is called, we are already inside a transaction block - * and relevant transaction state properties have been previously set. - */ - if (isBatchTxnCopy) + + if (cstate->batch_size > 0) { + /* + * Handle queued AFTER triggers before committing. If there are errors, + * do not commit the current batch. + */ + AfterTriggerEndQuery(estate); + + /* + * Commit transaction per batch. + * When CopyFrom method is called, we are already inside a transaction block + * and relevant transaction state properties have been previously set. + */ YBCCommitTransaction(); YBInitializeTransaction(); + + /* Start a new AFTER trigger */ + AfterTriggerBeginQuery(); } } diff --git a/src/postgres/src/backend/utils/adt/ri_triggers.c b/src/postgres/src/backend/utils/adt/ri_triggers.c index 3cb0f5d1140d..cf24859e9d87 100644 --- a/src/postgres/src/backend/utils/adt/ri_triggers.c +++ b/src/postgres/src/backend/utils/adt/ri_triggers.c @@ -3315,3 +3315,17 @@ YbAddTriggerFKReferenceIntent(Trigger *trigger, Relation fk_rel, HeapTuple new_r pfree(descr); } } + +/* + * Check if a trigger description contains any non RI trigger. + */ +bool +HasNonRITrigger(const TriggerDesc* trigDesc) +{ + for (int i = trigDesc ? trigDesc->numtriggers : 0; i > 0; i--) + { + if (RI_FKey_trigger_type(trigDesc->triggers[i - 1].tgfoid) == RI_TRIGGER_NONE) + return true; + } + return false; +} diff --git a/src/postgres/src/include/commands/trigger.h b/src/postgres/src/include/commands/trigger.h index 061fe9e7a8c0..6aae92757376 100644 --- a/src/postgres/src/include/commands/trigger.h +++ b/src/postgres/src/include/commands/trigger.h @@ -272,4 +272,7 @@ extern void YbAddTriggerFKReferenceIntent(Trigger *trigger, Relation fk_rel, Hea extern int RI_FKey_trigger_type(Oid tgfoid); +/* Return true if the trigger description has non FK trigger. */ +extern bool HasNonRITrigger(const TriggerDesc* trigDesc); + #endif /* TRIGGER_H */ diff --git a/src/postgres/src/test/regress/expected/yb_pg_triggers.out b/src/postgres/src/test/regress/expected/yb_pg_triggers.out index 5ca8a1041a53..bb388d4b5258 100644 --- a/src/postgres/src/test/regress/expected/yb_pg_triggers.out +++ b/src/postgres/src/test/regress/expected/yb_pg_triggers.out @@ -2874,6 +2874,26 @@ drop table self_ref; drop function dump_insert(); drop function dump_update(); drop function dump_delete(); +-- +-- Rows per transaction should be disabled if table contains non Referential Integrity triggers. +-- +CREATE TABLE tbl(k INT PRIMARY KEY); +CREATE TABLE shadow_tbl(k INT PRIMARY KEY); +CREATE OR REPLACE FUNCTION trigger_func() RETURNS trigger AS $$ +BEGIN + INSERT INTO shadow_tbl VALUES(NEW.k); + RETURN NEW; +END; $$ +LANGUAGE 'plpgsql'; +CREATE TRIGGER tbl_insert_trigger AFTER INSERT + ON tbl FOR EACH ROW EXECUTE PROCEDURE trigger_func(); +-- A warning should be shown disabling the ROWS_PER_TRANSACTION value. +COPY tbl FROM STDIN WITH (ROWS_PER_TRANSACTION 2); +WARNING: Batched COPY is not supported on table with non RI trigger. Defaulting to using one transaction for the entire copy. +HINT: Set rows_per_transaction option to `0` to disable batching and remove this warning. +DROP TRIGGER tbl_insert_trigger ON tbl; +DROP TABLE shadow_tbl; +DROP TABLE tbl; -- Leave around some objects for other tests create table trigger_parted (a int primary key) partition by list (a); create function trigger_parted_trigfunc() returns trigger language plpgsql as diff --git a/src/postgres/src/test/regress/sql/yb_pg_triggers.sql b/src/postgres/src/test/regress/sql/yb_pg_triggers.sql index b2cacb068d96..1a9e3c33cf9c 100644 --- a/src/postgres/src/test/regress/sql/yb_pg_triggers.sql +++ b/src/postgres/src/test/regress/sql/yb_pg_triggers.sql @@ -2176,6 +2176,36 @@ drop function dump_insert(); drop function dump_update(); drop function dump_delete(); +-- +-- Rows per transaction should be disabled if table contains non Referential Integrity triggers. +-- +CREATE TABLE tbl(k INT PRIMARY KEY); +CREATE TABLE shadow_tbl(k INT PRIMARY KEY); + +CREATE OR REPLACE FUNCTION trigger_func() RETURNS trigger AS $$ +BEGIN + INSERT INTO shadow_tbl VALUES(NEW.k); + RETURN NEW; +END; $$ +LANGUAGE 'plpgsql'; + +CREATE TRIGGER tbl_insert_trigger AFTER INSERT + ON tbl FOR EACH ROW EXECUTE PROCEDURE trigger_func(); + +-- A warning should be shown disabling the ROWS_PER_TRANSACTION value. +COPY tbl FROM STDIN WITH (ROWS_PER_TRANSACTION 2); +1 +2 +3 +4 +5 +6 +\. + +DROP TRIGGER tbl_insert_trigger ON tbl; +DROP TABLE shadow_tbl; +DROP TABLE tbl; + -- Leave around some objects for other tests create table trigger_parted (a int primary key) partition by list (a); create function trigger_parted_trigfunc() returns trigger language plpgsql as @@ -2190,4 +2220,3 @@ create table trigger_parted_p2 partition of trigger_parted for values in (2) create table trigger_parted_p2_2 partition of trigger_parted_p2 for values in (2); alter table only trigger_parted_p2 disable trigger aft_row; alter table trigger_parted_p2_2 enable always trigger aft_row; -