Skip to content

Commit

Permalink
[madSHaLz] APOC triggers aren't updated after a user deletes a database
Browse files Browse the repository at this point in the history
  • Loading branch information
vga91 committed Mar 14, 2023
1 parent fff90a4 commit a23e84c
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 4 deletions.
5 changes: 4 additions & 1 deletion common/src/main/java/apoc/ApocConfigExtensionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.neo4j.kernel.extension.ExtensionType;
import org.neo4j.kernel.extension.context.ExtensionContext;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.monitoring.DatabaseEventListeners;
import org.neo4j.logging.internal.LogService;

/**
Expand All @@ -22,6 +23,7 @@ public interface Dependencies {
Config config();
GlobalProcedures globalProceduresRegistry();
DatabaseManagementService databaseManagementService();
DatabaseEventListeners databaseEventListeners();
}

public ApocConfigExtensionFactory() {
Expand All @@ -30,7 +32,8 @@ public ApocConfigExtensionFactory() {

@Override
public Lifecycle newInstance(ExtensionContext context, Dependencies dependencies) {
return new ApocConfig(dependencies.config(), dependencies.log(), dependencies.globalProceduresRegistry(), dependencies.databaseManagementService());
return new ApocConfig(dependencies.config(), dependencies.log(), dependencies.globalProceduresRegistry(), dependencies.databaseManagementService(),
dependencies.databaseEventListeners());
}

}
2 changes: 2 additions & 0 deletions common/src/main/java/apoc/ApocExtensionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.DatabaseEventListeners;
import org.neo4j.logging.Log;
import org.neo4j.logging.internal.LogService;
import org.neo4j.scheduler.JobScheduler;
Expand Down Expand Up @@ -43,6 +44,7 @@ public interface Dependencies {
AvailabilityGuard availabilityGuard();
DatabaseManagementService databaseManagementService();
ApocConfig apocConfig();
DatabaseEventListeners databaseEventListeners();
@SuppressWarnings("unused") // used from extended
GlobalProcedures globalProceduresRegistry();
RegisterComponentFactory.RegisterComponentLifecycle registerComponentLifecycle();
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/java/apoc/CoreApocGlobalComponents.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ public Collection<Class> getContextClasses() {

@Override
public Iterable<AvailabilityListener> getListeners(GraphDatabaseAPI db, ApocExtensionFactory.Dependencies dependencies) {
return Collections.singleton(new CypherInitializer(db, dependencies.log().getUserLog(CypherInitializer.class)));
return Collections.singleton(new CypherInitializer(db,
dependencies.log().getUserLog(CypherInitializer.class),
dependencies.databaseManagementService(),
dependencies.databaseEventListeners())
);
}
}
77 changes: 76 additions & 1 deletion core/src/main/java/apoc/cypher/CypherInitializer.java
Original file line number Diff line number Diff line change
@@ -1,39 +1,57 @@
package apoc.cypher;

import apoc.ApocConfig;
import apoc.SystemLabels;
import apoc.util.Util;
import apoc.util.collection.Iterators;
import apoc.version.Version;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.neo4j.common.DependencyResolver;
import org.neo4j.configuration.GraphDatabaseSettings;
import org.neo4j.dbms.api.DatabaseManagementService;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.event.DatabaseEventContext;
import org.neo4j.graphdb.event.DatabaseEventListener;
import org.neo4j.kernel.api.procedure.GlobalProcedures;
import org.neo4j.kernel.availability.AvailabilityListener;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.monitoring.DatabaseEventListeners;
import org.neo4j.logging.Log;

import java.util.Collection;

import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

import static apoc.SystemPropertyKeys.database;

public class CypherInitializer implements AvailabilityListener {

private final GraphDatabaseAPI db;
private final Log userLog;
private final GlobalProcedures procs;
private final DependencyResolver dependencyResolver;
private final DatabaseManagementService databaseManagementService;
private final DatabaseEventListeners databaseEventListeners;

/**
* indicates the status of the initializer, to be used for tests to ensure initializer operations are already done
*/
private volatile boolean finished = false;

public CypherInitializer(GraphDatabaseAPI db, Log userLog) {
public CypherInitializer(GraphDatabaseAPI db, Log userLog,
DatabaseManagementService databaseManagementService,
DatabaseEventListeners databaseEventListeners) {
this.db = db;
this.userLog = userLog;
this.databaseManagementService = databaseManagementService;
this.databaseEventListeners = databaseEventListeners;
this.dependencyResolver = db.getDependencyResolver();
this.procs = dependencyResolver.resolveDependency(GlobalProcedures.class);
}
Expand Down Expand Up @@ -70,6 +88,15 @@ public void available() {
} catch (Exception ignored) {
userLog.info("Cannot check APOC version compatibility because of a transient error. Retrying your request at a later time may succeed");
}

// this block is already in system db, so we can directly execute system operation
// for backward compatibility: clear system nodes at start-up if the corresponding database name doesn't exist
// placed here instead of e.g. `ApocConfig` because at this point we are sure that all databases are recovered
cleanUpSystemNodes();

// create listener for each database
databaseEventListeners.registerDatabaseEventListener(new SystemFunctionalityListener());

}
Configuration config = dependencyResolver.resolveDependency(ApocConfig.class).getConfig();

Expand Down Expand Up @@ -147,4 +174,52 @@ private boolean areProceduresRegistered(String procStart) {
public void unavailable() {
// intentionally empty
}

private void cleanUpSystemNodes() {

List<String> databases = databaseManagementService.listDatabases();

forEachSystemLabel((tx, label) -> {
tx.findNodes(label).forEachRemaining(node -> {
boolean dbNonExistent = node.hasProperty(database.name())
&& !databases.contains( (String) node.getProperty(database.name()) );
if (dbNonExistent) {
node.delete();
}
});
});
}

private class SystemFunctionalityListener implements DatabaseEventListener {

@Override
public void databaseDrop(DatabaseEventContext eventContext) {

forEachSystemLabel((tx, label) -> {
tx.findNodes(label, database.name(), eventContext.getDatabaseName())
.forEachRemaining(Node::delete);
});
}

@Override
public void databaseStart(DatabaseEventContext eventContext) {}

@Override
public void databaseShutdown(DatabaseEventContext eventContext) {}

@Override
public void databasePanic(DatabaseEventContext eventContext) {}

@Override
public void databaseCreate(DatabaseEventContext eventContext) {}
}

private void forEachSystemLabel(BiConsumer<Transaction, Label> consumer) {
try (Transaction tx = db.beginTx()) {
for (Label label: SystemLabels.values()) {
consumer.accept(tx, label);
}
tx.commit();
}
}
}
55 changes: 55 additions & 0 deletions core/src/test/java/apoc/trigger/TriggerRestartTest.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package apoc.trigger;

import apoc.ApocConfig;
import apoc.SystemLabels;
import apoc.util.TestUtil;
import apoc.util.Util;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
Expand All @@ -10,19 +13,28 @@
import org.neo4j.configuration.GraphDatabaseSettings;
import org.neo4j.dbms.api.DatabaseManagementService;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Transaction;
import org.neo4j.test.TestDatabaseManagementServiceBuilder;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Consumer;

import static apoc.ApocConfig.SUN_JAVA_COMMAND;
import static apoc.SystemLabels.ApocTrigger;
import static apoc.SystemPropertyKeys.database;
import static apoc.SystemPropertyKeys.name;
import static apoc.trigger.TriggerTestUtil.TRIGGER_DEFAULT_REFRESH;
import static apoc.trigger.TriggerTestUtil.awaitTriggerDiscovered;
import static apoc.util.TestUtil.waitDbsAvailable;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class TriggerRestartTest {

Expand Down Expand Up @@ -62,6 +74,49 @@ private void restartDb() {
waitDbsAvailable(db, sysDb);
}

@Test
public void deleteSystemNodesAtStartupIfDatabaseDoesNotExist() {
String dbName = "notexistent";

// create manually 2 system node, the 1st one in "neo4j" and the other in an not-existent db
withSystemDb(tx -> {
Util.mergeNode(tx, ApocTrigger, null,
Pair.of(database.name(), dbName),
Pair.of(name.name(), "mytest"));

Util.mergeNode(tx, SystemLabels.ApocTrigger, null,
Pair.of(database.name(), db.databaseName()),
Pair.of(name.name(), "mySecondTest"));
});

// check that the nodes have been created successfully
withSystemDb(tx -> {
Iterator<Node> dbNotExistentNodes = tx.findNodes(ApocTrigger, database.name(), dbName);
assertTrue( dbNotExistentNodes.hasNext() );

Iterator<Node> neo4jNodes = tx.findNodes(ApocTrigger, database.name(), db.databaseName());
assertTrue( neo4jNodes.hasNext() );
});

// check that after a restart only "neo4j" node remains
restartDb();

withSystemDb(tx -> {
Iterator<Node> dbNotExistentNodes = tx.findNodes(ApocTrigger, database.name(), dbName);
assertFalse( dbNotExistentNodes.hasNext() );

Iterator<Node> neo4jNodes = tx.findNodes(ApocTrigger, database.name(), db.databaseName());
assertTrue( neo4jNodes.hasNext() );
});
}

private void withSystemDb(Consumer<Transaction> action) {
try (Transaction tx = sysDb.beginTx()) {
action.accept(tx);
tx.commit();
}
}

@Test
public void testTriggerRunsAfterRestart() {
final String query = "CALL apoc.trigger.add('myTrigger', 'unwind $createdNodes as n set n.trigger = n.trigger + 1', {phase:'before'})";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static apoc.ApocConfig.APOC_CONFIG_INITIALIZER;
import static apoc.ApocConfig.APOC_TRIGGER_ENABLED;
import static apoc.trigger.TriggerHandler.TRIGGER_REFRESH;
import static apoc.trigger.TriggerTestUtil.TIMEOUT;
import static apoc.trigger.TriggerTestUtil.TRIGGER_DEFAULT_REFRESH;
import static apoc.util.TestContainerUtil.createEnterpriseDB;
import static apoc.util.TestContainerUtil.testCall;
import static apoc.util.TestContainerUtil.testCallEmpty;
import static apoc.util.TestContainerUtil.testResult;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand All @@ -37,6 +39,7 @@

public class TriggerEnterpriseFeaturesTest {
private static final String FOO_DB = "foo";
private static final String INIT_DB = "initdb";

private static final String NO_ADMIN_USER = "nonadmin";
private static final String NO_ADMIN_PWD = "test1234";
Expand All @@ -46,10 +49,15 @@ public class TriggerEnterpriseFeaturesTest {

@BeforeClass
public static void beforeAll() {
final String cypherInitializer = String.format("%s.%s.0",
APOC_CONFIG_INITIALIZER, SYSTEM_DATABASE_NAME);
final String createInitDb = String.format("CREATE DATABASE %s IF NOT EXISTS", INIT_DB);

// We build the project, the artifact will be placed into ./build/libs
neo4jContainer = createEnterpriseDB(List.of(TestContainerUtil.ApocPackage.CORE), true)
.withEnv(APOC_TRIGGER_ENABLED, "true")
.withEnv(TRIGGER_REFRESH, String.valueOf(TRIGGER_DEFAULT_REFRESH));
.withEnv(TRIGGER_REFRESH, String.valueOf(TRIGGER_DEFAULT_REFRESH))
.withEnv(cypherInitializer, createInitDb);
neo4jContainer.start();
session = neo4jContainer.getSession();

Expand Down

0 comments on commit a23e84c

Please sign in to comment.