Feature/merge works in ldimport #1171

Draft
wants to merge 9 commits into develop
51 changes: 41 additions & 10 deletions importers/src/main/groovy/whelk/importer/DatasetImporter.groovy
@@ -2,6 +2,8 @@ package whelk.importer

import groovy.util.logging.Log4j2 as Log
import groovy.transform.CompileStatic
import whelk.WorkMerging

import static groovy.transform.TypeCheckingMode.SKIP

import whelk.Document
@@ -113,15 +115,29 @@ class DatasetImporter {
Document incomingDoc = completeRecord(data, recordType, true)
idsInInput.add(incomingDoc.getShortId())

if (data.get("@type") != null &&
whelk.getJsonld().isSubClassOf( incomingDoc.getThingType(), "Work" )) {

switch (createOrUpdateWork(incomingDoc)) {
case WorkMerging.WRITE_RESULT.CREATED:
createdCount++;
break;
case WorkMerging.WRITE_RESULT.UPDATED:
updatedCount++;
}

} else { // Not a work

// This race condition should be benign. If there is a document with
// the same ID created in between the check and the creation, we'll
// get an exception and fail early (unfortunate but acceptable).
switch (createOrUpdateDocument(incomingDoc)) {
case WRITE_RESULT.CREATED:
createdCount++;
break;
case WRITE_RESULT.UPDATED:
updatedCount++;
}
}

if ( lineCount % 100 == 0 ) {
@@ -286,8 +302,8 @@ class DatasetImporter {
}

private WRITE_RESULT createOrUpdateDocument(Document incomingDoc) {
WRITE_RESULT result
Document storedDoc = whelk.getDocument(incomingDoc.getShortId())
if (storedDoc != null) {
if (whelk.storeAtomicUpdate(incomingDoc.getShortId(), true, false, refreshDependers, "xl", null, { doc ->
doc.data = incomingDoc.data
@@ -300,9 +316,24 @@
whelk.createDocument(incomingDoc, "xl", null, collection, false)
result = WRITE_RESULT.CREATED
}

return result
}

private WorkMerging.WRITE_RESULT createOrUpdateWork(Document incomingWork) {
    // Collect the system IDs of all instances the incoming work carries
    // under "@reverse"/"instanceOf".
    List<String> bibIDs = []
    List graphList = incomingWork.data.get("@graph")
    Map mainEntity = graphList[1]
    mainEntity.get("@reverse", [:]).get("instanceOf", []).each { bib ->
        String instanceID = whelk.getStorage().getSystemIdByIri( (String) bib["@id"] )
        if (instanceID != null)
            bibIDs.add(instanceID)
    }
    if (!bibIDs.isEmpty()) {
        return WorkMerging.mergeWorksOf(bibIDs, [incomingWork], whelk)
    }
    // No resolvable instances; nothing is written for this work.
    return WorkMerging.WRITE_RESULT.ALREADY_UP_TO_DATE
}

private long removeDeleted(Set<String> idsInInput, List<String> needsRetry) {
// Clear out anything that was previously stored in this dataset, but was not in the in-data now.
// If faced with "can't delete depended on stuff", retry again later, after more other deletes have
10 changes: 10 additions & 0 deletions whelk-core/src/main/groovy/whelk/Document.groovy
@@ -20,6 +20,10 @@ import static whelk.util.Jackson.mapper
* A document is represented as a data Map (containing Maps, Lists and Value objects).
*
* This class serves as a wrapper around such a map, with access methods for specific parts of the data.
*
 * TODO:
 * Many of the accessors of this class assume the underlying data is an INSTANCE. We may want to
 * relax that assumption at some point down the line, or have the accessors check that it actually holds.
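 *
 * For orientation, a minimal sketch of the assumed shape (IDs are illustrative only):
 * <pre>
 * ["@graph": [
 *     ["@id": "<recordId>", "@type": "Record", "mainEntity": ["@id": "<recordId>#it"]],
 *     ["@id": "<recordId>#it", "@type": "Instance", "instanceOf": ["@id": "<workId>#it"]]
 * ]]
 * </pre>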
*/
@Log
class Document {
@@ -51,6 +55,7 @@ class Document {
static final List thingInSchemePath = ["@graph",1,"inScheme","@id"]
static final List recordIdPath = ["@graph", 0, "@id"]
static final List workIdPath = ["@graph", 1, "instanceOf", "@id"]
static final List workPath = ["@graph", 1, "instanceOf"]
static final List thingMetaPath = ["@graph", 1, "meta", "@id"]
static final List recordSameAsPath = ["@graph", 0, "sameAs"]
static final List recordTypedIDsPath = ["@graph", 0, "identifiedBy"]
@@ -177,6 +182,11 @@

void setThingMeta(meta) { set(thingMetaPath, meta) }

Map getWorkEntity() { return get(workPath) }

void setWorkEntity(work) { set(workPath, work) }
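
// Sketch of expected values (hedged; the URI is illustrative only): a link to an
// external work is a single-entry Map, e.g. ["@id": "https://libris.kb.se/xyz#it"],
// while a local work is the full embedded Map. setWorkEntity(["@id": uri]) thus
// relinks the instance to another work record.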


/**
* Will have base URI prepended if not already there
*/
170 changes: 170 additions & 0 deletions whelk-core/src/main/groovy/whelk/WorkMerging.java
@@ -0,0 +1,170 @@
package whelk;

import java.util.*;

public class WorkMerging {

public enum WRITE_RESULT {
ALREADY_UP_TO_DATE,
UPDATED,
CREATED
}

// No proper pointers or multiple return values in Java :(
private static class WriteResultReference {
public WRITE_RESULT result = WRITE_RESULT.ALREADY_UP_TO_DATE;
}

/**
 * Merge the works of all listed instances into one. The listed instances
 * may or may not have external works already. Orphaned work records will be
 * deleted. Extra (previously unsaved) works may optionally be supplied.
 *
 * This is _not_ one atomic operation, but rather a series of operations.
 * This means that it is possible to observe the process halfway through from the
 * outside. It also means that should the process be stopped halfway through,
 * results may look odd (but will still obey basic data integrity rules).
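 *
 * A minimal usage sketch (the IDs are illustrative only, and 'whelk' is assumed to be
 * an available Whelk instance; with no extra works supplied, an existing or embedded
 * work is chosen/broken off as the base):
 * <pre>{@code
 *     List<String> ids = Arrays.asList("fxv0kkb345cvkzk", "gzrgn0gz1nrmrn1");
 *     WorkMerging.WRITE_RESULT result = WorkMerging.mergeWorksOf(ids, Collections.emptyList(), whelk);
 * }</pre>
 */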
public static WRITE_RESULT mergeWorksOf(List<String> instanceIDs, List<Document> extraWorks, Whelk whelk) {

WriteResultReference result = new WriteResultReference();

List<Document> instances = collectInstancesOfThisWork(instanceIDs, whelk);

Document baseWork = selectBaseWork(instances, extraWorks, result, whelk);
String baseWorkUri = baseWork.getThingIdentifiers().get(0);
Map correctLinkEntity = new HashMap();
correctLinkEntity.put("@id", baseWorkUri);

// Collect all already existing external works (different from our target) before relinking
List<String> orphanIDs = new ArrayList<>();
for (Document instance : instances) {
Map workEntity = instance.getWorkEntity();
if (workEntity.size() == 1 && workEntity.containsKey("@id") && !workEntity.equals(correctLinkEntity)) {
    String workUri = (String) workEntity.get("@id");
    String workId = whelk.getStorage().getSystemIdByIri(workUri);
    if (workId != null) // guard: the linked work may not be a stored record
        orphanIDs.add(workId);
}
}

// Merge other works into the baseWork. This must be done first, before any orphans can be deleted,
// or we risk losing data if the process is interrupted.
/*whelk.storeAtomicUpdate(baseWork.getShortId(), true, false, true, "xl", null, (Document doc) -> {
// TODO MERGE HERE AND DONT FORGET TO SET result.result IF ANYTHING CHANGES!
});*/

// Relink the instances
for (Document instance : instances) {
if (!instance.getWorkEntity().equals(correctLinkEntity)) { // If not already linked to the correct record
whelk.storeAtomicUpdate(instance.getShortId(), true, false, true, "xl", null, (Document doc) -> {
doc.setWorkEntity(correctLinkEntity);
});
}
}

// Clean up work records that are no longer linked
for (String orphanID : orphanIDs) {
try {
whelk.getStorage().removeAndTransferMainEntityURIs(orphanID, baseWork.getShortId());
} catch (RuntimeException e) {
// Expected possible cause of exception: A new link was added to this work, _after_ we collected
// and relinked the instances of it. In this (theoretical) case, just leave the old work in place.
}
}

return result.result;
}

/**
 * Find the set of instances that should link to the merged work. This of course includes the
 * passed instanceIDs, but also any other instances already sharing a work with one of those
 * IDs (e.g. if instances A and B link to the same work and only A is passed, B is collected too).
 */
private static List<Document> collectInstancesOfThisWork(List<String> instanceIDs, Whelk whelk) {
List<Document> instances = new ArrayList<>(instanceIDs.size());
for (String instanceID : instanceIDs) {
Document instance = whelk.getDocument(instanceID);
instances.add( instance );

// Are there other instances linking to the same work as 'instance'? If so, add them to the
// collection to (possibly) re-link as well.
Map workEntity = instance.getWorkEntity();
if (workEntity.size() == 1 && workEntity.containsKey("@id")) {
String workUri = (String) workEntity.get("@id");
String workId = whelk.getStorage().getSystemIdByIri(workUri);
for (String otherInstanceId : whelk.getStorage().getDependers(workId)) {
    if (otherInstanceId.equals(instanceID))
        continue; // 'instance' itself is among the work's dependers and is already added
    Document otherInstance = whelk.getDocument(otherInstanceId);
    instances.add( otherInstance );
}
}
}
return instances;
}

/**
* Select (or create+save) a work record that should be used going forward for
* all of the passed instances.
*/
private static Document selectBaseWork(List<Document> instances, List<Document> extraWorks, WriteResultReference result, Whelk whelk) {
// Find all the works
List<String> linkedWorkURIs = new ArrayList<>();
List<Map> embeddedWorks = new ArrayList<>();
for (Document instance : instances) {
Map workEntity = instance.getWorkEntity();
if (workEntity.size() == 1 && workEntity.containsKey("@id")) {
linkedWorkURIs.add( (String) workEntity.get("@id"));
} else {
embeddedWorks.add(workEntity);
}
}

// Order of priority:
// 1. Any pre-existing linked work records
// 2. Any supplied extra works
// 3. Any embedded work from one of the instances
// (a small worked example follows)
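
// E.g. (sketch): instances I1 (linked to work W) and I2 (embedded work w2), with one
// supplied extra work E: W wins (case 1). With no linked W, E would be created and
// used (case 2); with no E either, w2 would be broken off into its own record (case 3).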

// Pick a linked one if any such exist (1)
String baseWorkUri = null;
if (!linkedWorkURIs.isEmpty()) {
baseWorkUri = linkedWorkURIs.get(0); // TODO: Be a little smarter about _which_ work we pick?
} else if(!extraWorks.isEmpty()) { // Any supplied extra work (2)
Document selectedWork = extraWorks.get(0);

((Map)(((List)selectedWork.data.get("@graph")).get(1))).remove("@reverse"); // strip the virtual @reverse container before saving (ugh)

whelk.createDocument(selectedWork, "xl", null, "auth", false);
result.result = WRITE_RESULT.CREATED;
baseWorkUri = selectedWork.getThingIdentifiers().get(0);
} else { // Otherwise break off an embedded one (3)
String slug = IdGenerator.generate();
String recordId = Document.getBASE_URI().toString() + slug;
String mainEntityId = recordId + "#it";

Map chosenEmbedded = embeddedWorks.get(0); // TODO: Be a little smarter about _which_ work we break off?

Map docMap = new HashMap();
List graph = new ArrayList();
Map record = new HashMap();
docMap.put("@graph", graph);

graph.add(record);
record.put("@id", Document.getBASE_URI().toString() + slug);
record.put("@type", "Record");
Map mainEntityLink = new HashMap();
mainEntityLink.put("@id", mainEntityId);
record.put("mainEntity", mainEntityLink);

graph.add(chosenEmbedded);
chosenEmbedded.put("@id", mainEntityId);
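
// Resulting shape (sketch): {"@graph": [record, work]}, where the record's
// "mainEntity" points at the broken-off work, now addressable as recordId + "#it".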

Document newWork = new Document(docMap);
newWork.setControlNumber(slug);
newWork.setGenerationDate(new Date());
//newWork.setGenerationProcess("https://id.kb.se/workmerger"); // TODO: CHECK WITH FORMAT!!
whelk.createDocument(newWork, "xl", null, "auth", false);
result.result = WRITE_RESULT.CREATED;
baseWorkUri = mainEntityId;
}

return whelk.getStorage().loadDocumentByMainId(baseWorkUri);
}
}
@@ -991,6 +991,19 @@ class PostgreSQLComponent {
return doc
}

/**
 * Remove the record 'removeID' and transfer its thing identifier URIs to the record
 * 'inheritsAliasesID' as aliases (used by WorkMerging when cleaning up orphaned works).
 */
public void removeAndTransferMainEntityURIs(String removeID, String inheritsAliasesID) {
withDbConnection {
Connection connection = getMyConnection()
Document from = lockAndLoad(removeID, connection)
remove(from.getShortId(), "xl", null, false)
storeUpdate(inheritsAliasesID, true, false, true, "xl", null, { to ->
from.getThingIdentifiers().each {
to.addThingIdentifier(it)
}
})
}
}

private Document lockAndLoad(String id, Connection connection) throws DocumentNotFoundException {
PreparedStatement statement = null
ResultSet resultSet = null