Skip to content

Commit

Permalink
Fixes #3998: Solve CypherExtendedTest and CypherEnterpriseExtendedTest (
Browse files Browse the repository at this point in the history
#4050)

* Fixes #3998: Solve CypherExtendedTest and CypherEnterpriseExtendedTest

* improved error and tx termination handling
  • Loading branch information
vga91 authored May 2, 2024
1 parent 70dc75b commit ce01da6
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

@Ignore
public class CypherEnterpriseExtendedTest {
private static final String CREATE_RETURNQUERY_NODES = "UNWIND range(0,3) as id \n" +
"CREATE (n:ReturnQuery {id:id})-[:REL {idRel: id}]->(:Other {idOther: id})";
Expand Down
52 changes: 39 additions & 13 deletions extended/src/main/java/apoc/cypher/CypherExtended.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -262,25 +263,43 @@ private static boolean isCommentOrEmpty(String stmt) {

private final static Pattern shellControl = Pattern.compile("^:?\\b(begin|commit|rollback)\\b", Pattern.CASE_INSENSITIVE);

private Object consumeResult(Result result, BlockingQueue<RowResult> queue, boolean addStatistics, Transaction tx, String fileName) {
private Object consumeResult(Result result, BlockingQueue<RowResult> queue, boolean addStatistics, Transaction transaction, String fileName) {
try {
long time = System.currentTimeMillis();
int row = 0;
while (result.hasNext()) {
AtomicBoolean closed = new AtomicBoolean(false);
while (isOpenAndHasNext(result, closed)) {
terminationGuard.check();
Map<String, Object> res = EntityUtil.anyRebind(tx, result.next());
Map<String, Object> res = EntityUtil.anyRebind(transaction, result.next());
queue.put(new RowResult(row++, res, fileName));
}
if (addStatistics) {
Map<String, Object> mapResult = toMap(result.getQueryStatistics(), System.currentTimeMillis() - time, row);
queue.put(new RowResult(-1, mapResult, fileName));
}
if (closed.get()) {
queue.put(RowResult.TOMBSTONE);
return null;
}
return row;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

/**
* If the transaction is closed, result.hasNext() will throw an error.
* In that case, we set closed = true, to put a RowResult.TOMBSTONE and terminate the iteration
*/
private static boolean isOpenAndHasNext(Result result, AtomicBoolean closed) {
try {
return result.hasNext();
} catch (Exception e) {
closed.set(true);
return false;
}
}

private String removeShellControlCommands(String stmt) {
Matcher matcher = shellControl.matcher(stmt.trim());
if (matcher.find()) {
Expand Down Expand Up @@ -389,6 +408,7 @@ public Stream<MapResult> mapParallel(@Name("fragment") String fragment, @Name("p
.flatMap((partition) -> Iterators.asList(tx.execute(statement, parallelParams(params, "_", partition))).stream())
.map(MapResult::new);
}

@Procedure
@Description("apoc.cypher.mapParallel2(fragment, params, list-to-parallelize) yield value - executes fragment in parallel batches with the list segments being assigned to _")
public Stream<MapResult> mapParallel2(@Name("fragment") String fragment, @Name("params") Map<String, Object> params, @Name("list") List<Object> data, @Name("partitions") long partitions,@Name(value = "timeout",defaultValue = "10") long timeout) {
Expand All @@ -397,25 +417,31 @@ public Stream<MapResult> mapParallel2(@Name("fragment") String fragment, @Name("
int queueCapacity = 100000;
BlockingQueue<RowResult> queue = new ArrayBlockingQueue<>(queueCapacity);
ArrayBlockingQueue<Transaction> transactions = new ArrayBlockingQueue<>(queueCapacity);
ArrayBlockingQueue<Result> results = new ArrayBlockingQueue<>(queueCapacity);
Stream<List<Object>> parallelPartitions = Util.partitionSubList(data, (int)(partitions <= 0 ? PARTITIONS : partitions), null);
Util.inFuture(pools, () -> {
long total = parallelPartitions
.map((List<Object> partition) -> {
Transaction transaction = db.beginTx();
transactions.add(transaction);
try (Result result = transaction.execute(statement, parallelParams(params, "_", partition))) {
return consumeResult(result, queue, false, transaction, null);
} catch (Exception e) {
throw new RuntimeException(e);
}}
.map((List<Object> partition) -> {
Transaction transaction = db.beginTx();
transactions.add(transaction);
Result result = transaction.execute(statement, parallelParams(params, "_", partition));
results.add(result);
try {
return consumeResult(result, queue, false, transaction, null);
} catch (Exception e) {
throw new RuntimeException(e);
}}
).count();
queue.put(RowResult.TOMBSTONE);
return total;
});

return StreamSupport.stream(new QueueBasedSpliterator<>(queue, RowResult.TOMBSTONE, terminationGuard, (int)timeout),true)
.map(rowResult -> new MapResult(rowResult.result))
.onClose(() -> transactions.forEach(Transaction::close));
.onClose(() -> {
transactions.forEach(i -> Util.close(i));
results.forEach(i -> Util.close(i));
});
}

public Map<String, Object> parallelParams(@Name("params") Map<String, Object> params, String key, List<Object> partition) {
Expand Down
1 change: 0 additions & 1 deletion extended/src/test/java/apoc/cypher/CypherExtendedTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
* @since 08.05.16
*/

@Ignore
public class CypherExtendedTest {
public static final String IMPORT_DIR = "src/test/resources";
@ClassRule
Expand Down

0 comments on commit ce01da6

Please sign in to comment.