Skip to content

Commit

Permalink
Fix node target reference lookup and node match mode (#1961)
Browse files Browse the repository at this point in the history
* Fix node target reference lookup and node match mode

Make node match mode mandatory.

Look up all active node targets when resolving start/end node
target references, not just the active node targets sharing the
relationship target's source.

* Update to latest release of import-spec

It contains a couple of required fixes around property helpers

* fix: look up active node targets only

* Simplify test spec

Use simpler IDs, remove unused property types

* fix: add missing start/end node targets as dependencies
  • Loading branch information
fbiville authored Nov 25, 2024
1 parent 94a7e9f commit 40e8298
Show file tree
Hide file tree
Showing 7 changed files with 339 additions and 9 deletions.
2 changes: 1 addition & 1 deletion v2/googlecloud-to-neo4j/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
<dependency>
<groupId>org.neo4j.importer</groupId>
<artifactId>import-spec</artifactId>
<version>1.0.0-rc02</version>
<version>1.0.0-rc03</version>
</dependency>
<dependency>
<groupId>com.google.cloud.teleport.v2</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.neo4j.model.validation;

import java.util.LinkedHashSet;
import java.util.Set;
import org.neo4j.importer.v1.targets.RelationshipTarget;
import org.neo4j.importer.v1.validation.SpecificationValidationResult.Builder;
import org.neo4j.importer.v1.validation.SpecificationValidator;

/**
 * Specification validator that flags relationship targets lacking a node match mode.
 *
 * <p>While the specification is visited, the JSON path of every offending relationship target is
 * recorded. The recorded paths are turned into errors with code {@code RNMM-001} when {@link
 * #report(Builder)} is called.
 */
public class NodeMatchModeValidator implements SpecificationValidator {

  private static final String ERROR_CODE = "RNMM-001";

  // Insertion-ordered, so errors are reported in the order the targets were visited.
  private final Set<String> paths = new LinkedHashSet<>();

  /** Creates a validator with no recorded paths. */
  public NodeMatchModeValidator() {}

  /**
   * Records the node match mode path of the given relationship target when its mode is unset.
   *
   * @param index position of the relationship target, used to build the reported path
   * @param target relationship target to inspect
   */
  @Override
  public void visitRelationshipTarget(int index, RelationshipTarget target) {
    if (target.getNodeMatchMode() != null) {
      return;
    }
    paths.add(String.format("$.targets.relationships[%d].node_match_mode", index));
  }

  /**
   * Adds one error per recorded path to the given builder.
   *
   * @param builder validation result builder receiving the errors
   * @return {@code true} when no path was recorded, {@code false} otherwise
   */
  @Override
  public boolean report(Builder builder) {
    for (String path : paths) {
      String message = String.format("%s is missing: please specify a node match mode", path);
      builder.addError(path, ERROR_CODE, message);
    }
    return paths.isEmpty();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import com.google.cloud.teleport.v2.utils.SecretManagerUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -274,6 +275,10 @@ public void run() {
Entry::getKey, mapping(Entry::getValue, Collectors.<PCollection<?>>toList())));
var sourceRows = new ArrayList<PCollection<?>>(importSpecification.getSources().size());
var targetRows = new HashMap<TargetType, List<PCollection<?>>>(targetCount());
var allActiveNodeTargets =
importSpecification.getTargets().getNodes().stream()
.filter(Target::isActive)
.collect(toList());

////////////////////////////
// Process sources
Expand Down Expand Up @@ -373,8 +378,10 @@ public void run() {
.nullableSourceRows(nullableSourceBeamRows)
.sourceBeamSchema(sourceBeamSchema)
.target(target)
.startNodeTarget(findNodeTargetByName(nodeTargets, target.getStartNodeReference()))
.endNodeTarget(findNodeTargetByName(nodeTargets, target.getEndNodeReference()))
.startNodeTarget(
findNodeTargetByName(allActiveNodeTargets, target.getStartNodeReference()))
.endNodeTarget(
findNodeTargetByName(allActiveNodeTargets, target.getEndNodeReference()))
.build();
PCollection<Row> preInsertBeamRows;
String relationshipStepDescription =
Expand All @@ -395,17 +402,19 @@ public void run() {

List<PCollection<?>> dependencies =
new ArrayList<>(preActionRows.getOrDefault(ActionStage.PRE_RELATIONSHIPS, List.of()));
Set<String> dependencyNames = new LinkedHashSet<>(target.getDependencies());
dependencyNames.add(target.getStartNodeReference());
dependencyNames.add(target.getEndNodeReference());
dependencies.add(
processingQueue.waitOnCollections(
target.getDependencies(), relationshipStepDescription));
processingQueue.waitOnCollections(dependencyNames, relationshipStepDescription));

PCollection<Row> blockingReturn =
preInsertBeamRows
.apply(
"** Unblocking "
+ relationshipStepDescription
+ "(after "
+ String.join(", ", target.getDependencies())
+ String.join(", ", dependencyNames)
+ " and pre-relationships actions)",
Wait.on(dependencies))
.setCoder(preInsertBeamRows.getCoder())
Expand Down Expand Up @@ -567,7 +576,8 @@ private static NodeTarget findNodeTargetByName(List<NodeTarget> nodes, String re
return nodes.stream()
.filter(target -> reference.equals(target.getName()))
.findFirst()
.orElse(null);
.orElseThrow(
() -> new IllegalArgumentException("Could not find active node target: " + reference));
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.cloud.teleport.v2.neo4j.model.enums.ArtifactType;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -81,7 +82,8 @@ public void addToQueue(
executionContexts.put(artifactType.name() + ":" + name, executionContext);
}

public PCollection<Row> waitOnCollections(List<String> dependencies, String queuingDescription) {
public PCollection<Row> waitOnCollections(
Collection<String> dependencies, String queuingDescription) {
List<PCollection<Row>> waitOnQueues = populateQueueForTargets(dependencies);
if (waitOnQueues.isEmpty()) {
waitOnQueues.add(defaultCollection);
Expand All @@ -101,7 +103,7 @@ public PCollection<Row> waitOnCollections(List<String> dependencies, String queu
Flatten.pCollections());
}

private List<PCollection<Row>> populateQueueForTargets(List<String> dependencies) {
private List<PCollection<Row>> populateQueueForTargets(Collection<String> dependencies) {
List<PCollection<Row>> waitOnQueues = new ArrayList<>();
for (String dependency : dependencies) {
for (ArtifactType type : ArtifactType.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ com.google.cloud.teleport.v2.neo4j.model.validation.InlineSourceDataValidator
com.google.cloud.teleport.v2.neo4j.model.validation.NodeKeyValidator
com.google.cloud.teleport.v2.neo4j.model.validation.TextColumnMappingValidator
com.google.cloud.teleport.v2.neo4j.model.validation.WriteModeValidator
com.google.cloud.teleport.v2.neo4j.model.validation.NodeMatchModeValidator
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.neo4j.templates;

import static com.google.cloud.teleport.v2.neo4j.templates.Connections.jsonBasicPayload;
import static com.google.cloud.teleport.v2.neo4j.templates.Resources.contentOf;
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline;
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;

import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import net.jcip.annotations.NotThreadSafe;
import org.apache.beam.it.common.PipelineLauncher.LaunchConfig;
import org.apache.beam.it.common.PipelineLauncher.LaunchInfo;
import org.apache.beam.it.common.TestProperties;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.gcp.TemplateTestBase;
import org.apache.beam.it.neo4j.Neo4jResourceManager;
import org.apache.beam.it.neo4j.conditions.Neo4jQueryCheck;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Integration test that launches the {@link GoogleCloudToNeo4j} template with the movie import
 * specification and checks the resulting node and relationship counts in Neo4j.
 */
@Category(TemplateIntegrationTest.class)
@TemplateIntegrationTest(GoogleCloudToNeo4j.class)
@RunWith(JUnit4.class)
@NotThreadSafe
public class MovieImportIT extends TemplateTestBase {
  private Neo4jResourceManager neo4jClient;

  /** Builds the Neo4j resource manager used by the test. */
  @Before
  public void setup() {
    neo4jClient =
        Neo4jResourceManager.builder(testName)
            .setAdminPassword("letmein!")
            .setHost(TestProperties.hostIp())
            .build();
  }

  /** Cleans up the Neo4j resource manager after the test. */
  @After
  public void tearDown() {
    ResourceManagerUtils.cleanResources(neo4jClient);
  }

  /**
   * Uploads the import specification and connection payload, launches the template, then waits
   * until the expected counts of Person nodes, Movie nodes, ACTED_IN relationships and DIRECTED
   * relationships are observed.
   */
  @Test
  public void importsMovieGraphFromInlineData() throws IOException {
    gcsClient.createArtifact(
        "spec.json", contentOf("/testing-specs/import-spec/movie-import/spec.json"));
    gcsClient.createArtifact("neo4j.json", jsonBasicPayload(neo4jClient));

    LaunchConfig.Builder launchConfig =
        LaunchConfig.builder(testName, specPath)
            .addParameter("jobSpecUri", getGcsPath("spec.json"))
            .addParameter("neo4jConnectionUri", getGcsPath("neo4j.json"));
    LaunchInfo launchInfo = launchTemplate(launchConfig);

    assertThatPipeline(launchInfo).isRunning();
    assertThatResult(
            pipelineOperator()
                .waitForConditionAndCancel(
                    createConfig(launchInfo),
                    countCheck("MATCH (n:Person) RETURN count(n) AS count", 4L),
                    countCheck("MATCH (n:Movie) RETURN count(n) AS count", 2L),
                    countCheck(
                        "MATCH (:Person)-[r:ACTED_IN]->(:Movie) RETURN count(r) AS count", 2L),
                    countCheck(
                        "MATCH (:Person)-[r:DIRECTED]->(:Movie) RETURN count(r) AS count", 2L)))
        .meetsConditions();
  }

  /**
   * Builds a query check expecting a single row whose {@code count} column equals the given value.
   *
   * @param query Cypher query returning a {@code count} column
   * @param expectedCount value the {@code count} column must have
   */
  private Neo4jQueryCheck countCheck(String query, long expectedCount) {
    return Neo4jQueryCheck.builder(neo4jClient)
        .setQuery(query)
        .setExpectedResult(List.of(Map.of("count", expectedCount)))
        .build();
  }
}
Loading

0 comments on commit 40e8298

Please sign in to comment.