Skip to content

Commit

Permalink
Optionally copy document history when using WhelkCopier (#1456)
Browse files Browse the repository at this point in the history
* Optionally copy document history when using WhelkCopier

* Optionally restrict historical copies to specified types
  • Loading branch information
andersju committed Jul 22, 2024
1 parent 8eece1a commit 671db0f
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 25 deletions.
39 changes: 25 additions & 14 deletions importers/src/main/groovy/whelk/importer/ImporterMain.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -122,33 +122,44 @@ class ImporterMain {
}

/**
* The additional_types argument should be one of:
* - comma-separated list of types to include. Like so: Person,GenreForm
* - --all-types, to copy *all* types
* - "", to be able to use --exclude-items without specifying additional types
* The --additional-types argument can look like one of the following:
* --additional-types=all # Copy all types
* --additional-types=none # Don't copy additional types (default)
* --additional-types=Person,GenreForm # Copy specific types
*
* You can also optionally copy historical document versions (from lddb__versions)
* for certain types, all types, or not at all (default). "all" means
* "copy versions of every document that gets copied" (the records selected by
* RECORD_ID_FILE, the records linked to/from them, and any additional types):
* --copy-versions=all # Copy history for every document added
* --copy-versions=Instance,Person # Copy history only for specific types
* --copy-versions=none # Don't copy history (default)
*
* E.g., copy everything except items (holdings):
* SOURCE_PROPERTIES RECORD_ID_FILE --all-types --exclude-items
* SOURCE_PROPERTIES RECORD_ID_FILE --additional-types=all --exclude-items
*
* Copy only what's in RECORD_ID_FILE, but exclude items:
* SOURCE_PROPERTIES RECORD_ID_FILE "" --exclude-items
* SOURCE_PROPERTIES RECORD_ID_FILE --additional-types=none --exclude-items
*
* Copy what's in RECORD_ID_FILE and types MovingImageInstance and Map, include items:
* SOURCE_PROPERTIES RECORD_ID_FILE --additional-types=MovingImageInstance,Map
*
* Copy what's in RECORD_ID_FILE and types MovingImageInstance and Map, don't exclude items:
* SOURCE_PROPERTIES RECORD_ID_FILE MovingImageInstance,Map
* Copy what's in RECORD_ID_FILE and types MovingImageInstance and Map, include items,
* and copy historical versions of Instance and Map:
* SOURCE_PROPERTIES RECORD_ID_FILE --additional-types=MovingImageInstance,Map --dont-exclude-items --copy-versions=Instance,Map
*/
@Command(args='SOURCE_PROPERTIES RECORD_ID_FILE [<ADDITIONAL_TYPES | --all-types | ""> [--exclude-items]]')
void copywhelk(String sourcePropsFile, String recordsFile, additionalTypes=null, String excludeItems=null) {
@Command(args='SOURCE_PROPERTIES RECORD_ID_FILE [--additional-types=<types>] [--exclude-items | --dont-exclude-items] [--copy-versions=<types>]')
void copywhelk(String sourcePropsFile, String recordsFile, String additionalTypes=null, String excludeItems=null, String copyVersions=null) {
def sourceProps = new Properties()
new File(sourcePropsFile).withInputStream { it
sourceProps.load(it)
}
def source = Whelk.createLoadedCoreWhelk(sourceProps)
def dest = Whelk.createLoadedSearchWhelk(props)
def recordIds = new File(recordsFile).collect {
Whelk source = Whelk.createLoadedCoreWhelk(sourceProps)
Whelk dest = Whelk.createLoadedSearchWhelk(props)
List<String> recordIds = new File(recordsFile).collect {
it.split(/\t/)[0]
}
boolean shouldExcludeItems = excludeItems && excludeItems == '--exclude-items'
def copier = new WhelkCopier(source, dest, recordIds, additionalTypes, shouldExcludeItems)
WhelkCopier copier = new WhelkCopier(source, dest, recordIds, additionalTypes, shouldExcludeItems, copyVersions)
copier.run()
}

Expand Down
91 changes: 80 additions & 11 deletions importers/src/main/groovy/whelk/importer/WhelkCopier.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package whelk.importer

import whelk.Document
import whelk.Whelk
import whelk.history.DocumentVersion
import whelk.util.BlockingThreadPool
import whelk.util.LegacyIntegrationTools

Expand All @@ -16,52 +17,75 @@ class WhelkCopier {
Whelk dest
List recordIds
String additionalTypes
String copyVersions
boolean shouldExcludeItems
BlockingThreadPool.SimplePool threadPool = BlockingThreadPool.simplePool(Runtime.getRuntime().availableProcessors())
List<Document> saveQueue = []

private int copied = 0
private int copiedVersions = 0
private String additionalTypesPrefix = "--additional-types="
private String copyVersionsPrefix = "--copy-versions="
private List<String> versionTypes
private TreeSet<String> idsToCopyVersionsOf = new TreeSet<>()

WhelkCopier(source, dest, recordIds, additionalTypes, shouldExcludeItems) {
WhelkCopier(Whelk source, Whelk dest, List<String> recordIds, String additionalTypes, boolean shouldExcludeItems, String copyVersions) {
this.source = source
this.dest = dest
this.recordIds = recordIds
this.additionalTypes = additionalTypes
this.shouldExcludeItems = shouldExcludeItems
this.copyVersions = copyVersions

dest.storage.doVerifyDocumentIdRetention = false
}

void run() {
TreeSet<String> alreadyImportedIDs = new TreeSet<>()

if (copyVersions) {
versionTypes = copyVersions.substring(copyVersionsPrefix.length()).split(",")
if ("none" in versionTypes) {
versionTypes.clear()
}
}
if (versionTypes) {
System.err.println("Old versions of the following types will be copied: $versionTypes")
}

if (additionalTypes) {
String whereClause
if (additionalTypes == "--all-types") {
List<String> types = additionalTypes.substring(additionalTypesPrefix.length()).split(",")

if ("all" in types) {
whereClause = "deleted = false"
if (shouldExcludeItems) {
whereClause += " and data#>>'{@graph,1,@type}' != 'Item'"
}
} else {
String[] types = additionalTypes.split(",")
} else if (types.size() > 0 && !("none" in types)) {
whereClause = "deleted = false and data#>>'{@graph,1,@type}' in (\n" +
"'" + types.join("','") + "'" + ")"
}

if (whereClause) {
System.err.println("The following WHERE clause will be used for copying additional types: ${whereClause}")
}

source.storage.withDbConnection {
for (doc in selectBySqlWhere(whereClause)) {
if (doc.deleted) continue
doc.baseUri = source.baseUri
if (!alreadyImportedIDs.contains(doc.shortId)) {
alreadyImportedIDs.add(doc.shortId)
maybeCopyVersions(doc)
queueSave(doc)
}
}
}
}

for (id in recordIds) {
def doc
for (String id in recordIds) {
Document doc
if (id.contains("/")) {
doc = source.storage.getDocumentByIri(id)
}
Expand All @@ -80,11 +104,13 @@ class WhelkCopier {
relDoc.baseUri = source.baseUri
if (!alreadyImportedIDs.contains(relDoc.shortId)) {
alreadyImportedIDs.add(relDoc.shortId)
maybeCopyVersions(relDoc)
queueSave(relDoc)
}
}
if (!alreadyImportedIDs.contains(doc.shortId)) {
alreadyImportedIDs.add(doc.shortId)
maybeCopyVersions(doc)
queueSave(doc)
}
}
Expand All @@ -101,16 +127,47 @@ class WhelkCopier {
revDoc.baseUri = source.baseUri
if (!alreadyImportedIDs.contains(revDoc.shortId)) {
alreadyImportedIDs.add(revDoc.shortId)
maybeCopyVersions(revDoc)
queueSave(revDoc)
}
}
}
}

if (copyVersions) {
source.storage.withDbConnection {
for (String shortId in idsToCopyVersionsOf) {
source.storage.loadDocumentHistory(shortId).eachWithIndex { DocumentVersion docVersion, i ->
// Skip the first (latest) version, it'll be added by quickCreateDocument
if (i == 0) {
return
}
docVersion.doc.baseUri = source.baseUri
// Add some out-of-record data so we know what to do/use in save()
docVersion.doc.data["_isVersion"] = true
docVersion.doc.data["_changedBy"] = docVersion.changedBy
docVersion.doc.data["_changedIn"] = docVersion.changedIn
queueSave(docVersion.doc)
}
}
}
}

flushSaveQueue()
threadPool.awaitAllAndShutdown()

dest.storage.reDenormalize()
System.err.println "Copied $copied documents (from ${recordIds.size()} selected)."
if (copyVersions) {
System.err.println("Copied ${copied} documents (from ${recordIds.size()} selected), including ${copiedVersions} historical versions.")
} else {
System.err.println("Copied ${copied} documents (from ${recordIds.size()} selected).")
}
}

/**
 * Registers a document for history copying when its type was requested.
 *
 * The document's short id is added to idsToCopyVersionsOf when versionTypes is
 * non-empty and either contains "all" or contains the document's thing type.
 * Nothing happens when versionTypes is null or empty.
 *
 * @param doc the document that is about to be queued for copying
 */
void maybeCopyVersions(Document doc) {
    // Null or empty list: no history was requested.
    if (!versionTypes) {
        return
    }

    // "all" is checked first so the thing type is only looked up when needed.
    boolean historyWanted = "all" in versionTypes || doc.getThingType() in versionTypes
    if (historyWanted) {
        idsToCopyVersionsOf.add(doc.shortId)
    }
}

Iterable<Document> selectBySqlWhere(whereClause) {
Expand All @@ -127,9 +184,13 @@ class WhelkCopier {
source.storage.iterateDocuments(rs)
}

void queueSave(doc) {
void queueSave(Document doc) {
saveQueue.add(doc)
copied++
if (copyVersions && doc.data["_isVersion"]) {
copiedVersions++
}

if (copied % 200 == 0)
System.err.println "Records queued for copying: $copied"
if (saveQueue.size() >= SAVE_BATCH_SIZE) {
Expand All @@ -146,7 +207,7 @@ class WhelkCopier {
})
}

void save(doc) {
void save(Document doc) {
def libUriPlaceholder = "___TEMP_HARDCODED_LIB_BASEURI"
def newDataRepr = doc.dataAsString.replaceAll( // Move all lib uris, to a temporary placeholder.
'"\\Q' + source.baseUri.resolve("library/").toString() + '\\E',
Expand All @@ -158,15 +219,23 @@ class WhelkCopier {
'"\\Q' + libUriPlaceholder + '\\E',
'"' + source.baseUri.resolve("library/").toString())

def newDoc = new Document(mapper.readValue(newDataRepr, Map))
Document newDoc = new Document(mapper.readValue(newDataRepr, Map))

def newId = dest.baseUri.resolve(doc.shortId).toString()
newDoc.id = newId

def collection = LegacyIntegrationTools.determineLegacyCollection(newDoc, dest.jsonld)
if (collection != "definitions") {
try {
dest.quickCreateDocument(newDoc, "xl", "WhelkCopier", collection)
if (doc.data["_isVersion"]) {
Date created = Date.from(doc.getCreatedTimestamp())
Date modified = Date.from(doc.getModifiedTimestamp())
String changedIn = doc.data["_changedIn"]
String changedBy = doc.data["_changedBy"]
dest.quickCreateDocumentVersion(newDoc, created, modified, changedIn, changedBy, collection)
} else {
dest.quickCreateDocument(newDoc, "xl", "WhelkCopier", collection)
}
} catch (Exception e) {
System.err.println("Could not save $doc.shortId due to: $e")
}
Expand Down
4 changes: 4 additions & 0 deletions whelk-core/src/main/groovy/whelk/Document.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,10 @@ class Document {
get(createdPath)
}

/**
 * Returns the document's creation time as an Instant.
 *
 * The value from getCreated() is parsed as an ISO-8601 date-time with offset
 * (DateTimeFormatter.ISO_OFFSET_DATE_TIME) and converted to an Instant.
 */
Instant getCreatedTimestamp() {
    def createdText = getCreated()
    ZonedDateTime createdAt = ZonedDateTime.parse(createdText, DateTimeFormatter.ISO_OFFSET_DATE_TIME)
    return createdAt.toInstant()
}

void setModified(Date modified) {
ZonedDateTime zdt = ZonedDateTime.ofInstant(modified.toInstant(), ZoneId.systemDefault())
String formatedModified = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(zdt)
Expand Down
8 changes: 8 additions & 0 deletions whelk-core/src/main/groovy/whelk/Whelk.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,14 @@ class Whelk {
return storage.quickCreateDocument(document, changedIn, changedBy, collection)
}

/**
 * Counterpart of quickCreateDocument for adding historical document versions.
 * Simply hands all arguments over to the storage component and returns its result.
 *
 * NOT to be used in production environments; for development purposes only.
 */
boolean quickCreateDocumentVersion(Document document, Date createdTime, Date modTime, String changedIn, String changedBy, String collection) {
    boolean versionStored = storage.quickCreateDocumentVersion(
            document, createdTime, modTime, changedIn, changedBy, collection)
    return versionStored
}

void remove(String id, String changedIn, String changedBy, boolean force = false) {
log.debug "Deleting ${id} from Whelk"
Document doc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,29 @@ class PostgreSQLComponent {
}
}

/**
 * Like quickCreateDocument, but only for saving to the versions table.
 * It should NOT be used in a production environment. It's only meant to (optionally)
 * be used with WhelkCopier when copying records from one XL environment to another,
 * for dev purposes.
 *
 * The version is written via saveVersion with auto-commit turned off, then committed.
 * If anything fails, the error is logged, a rollback is attempted and the original
 * exception is rethrown. A failure of the rollback itself is attached to the original
 * exception as a suppressed exception, so it never hides the real cause.
 *
 * @return true when the version was saved and committed
 * @throws Exception whatever saveVersion or the commit threw
 */
boolean quickCreateDocumentVersion(Document doc, Date createdTime, Date modTime, String changedIn, String changedBy, String collection) {
    return withDbConnection {
        Connection connection = getMyConnection()
        try {
            connection.setAutoCommit(false)
            // NOTE(review): the meaning of the trailing 'false' is defined by saveVersion; confirm there.
            saveVersion(doc, connection, createdTime, modTime, changedIn, changedBy, collection, false)
            connection.commit()
            return true
        } catch (Exception e) {
            log.error("Failed to save document version: ${e.message}. Rolling back.")
            try {
                connection.rollback()
            } catch (Exception rollbackFailure) {
                // A broken connection can make rollback throw too; keep the
                // original failure as the exception that propagates.
                e.addSuppressed(rollbackFailure)
            }
            throw e
        }
    }
}


void acquireRowLock(String id, Connection connection) {
PreparedStatement lockStatement = connection.prepareStatement(GET_DOCUMENT_FOR_UPDATE)
lockStatement.setString(1, id)
Expand Down

0 comments on commit 671db0f

Please sign in to comment.