diff --git a/importers/src/main/groovy/whelk/importer/DatasetImporter.groovy b/importers/src/main/groovy/whelk/importer/DatasetImporter.groovy
index c966ce8e3a..3fddb229c8 100644
--- a/importers/src/main/groovy/whelk/importer/DatasetImporter.groovy
+++ b/importers/src/main/groovy/whelk/importer/DatasetImporter.groovy
@@ -2,6 +2,8 @@ package whelk.importer
 
 import groovy.util.logging.Log4j2 as Log
 import groovy.transform.CompileStatic
+import whelk.WorkMerging
+
 import static groovy.transform.TypeCheckingMode.SKIP
 
 import whelk.Document
@@ -113,15 +115,29 @@ class DatasetImporter {
             Document incomingDoc = completeRecord(data, recordType, true)
             idsInInput.add(incomingDoc.getShortId())
 
-            // This race condition should be benign. If there is a document with
-            // the same ID created in between the check and the creation, we'll
-            // get an exception and fail early (unfortunate but acceptable).
-            switch (createOrUpdateDocument(incomingDoc)) {
-                case WRITE_RESULT.CREATED:
-                    createdCount++;
-                    break;
-                case WRITE_RESULT.UPDATED:
-                    updatedCount++;
+            if (data.get("@type") != null &&
+                    whelk.getJsonld().isSubClassOf(incomingDoc.getThingType(), "Work")) {
+
+                switch (createOrUpdateWork(incomingDoc)) {
+                    case WorkMerging.WRITE_RESULT.CREATED:
+                        createdCount++;
+                        break;
+                    case WorkMerging.WRITE_RESULT.UPDATED:
+                        updatedCount++;
+                }
+
+            } else { // Not a work
+
+                // This race condition should be benign. If there is a document with
+                // the same ID created in between the check and the creation, we'll
+                // get an exception and fail early (unfortunate but acceptable).
+                switch (createOrUpdateDocument(incomingDoc)) {
+                    case WRITE_RESULT.CREATED:
+                        createdCount++;
+                        break;
+                    case WRITE_RESULT.UPDATED:
+                        updatedCount++;
+                }
             }
 
             if ( lineCount % 100 == 0 ) {
@@ -286,8 +302,8 @@ class DatasetImporter {
     }
 
     private WRITE_RESULT createOrUpdateDocument(Document incomingDoc) {
-        Document storedDoc = whelk.getDocument(incomingDoc.getShortId())
         WRITE_RESULT result
+        Document storedDoc = whelk.getDocument(incomingDoc.getShortId())
         if (storedDoc != null) {
             if (whelk.storeAtomicUpdate(incomingDoc.getShortId(), true, false, refreshDependers, "xl", null, { doc ->
                 doc.data = incomingDoc.data
@@ -300,9 +316,24 @@ class DatasetImporter {
             whelk.createDocument(incomingDoc, "xl", null, collection, false)
             result = WRITE_RESULT.CREATED
         }
+
        return result
    }
 
+    // Import a work record: look up the already-stored instances that (via
+    // @reverse/instanceOf) claim to be instances of this work, and merge their
+    // works with the incoming one.
+    @CompileStatic(SKIP)
+    private WorkMerging.WRITE_RESULT createOrUpdateWork(Document incomingWork) {
+        List bibIDs = []
+        List graphList = incomingWork.data.get("@graph")
+        Map mainEntity = graphList[1]
+        mainEntity.get("@reverse", [:]).get("instanceOf", []).each { bib ->
+            String instanceID = whelk.getStorage().getSystemIdByIri( (String) bib["@id"] )
+            if (instanceID != null)
+                bibIDs.add(instanceID)
+        }
+        if (!bibIDs.isEmpty()) {
+            return WorkMerging.mergeWorksOf(bibIDs, [incomingWork], whelk)
+        }
+        return WorkMerging.WRITE_RESULT.ALREADY_UP_TO_DATE
+    }
+
    private long removeDeleted(Set idsInInput, List needsRetry) {
        // Clear out anything that was previously stored in this dataset, but was not in the in-data now.
        // If faced with "can't delete depended on stuff", retry again later, after more other deletes have
diff --git a/whelk-core/src/main/groovy/whelk/Document.groovy b/whelk-core/src/main/groovy/whelk/Document.groovy
index 0580d6de04..3c1932e986 100644
--- a/whelk-core/src/main/groovy/whelk/Document.groovy
+++ b/whelk-core/src/main/groovy/whelk/Document.groovy
@@ -20,6 +20,10 @@ import static whelk.util.Jackson.mapper
  * A document is represented as a data Map (containing Maps, Lists and Value objects).
 *
 * This class serves as a wrapper around such a map, with access methods for specific parts of the data.
+ *
+ * TODO:
+ * Many of the accessors of this class assume the underlying data is an INSTANCE. We may want to relax
+ * that assumption at some point down the line, or check in the accessors themselves that it actually holds.
  */
 @Log
 class Document {
@@ -51,6 +55,7 @@ class Document {
     static final List thingInSchemePath = ["@graph",1,"inScheme","@id"]
     static final List recordIdPath = ["@graph", 0, "@id"]
     static final List workIdPath = ["@graph", 1, "instanceOf", "@id"]
+    static final List workPath = ["@graph", 1, "instanceOf"]
     static final List thingMetaPath = ["@graph", 1, "meta", "@id"]
     static final List recordSameAsPath = ["@graph", 0, "sameAs"]
     static final List recordTypedIDsPath = ["@graph", 0, "identifiedBy"]
@@ -177,6 +182,11 @@ class Document {
     void setThingMeta(meta) { set(thingMetaPath, meta) }
 
+    Map getWorkEntity() { return get(workPath) }
+
+    void setWorkEntity(work) { set(workPath, work) }
+
+
     /**
      * Will have base URI prepended if not already there
      */
diff --git a/whelk-core/src/main/groovy/whelk/WorkMerging.java b/whelk-core/src/main/groovy/whelk/WorkMerging.java
new file mode 100644
index 0000000000..6d9fb2e6d0
--- /dev/null
+++ b/whelk-core/src/main/groovy/whelk/WorkMerging.java
@@ -0,0 +1,170 @@
+package whelk;
+
+import java.util.*;
+
+public class WorkMerging {
+
+    public enum WRITE_RESULT {
+        ALREADY_UP_TO_DATE,
+        UPDATED,
+        CREATED
+    }
+
+    // No proper pointers or multiple return values in Java :(
+    private static class WriteResultReference {
+        public WRITE_RESULT result = WRITE_RESULT.ALREADY_UP_TO_DATE;
+    }
+
+    /**
+     * Merge the works of all listed instances into one. The listed instances
+     * may or may not have external works already. Orphaned work records will be
+     * deleted. Extra (previously unsaved) works may optionally be supplied.
+     *
+     * This is _not_ one atomic operation, but rather a series of operations.
+     * This means that it is possible to observe the process halfway through from
+     * the outside. It also means that should the process be stopped halfway through,
+     * results may look odd (but will still obey basic data integrity rules).
+     */
+    public static WRITE_RESULT mergeWorksOf(List<String> instanceIDs, List<Document> extraWorks, Whelk whelk) {
+
+        WriteResultReference result = new WriteResultReference();
+
+        List<Document> instances = collectInstancesOfThisWork(instanceIDs, whelk);
+
+        Document baseWork = selectBaseWork(instances, extraWorks, result, whelk);
+        String baseWorkUri = baseWork.getThingIdentifiers().get(0);
+        Map<String, String> correctLinkEntity = new HashMap<>();
+        correctLinkEntity.put("@id", baseWorkUri);
+
+        // Collect all pre-existing external works (other than our target) before relinking
+        List<String> orphanIDs = new ArrayList<>();
+        for (Document instance : instances) {
+            Map workEntity = instance.getWorkEntity();
+            if (workEntity.size() == 1 && workEntity.containsKey("@id") && !workEntity.equals(correctLinkEntity)) {
+                String workUri = (String) workEntity.get("@id");
+                String workId = whelk.getStorage().getSystemIdByIri(workUri);
+                if (workId != null)
+                    orphanIDs.add(workId);
+            }
+        }
+
+        // Merge the other works into baseWork. This must be done first, before any orphans can be
+        // deleted, or we risk losing data if the process is interrupted.
+        /*whelk.storeAtomicUpdate(baseWork.getShortId(), true, false, true, "xl", null, (Document doc) -> {
+            // TODO: MERGE HERE, AND DON'T FORGET TO SET result.result IF ANYTHING CHANGES!
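+            //
+            // Purely as an illustration for review (this is NOT the intended merge
+            // implementation): the callback might fold properties of the collected
+            // works into the base work's main entity and flag the record as UPDATED
+            // when anything changed. 'otherWorks' is a hypothetical list of the work
+            // entities gathered from 'instances' and 'extraWorks'.
+            //
+            //     Map workData = (Map) ((List) doc.getData().get("@graph")).get(1);
+            //     boolean changed = false;
+            //     for (Map other : otherWorks) {
+            //         for (Object key : other.keySet()) {
+            //             if (!"@id".equals(key) && !workData.containsKey(key)) {
+            //                 workData.put(key, other.get(key));
+            //                 changed = true;
+            //             }
+            //         }
+            //     }
+            //     if (changed)
+            //         result.result = WRITE_RESULT.UPDATED;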
+        });*/
+
+        // Relink the instances
+        for (Document instance : instances) {
+            if (!instance.getWorkEntity().equals(correctLinkEntity)) { // If not already linked to the correct record
+                whelk.storeAtomicUpdate(instance.getShortId(), true, false, true, "xl", null, (Document doc) -> {
+                    doc.setWorkEntity(correctLinkEntity);
+                });
+            }
+        }
+
+        // Clean up work records that are no longer linked
+        for (String orphanID : orphanIDs) {
+            try {
+                whelk.getStorage().removeAndTransferMainEntityURIs(orphanID, baseWork.getShortId());
+            } catch (RuntimeException e) {
+                // Expected possible cause of exception: A new link was added to this work _after_ we collected
+                // and relinked the instances of it. In this (theoretical) case, just leave the old work in place.
+            }
+        }
+
+        return result.result;
+    }
+
+    /**
+     * Find the set of instances that should link to the merged work. This of course includes the
+     * passed instanceIDs, but also any other instances already sharing a work with one of those IDs.
+     */
+    private static List<Document> collectInstancesOfThisWork(List<String> instanceIDs, Whelk whelk) {
+        List<Document> instances = new ArrayList<>(instanceIDs.size());
+        for (String instanceID : instanceIDs) {
+            Document instance = whelk.getDocument(instanceID);
+            instances.add(instance);
+
+            // Are there other instances linking to the same work as 'instance'? If so, add them to the
+            // collection to (possibly) re-link as well.
+            Map workEntity = instance.getWorkEntity();
+            if (workEntity.size() == 1 && workEntity.containsKey("@id")) {
+                String workUri = (String) workEntity.get("@id");
+                String workId = whelk.getStorage().getSystemIdByIri(workUri);
+                for (String otherInstanceId : whelk.getStorage().getDependers(workId)) {
+                    Document otherInstance = whelk.getDocument(otherInstanceId);
+                    instances.add(otherInstance);
+                }
+            }
+        }
+        return instances;
+    }
+
+    /**
+     * Select (or create+save) a work record that should be used going forward for
+     * all of the passed instances.
+     */
+    private static Document selectBaseWork(List<Document> instances, List<Document> extraWorks, WriteResultReference result, Whelk whelk) {
+        // Find all the works
+        List<String> linkedWorkURIs = new ArrayList<>();
+        List<Map> embeddedWorks = new ArrayList<>();
+        for (Document instance : instances) {
+            Map workEntity = instance.getWorkEntity();
+            if (workEntity.size() == 1 && workEntity.containsKey("@id")) {
+                linkedWorkURIs.add((String) workEntity.get("@id"));
+            } else {
+                embeddedWorks.add(workEntity);
+            }
+        }
+
+        // Order of priority:
+        // 1. Any pre-existing linked work records
+        // 2. Any supplied extra works
+        // 3. Any embedded work from one of the instances
+
+        // Pick a linked one if any such exist (1)
+        String baseWorkUri = null;
+        if (!linkedWorkURIs.isEmpty()) {
+            baseWorkUri = linkedWorkURIs.get(0); // TODO: Be a little smarter about _which_ work we pick?
+        } else if (!extraWorks.isEmpty()) { // Any supplied extra work (2)
+            Document selectedWork = extraWorks.get(0);
+
+            ((Map) ((List) selectedWork.getData().get("@graph")).get(1)).remove("@reverse"); // ugh
+
+            whelk.createDocument(selectedWork, "xl", null, "auth", false);
+            result.result = WRITE_RESULT.CREATED;
+            baseWorkUri = selectedWork.getThingIdentifiers().get(0);
+        } else { // Otherwise break off an embedded one (3)
+            String slug = IdGenerator.generate();
+            String recordId = Document.getBASE_URI().toString() + slug;
+            String mainEntityId = recordId + "#it";
+
+            Map chosenEmbedded = embeddedWorks.get(0); // TODO: Be a little smarter about _which_ work we break off?
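+
+            // Wrap the chosen embedded work in a minimal record envelope,
+            // "@graph": [record, work], so it can be saved as a standalone work record: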
+            Map<String, Object> docMap = new HashMap<>();
+            List<Map> graph = new ArrayList<>();
+            Map<String, Object> record = new HashMap<>();
+            docMap.put("@graph", graph);
+
+            graph.add(record);
+            record.put("@id", recordId);
+            record.put("@type", "Record");
+            Map<String, String> mainEntityLink = new HashMap<>();
+            mainEntityLink.put("@id", mainEntityId);
+            record.put("mainEntity", mainEntityLink);
+
+            graph.add(chosenEmbedded);
+            chosenEmbedded.put("@id", mainEntityId);
+
+            Document newWork = new Document(docMap);
+            newWork.setControlNumber(slug);
+            newWork.setGenerationDate(new Date());
+            //newWork.setGenerationProcess("https://id.kb.se/workmerger"); // TODO: CHECK WITH FORMAT!!
+            whelk.createDocument(newWork, "xl", null, "auth", false);
+            result.result = WRITE_RESULT.CREATED;
+            baseWorkUri = mainEntityId;
+        }
+
+        return whelk.getStorage().loadDocumentByMainId(baseWorkUri);
+    }
+}
diff --git a/whelk-core/src/main/groovy/whelk/component/PostgreSQLComponent.groovy b/whelk-core/src/main/groovy/whelk/component/PostgreSQLComponent.groovy
index 2b00823fc6..9c176d75cc 100644
--- a/whelk-core/src/main/groovy/whelk/component/PostgreSQLComponent.groovy
+++ b/whelk-core/src/main/groovy/whelk/component/PostgreSQLComponent.groovy
@@ -991,6 +991,19 @@ class PostgreSQLComponent {
         return doc
     }
 
+    /**
+     * Remove the document 'removeID' and let the document 'inheritsAliasesID'
+     * inherit its thing (main entity) URIs as aliases.
+     */
+    void removeAndTransferMainEntityURIs(String removeID, String inheritsAliasesID) {
+        withDbConnection {
+            Connection connection = getMyConnection()
+            Document from = lockAndLoad(removeID, connection)
+            remove(from.getShortId(), "xl", null, false)
+            storeUpdate(inheritsAliasesID, true, false, true, "xl", null, { to ->
+                from.getThingIdentifiers().each {
+                    to.addThingIdentifier(it)
+                }
+            })
+        }
+    }
+
     private Document lockAndLoad(String id, Connection connection) throws DocumentNotFoundException {
         PreparedStatement statement = null
         ResultSet resultSet = null