diff --git a/full/src/main/java/apoc/cypher/CypherExtended.java b/full/src/main/java/apoc/cypher/CypherExtended.java index e0df83f02b..c9e748102f 100644 --- a/full/src/main/java/apoc/cypher/CypherExtended.java +++ b/full/src/main/java/apoc/cypher/CypherExtended.java @@ -49,6 +49,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -207,8 +208,7 @@ private Stream runManyStatements( runSchemaStatementsInTx( scanner, internalQueue, params, addStatistics, timeout, reportError, fileName); } else { - runDataStatementsInTx( - scanner, internalQueue, params, addStatistics, timeout, reportError, fileName); + runDataStatementsInTx(scanner, internalQueue, params, addStatistics, reportError, fileName); } }, RowResult.TOMBSTONE); @@ -246,40 +246,47 @@ private void runDataStatementsInTx( BlockingQueue queue, Map params, boolean addStatistics, - long timeout, boolean reportError, String fileName) { while (scanner.hasNext()) { String stmt = removeShellControlCommands(scanner.next()); if (stmt.trim().isEmpty()) continue; - boolean schemaOperation; - try { - schemaOperation = isSchemaOperation(stmt); - } catch (Exception e) { - collectError(queue, reportError, e, fileName); - return; - } - if (!schemaOperation) { - if (isPeriodicOperation(stmt)) { - Util.inThread(pools, () -> { - try { - return db.executeTransactionally( - stmt, params, result -> consumeResult(result, queue, addStatistics, tx, fileName)); - } catch (Exception e) { - collectError(queue, reportError, e, fileName); - return null; - } - }); - } else { + // Periodic operations cannot be schema operations, so no need to check that here (will fail as invalid + // query) + if (isPeriodicOperation(stmt)) { + Util.inThread(pools, () -> { + try { + return db.executeTransactionally( + stmt, params, result -> consumeResult(result, queue, addStatistics, tx, fileName)); + } catch (Exception e) { + collectError(queue, reportError, e, fileName); + return null; + } + }); + } else { + AtomicBoolean isSchemaError = new AtomicBoolean(false); + try { Util.inTx(db, pools, threadTx -> { try (Result result = threadTx.execute(stmt, params)) { return consumeResult(result, queue, addStatistics, tx, fileName); } catch (Exception e) { - collectError(queue, reportError, e, fileName); + // APOC historically skips schema operations + if (!(e.getMessage().contains("Schema operations on database") + && e.getMessage().contains("are not allowed"))) { + collectError(queue, reportError, e, fileName); + return null; + } + isSchemaError.set(true); return null; } }); + } catch (Exception e) { + // An error thrown by a schema operation + if (isSchemaError.get()) { + continue; + } + throw e; } } } diff --git a/full/src/test/resources/wrong_statements.cypher b/full/src/test/resources/wrong_statements.cypher index 7cb0b92dd8..3fd11595f2 100644 --- a/full/src/test/resources/wrong_statements.cypher +++ b/full/src/test/resources/wrong_statements.cypher @@ -1 +1,3 @@ -CREATE (n:Person{id:1); \ No newline at end of file +CREATE INDEX ON :Node(id); +CREATE (n:Person{id:1); +CREATE (n:Person{id:1}); \ No newline at end of file diff --git a/full/src/test/resources/wrong_statements_runtime.cypher b/full/src/test/resources/wrong_statements_runtime.cypher index 7a8b3ddfd4..3b3bc36126 100644 --- a/full/src/test/resources/wrong_statements_runtime.cypher +++ b/full/src/test/resources/wrong_statements_runtime.cypher @@ -1,2 +1,2 @@ CREATE (n:Fail {foo: 1}); - +CREATE (n:Fail {foo: 2}); \ No newline at end of file