From 73c35eb69b10ae07af24718f9198cb614cade2b1 Mon Sep 17 00:00:00 2001 From: Tobias Kampmann Date: Tue, 21 Nov 2023 16:21:20 +0100 Subject: [PATCH] feat(backend): facilitate large compressed uploads * introduce two auxiliary tables to efficiently validate and merge metadata and sequence data * remove singleton SequenceEntriesTable and replace it with provided and cached `Table`s to facilitate compression of sequence data * de-compress sequence strings with custom dictionary when de-serializing * support for zstd, gzip, xz, lzma, zip, bzip2 --- backend/build.gradle | 8 +- backend/docs/runtime_view.md | 3 +- .../backend/api/SubmissionTypes.kt | 11 +- .../org/pathoplexus/backend/config/Config.kt | 9 +- .../backend/config/ReferenceGenome.kt | 4 + .../backend/controller/ExceptionHandler.kt | 7 +- .../controller/SubmissionController.kt | 13 +- .../SubmissionControllerDescriptions.kt | 4 +- .../backend/model/ReleasedDataModel.kt | 4 +- .../pathoplexus/backend/model/SubmitModel.kt | 269 ++++-- .../backend/service/CompressionService.kt | 111 +++ .../backend/service/DatabaseService.kt | 851 +++++++++--------- .../service/JacksonSerializableJsonb.kt | 14 + .../backend/service/MetadataUploadAuxTable.kt | 17 + .../service/QueryPreconditionValidator.kt | 134 +-- .../backend/service/SequenceEntriesTable.kt | 115 --- .../service/SequenceEntriesTableProvider.kt | 125 +++ .../backend/service/SequenceUploadAuxTable.kt | 14 + .../backend/service/UploadDatabaseService.kt | 155 ++++ .../backend/utils/AccessionComparators.kt | 21 + .../pathoplexus/backend/utils/FastaEntry.kt | 19 - .../pathoplexus/backend/utils/FastaReader.kt | 9 +- .../backend/utils/MetadataEntry.kt | 46 + .../backend/utils/ParseFastaHeader.kt | 25 + .../src/main/resources/application.properties | 4 +- .../main/resources/db/migration/V1__init.sql | 19 + backend/src/main/resources/logback.xml | 2 +- .../controller/DeleteSequencesEndpointTest.kt | 2 +- .../controller/EndpointTestExtension.kt | 7 +- .../controller/ExceptionHandlerTest.kt | 2 +- .../controller/GetReleasedDataEndpointTest.kt | 4 +- .../controller/PreparedProcessedData.kt | 2 +- .../backend/controller/ReviseEndpointTest.kt | 9 +- .../SingleSegmentedSubmitEndpointTest.kt | 2 +- .../controller/SubmissionControllerClient.kt | 2 +- .../controller/SubmissionConvenienceClient.kt | 2 +- .../backend/controller/SubmitEndpointTest.kt | 57 +- .../backend/controller/SubmitFiles.kt | 112 ++- .../SubmitProcessedDataEndpointTest.kt | 2 +- .../backend/service/CompressionServiceTest.kt | 33 + preprocessing/specification.md | 4 +- website/tests/e2e.fixture.ts | 2 + website/tests/pages/submit/index.spec.ts | 11 + website/tests/pages/submit/submit.page.ts | 16 +- website/tests/testData/metadata.tsv.zst | Bin 0 -> 253 bytes website/tests/testData/sequences.fasta.zst | Bin 0 -> 65 bytes 46 files changed, 1563 insertions(+), 719 deletions(-) create mode 100644 backend/src/main/kotlin/org/pathoplexus/backend/service/CompressionService.kt create mode 100644 backend/src/main/kotlin/org/pathoplexus/backend/service/JacksonSerializableJsonb.kt create mode 100644 backend/src/main/kotlin/org/pathoplexus/backend/service/MetadataUploadAuxTable.kt delete mode 100644 backend/src/main/kotlin/org/pathoplexus/backend/service/SequenceEntriesTable.kt create mode 100644 backend/src/main/kotlin/org/pathoplexus/backend/service/SequenceEntriesTableProvider.kt create mode 100644 backend/src/main/kotlin/org/pathoplexus/backend/service/SequenceUploadAuxTable.kt create mode 100644 backend/src/main/kotlin/org/pathoplexus/backend/service/UploadDatabaseService.kt create mode 100644 backend/src/main/kotlin/org/pathoplexus/backend/utils/AccessionComparators.kt delete mode 100644 backend/src/main/kotlin/org/pathoplexus/backend/utils/FastaEntry.kt create mode 100644 backend/src/main/kotlin/org/pathoplexus/backend/utils/MetadataEntry.kt create mode 100644 backend/src/main/kotlin/org/pathoplexus/backend/utils/ParseFastaHeader.kt create mode 100644 backend/src/test/kotlin/org/pathoplexus/backend/service/CompressionServiceTest.kt create mode 100644 website/tests/testData/metadata.tsv.zst create mode 100644 website/tests/testData/sequences.fasta.zst diff --git a/backend/build.gradle b/backend/build.gradle index 2a4e96970..c2077d309 100644 --- a/backend/build.gradle +++ b/backend/build.gradle @@ -42,10 +42,14 @@ dependencies { implementation "org.jetbrains.exposed:exposed-kotlin-datetime:$exposedVersion" implementation "org.jetbrains.kotlinx:kotlinx-datetime:0.5.0" implementation "org.hibernate.validator:hibernate-validator:8.0.1.Final" - + implementation "org.springframework.boot:spring-boot-starter-oauth2-resource-server" implementation "org.springframework.boot:spring-boot-starter-security" - + + implementation 'org.apache.commons:commons-compress:1.25.0' + implementation 'com.github.luben:zstd-jni:1.5.2-4' + implementation 'org.tukaani:xz:1.8' + testImplementation("org.springframework.boot:spring-boot-starter-test") { exclude group: "org.mockito" } diff --git a/backend/docs/runtime_view.md b/backend/docs/runtime_view.md index b4b268004..d7cfdf1dc 100644 --- a/backend/docs/runtime_view.md +++ b/backend/docs/runtime_view.md @@ -11,7 +11,8 @@ ## Initial submission -To submit new sequences, the user calls the `/submit` endpoint and sends unpreprocessed data. +To submit new sequences, the user calls the `/submit` endpoint and sends unpreprocessed data. +Data may be compressed using zstd, gzip, bzip2, xz, lzma or zip. For each sequence, Pathoplexus creates a new row in the "sequenceEntries" table. It generates a new accession. The version number of the sequence entries is 1. diff --git a/backend/src/main/kotlin/org/pathoplexus/backend/api/SubmissionTypes.kt b/backend/src/main/kotlin/org/pathoplexus/backend/api/SubmissionTypes.kt index e094b6b10..d387e565c 100644 --- a/backend/src/main/kotlin/org/pathoplexus/backend/api/SubmissionTypes.kt +++ b/backend/src/main/kotlin/org/pathoplexus/backend/api/SubmissionTypes.kt @@ -8,8 +8,8 @@ import com.fasterxml.jackson.databind.JsonDeserializer import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.annotation.JsonDeserialize import io.swagger.v3.oas.annotations.media.Schema -import org.pathoplexus.backend.service.Accession -import org.pathoplexus.backend.service.Version +import org.pathoplexus.backend.utils.Accession +import org.pathoplexus.backend.utils.Version interface AccessionVersionInterface { val accession: Accession @@ -154,11 +154,6 @@ data class RevisedData( val originalData: OriginalData, ) -data class SubmittedData( - val submissionId: String, - val originalData: OriginalData, -) - data class UnprocessedData( @Schema(example = "123") override val accession: Accession, @Schema(example = "1") override val version: Version, @@ -175,7 +170,7 @@ data class OriginalData( example = "{\"segment1\": \"ACTG\", \"segment2\": \"GTCA\"}", description = "The key is the segment name, the value is the nucleotide sequence", ) - val unalignedNucleotideSequences: Map, + val unalignedNucleotideSequences: Map, ) enum class Status { diff --git a/backend/src/main/kotlin/org/pathoplexus/backend/config/Config.kt b/backend/src/main/kotlin/org/pathoplexus/backend/config/Config.kt index 65fa48c1d..aa38199e6 100644 --- a/backend/src/main/kotlin/org/pathoplexus/backend/config/Config.kt +++ b/backend/src/main/kotlin/org/pathoplexus/backend/config/Config.kt @@ -1,8 +1,15 @@ package org.pathoplexus.backend.config +import org.pathoplexus.backend.api.Organism + data class BackendConfig( val instances: Map, -) +) { + fun getInstanceConfig(organism: Organism) = + instances[organism.name] ?: throw IllegalArgumentException( + "Organism: ${organism.name} not found in backend config. Available organisms: ${instances.keys}", + ) +} data class InstanceConfig( val schema: Schema, diff --git a/backend/src/main/kotlin/org/pathoplexus/backend/config/ReferenceGenome.kt b/backend/src/main/kotlin/org/pathoplexus/backend/config/ReferenceGenome.kt index a37a282de..a71c4ed64 100644 --- a/backend/src/main/kotlin/org/pathoplexus/backend/config/ReferenceGenome.kt +++ b/backend/src/main/kotlin/org/pathoplexus/backend/config/ReferenceGenome.kt @@ -11,6 +11,10 @@ data class ReferenceGenome( throw IllegalArgumentException("If there is only one nucleotide sequence, it must be named 'main'") } } + + fun getNucleotideSegmentReference(segmentName: String): NucleotideSequence? = nucleotideSequences.find { + it.name == segmentName + }?.sequence } data class ReferenceSequence( diff --git a/backend/src/main/kotlin/org/pathoplexus/backend/controller/ExceptionHandler.kt b/backend/src/main/kotlin/org/pathoplexus/backend/controller/ExceptionHandler.kt index d19d302e8..d3d777cdd 100644 --- a/backend/src/main/kotlin/org/pathoplexus/backend/controller/ExceptionHandler.kt +++ b/backend/src/main/kotlin/org/pathoplexus/backend/controller/ExceptionHandler.kt @@ -62,7 +62,11 @@ class ExceptionHandler : ResponseEntityExceptionHandler() { return ResponseEntity.status(HttpStatus.UNAUTHORIZED).build() } - @ExceptionHandler(UnprocessableEntityException::class, ProcessingValidationException::class) + @ExceptionHandler( + UnprocessableEntityException::class, + ProcessingValidationException::class, + DuplicateKeyException::class, + ) @ResponseStatus(HttpStatus.UNPROCESSABLE_ENTITY) fun handleUnprocessableEntityException(e: Exception): ResponseEntity { log.warn(e) { "Caught unprocessable entity exception: ${e.message}" } @@ -144,5 +148,6 @@ class ForbiddenException(message: String) : RuntimeException(message) class UnprocessableEntityException(message: String) : RuntimeException(message) class NotFoundException(message: String) : RuntimeException(message) class ProcessingValidationException(message: String) : RuntimeException(message) +class DuplicateKeyException(message: String) : RuntimeException(message) class DummyUnauthorizedExceptionToMakeItAppearInSwaggerUi : RuntimeException() diff --git a/backend/src/main/kotlin/org/pathoplexus/backend/controller/SubmissionController.kt b/backend/src/main/kotlin/org/pathoplexus/backend/controller/SubmissionController.kt index 45fefe7af..984f6e912 100644 --- a/backend/src/main/kotlin/org/pathoplexus/backend/controller/SubmissionController.kt +++ b/backend/src/main/kotlin/org/pathoplexus/backend/controller/SubmissionController.kt @@ -19,8 +19,8 @@ import org.pathoplexus.backend.api.SubmittedProcessedData import org.pathoplexus.backend.api.UnprocessedData import org.pathoplexus.backend.model.ReleasedDataModel import org.pathoplexus.backend.model.SubmitModel -import org.pathoplexus.backend.service.Accession import org.pathoplexus.backend.service.DatabaseService +import org.pathoplexus.backend.utils.Accession import org.pathoplexus.backend.utils.IteratorStreamer import org.springframework.http.HttpHeaders import org.springframework.http.HttpStatus @@ -39,6 +39,7 @@ import org.springframework.web.bind.annotation.ResponseStatus import org.springframework.web.bind.annotation.RestController import org.springframework.web.multipart.MultipartFile import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody +import java.util.UUID import io.swagger.v3.oas.annotations.parameters.RequestBody as SwaggerRequestBody @RestController @@ -61,9 +62,13 @@ class SubmissionController( @UsernameFromJwt username: String, @Parameter(description = METADATA_FILE_DESCRIPTION) @RequestParam metadataFile: MultipartFile, @Parameter(description = SEQUENCE_FILE_DESCRIPTION) @RequestParam sequenceFile: MultipartFile, - ): List { - return submitModel.processSubmission(username, metadataFile, sequenceFile, organism) - } + ): List = submitModel.processSubmission( + UUID.randomUUID().toString(), + metadataFile, + sequenceFile, + username, + organism, + ) @Operation(description = EXTRACT_UNPROCESSED_DATA_DESCRIPTION) @ApiResponse( diff --git a/backend/src/main/kotlin/org/pathoplexus/backend/controller/SubmissionControllerDescriptions.kt b/backend/src/main/kotlin/org/pathoplexus/backend/controller/SubmissionControllerDescriptions.kt index 2d3045097..a6d8cb544 100644 --- a/backend/src/main/kotlin/org/pathoplexus/backend/controller/SubmissionControllerDescriptions.kt +++ b/backend/src/main/kotlin/org/pathoplexus/backend/controller/SubmissionControllerDescriptions.kt @@ -9,13 +9,15 @@ You can use this response to associate the user provided submissionId with the s """ const val METADATA_FILE_DESCRIPTION = """ -A TSV (tab separated values) file containing the metadata of the submitted sequence entries. +A TSV (tab separated values) file containing the metadata of the submitted sequence entries. +The file may be compressed with zstd, xz, zip, gzip, lzma, bzip2 (with common extensions). It must contain the column names. The field 'submissionId' is required and must be unique within the provided dataset. It is used to associate metadata to the sequences in the sequences fasta file. """ const val SEQUENCE_FILE_DESCRIPTION = """ A fasta file containing the unaligned nucleotide sequences of the submitted sequences. +The file may be compressed with zstd, xz, zip, gzip, lzma, bzip2 (with common extensions). If the underlying organism has a single segment, the headers of the fasta file must match the 'submissionId' field in the metadata file. If the underlying organism has multiple segments, diff --git a/backend/src/main/kotlin/org/pathoplexus/backend/model/ReleasedDataModel.kt b/backend/src/main/kotlin/org/pathoplexus/backend/model/ReleasedDataModel.kt index 9fd24af43..3e6f21901 100644 --- a/backend/src/main/kotlin/org/pathoplexus/backend/model/ReleasedDataModel.kt +++ b/backend/src/main/kotlin/org/pathoplexus/backend/model/ReleasedDataModel.kt @@ -6,10 +6,10 @@ import mu.KotlinLogging import org.pathoplexus.backend.api.Organism import org.pathoplexus.backend.api.ProcessedData import org.pathoplexus.backend.api.SiloVersionStatus -import org.pathoplexus.backend.service.Accession import org.pathoplexus.backend.service.DatabaseService import org.pathoplexus.backend.service.RawProcessedData -import org.pathoplexus.backend.service.Version +import org.pathoplexus.backend.utils.Accession +import org.pathoplexus.backend.utils.Version import org.springframework.stereotype.Service import org.springframework.transaction.annotation.Transactional diff --git a/backend/src/main/kotlin/org/pathoplexus/backend/model/SubmitModel.kt b/backend/src/main/kotlin/org/pathoplexus/backend/model/SubmitModel.kt index 6d1347086..e7906d703 100644 --- a/backend/src/main/kotlin/org/pathoplexus/backend/model/SubmitModel.kt +++ b/backend/src/main/kotlin/org/pathoplexus/backend/model/SubmitModel.kt @@ -1,39 +1,229 @@ package org.pathoplexus.backend.model +import kotlinx.datetime.Clock +import kotlinx.datetime.TimeZone +import kotlinx.datetime.toLocalDateTime +import mu.KotlinLogging +import org.apache.commons.compress.archivers.zip.ZipFile +import org.apache.commons.compress.compressors.CompressorStreamFactory import org.apache.commons.csv.CSVFormat import org.apache.commons.csv.CSVParser +import org.jetbrains.exposed.exceptions.ExposedSQLException import org.pathoplexus.backend.api.Organism import org.pathoplexus.backend.api.OriginalData import org.pathoplexus.backend.api.RevisedData import org.pathoplexus.backend.api.SubmissionIdMapping -import org.pathoplexus.backend.api.SubmittedData -import org.pathoplexus.backend.config.ReferenceGenome import org.pathoplexus.backend.controller.BadRequestException +import org.pathoplexus.backend.controller.DuplicateKeyException import org.pathoplexus.backend.controller.UnprocessableEntityException -import org.pathoplexus.backend.service.Accession +import org.pathoplexus.backend.service.CompressionAlgorithm import org.pathoplexus.backend.service.DatabaseService +import org.pathoplexus.backend.service.UploadDatabaseService +import org.pathoplexus.backend.utils.Accession import org.pathoplexus.backend.utils.FastaReader +import org.pathoplexus.backend.utils.ParseFastaHeader +import org.pathoplexus.backend.utils.metadataEntryStreamAsSequence import org.springframework.stereotype.Service import org.springframework.web.multipart.MultipartFile +import java.io.BufferedInputStream +import java.io.File +import java.io.InputStream import java.io.InputStreamReader const val HEADER_TO_CONNECT_METADATA_AND_SEQUENCES = "submissionId" private const val ACCESSION_HEADER = "accession" +private val log = KotlinLogging.logger { } typealias SubmissionId = String typealias SegmentName = String +typealias MetadataMap = Map> + +const val UNIQUE_CONSTRAINT_VIOLATION_SQL_STATE = "23505" @Service -class SubmitModel(private val databaseService: DatabaseService, private val referenceGenome: ReferenceGenome) { +class SubmitModel( + private val databaseService: DatabaseService, + private val uploadDatabaseService: UploadDatabaseService, + private val parseFastaHeader: ParseFastaHeader, +) { + + companion object AcceptedFileTypes { + val metadataFile = ValidExtension("Metadata file", listOf("tsv")) + val sequenceFile = ValidExtension("Sequence file", listOf("fasta")) + } + + data class ValidExtension( + val displayName: String, + val validExtensions: List, + ) { + fun getCompressedExtensions(): Map> = + CompressionAlgorithm.entries.associateWith { algorithm -> + validExtensions.map { + it + algorithm.extension + } + } + } + fun processSubmission( - username: String, + uploadId: String, metadataFile: MultipartFile, sequenceFile: MultipartFile, + submitter: String, organism: Organism, + batchSize: Int = 1000, ): List { - val submittedData = processSubmittedFiles(metadataFile, sequenceFile) + return try { + log.info { "Processing submission with uploadId $uploadId" } + uploadData(submitter, uploadId, metadataFile, sequenceFile, batchSize, organism) + + log.info { "Validating submission with uploadId $uploadId" } + val (metadataSubmissionIds, sequencesSubmissionIds) = uploadDatabaseService.getUploadSubmissionIds(uploadId) + validateSubmissionIdSets(metadataSubmissionIds.toSet(), sequencesSubmissionIds.toSet()) + + log.info { "Persisting submission with uploadId $uploadId" } + uploadDatabaseService.mapAndCopy(uploadId) + } finally { + uploadDatabaseService.deleteUploadData(uploadId) + } + } + + private fun uploadData( + submitter: String, + uploadId: String, + metadataMultipartFile: MultipartFile, + sequenceMultipartFile: MultipartFile, + batchSize: Int, + organism: Organism, + ) { + val metadataTempFileToDelete = MaybeFile() + val metadataStream = getStreamFromFile(metadataMultipartFile, uploadId, metadataFile, metadataTempFileToDelete) + try { + uploadMetadata(submitter, uploadId, metadataStream, batchSize, organism) + } finally { + metadataTempFileToDelete.delete() + } + + val sequenceTempFileToDelete = MaybeFile() + try { + val sequenceStream = getStreamFromFile( + sequenceMultipartFile, + uploadId, + sequenceFile, + sequenceTempFileToDelete, + ) + uploadSequences(uploadId, sequenceStream, batchSize, organism) + } finally { + sequenceTempFileToDelete.delete() + } + } + + class MaybeFile { + var file: File? = null + fun delete() { + file?.delete() + } + } + + private fun getStreamFromFile( + file: MultipartFile, + uploadId: String, + dataType: ValidExtension, + maybeFileToDelete: MaybeFile, + ): InputStream = when (getFileType(file, dataType)) { + CompressionAlgorithm.ZIP -> { + val tempFile = File.createTempFile( + "upload_" + dataType.displayName.replace(" ", ""), + uploadId, + ) + maybeFileToDelete.file = tempFile + + file.transferTo(tempFile) + val zipFile = ZipFile(tempFile) + BufferedInputStream(zipFile.getInputStream(zipFile.entries.nextElement())) + } + + CompressionAlgorithm.NONE -> + BufferedInputStream(file.inputStream) + + else -> + CompressorStreamFactory().createCompressorInputStream( + BufferedInputStream(file.inputStream), + ) + } - return databaseService.insertSubmissions(username, submittedData, organism) + private fun uploadMetadata( + submitter: String, + uploadId: String, + metadataStream: InputStream, + batchSize: Int, + organism: Organism, + ) { + log.info { + "intermediate storing uploaded metadata from $submitter with UploadId $uploadId" + } + val now = Clock.System.now().toLocalDateTime(TimeZone.UTC) + try { + metadataEntryStreamAsSequence(metadataStream).chunked(batchSize).forEach { batch -> + uploadDatabaseService.batchInsertMetadataInAuxTable( + submitter, + uploadId, + organism, + batch, + now, + ) + } + } catch (e: ExposedSQLException) { + if (e.sqlState == UNIQUE_CONSTRAINT_VIOLATION_SQL_STATE) { + throw DuplicateKeyException( + "Metadata file contains at least one duplicate submissionId: ${e.cause?.cause}", + ) + } + throw e + } + } + + private fun uploadSequences( + uploadId: String, + sequenceStream: InputStream, + batchSize: Int, + organism: Organism, + ) { + log.info { + "intermediate storing uploaded sequence data with UploadId $uploadId" + } + FastaReader(sequenceStream).asSequence().chunked(batchSize).forEach { batch -> + try { + uploadDatabaseService.batchInsertSequencesInAuxTable( + uploadId, + organism, + batch, + ) + } catch (e: ExposedSQLException) { + if (e.sqlState == UNIQUE_CONSTRAINT_VIOLATION_SQL_STATE) { + throw DuplicateKeyException( + "Sequence file contains at least one duplicate submissionId: ${e.cause?.cause}", + ) + } + throw e + } + } + } + + private fun getFileType(file: MultipartFile, expectedFileType: ValidExtension): CompressionAlgorithm { + val originalFilename = file.originalFilename + ?: throw BadRequestException("${expectedFileType.displayName} file missing") + + expectedFileType.getCompressedExtensions().forEach { (algorithm, extensions) -> + if (extensions.any { originalFilename.endsWith(it) }) { + return algorithm + } + } + + throw BadRequestException( + "${expectedFileType.displayName} has wrong extension. Must be " + + ".${expectedFileType.validExtensions} for uncompressed submissions or " + + ".${expectedFileType.getCompressedExtensions()} for compressed submissions", + ) } fun processRevision( @@ -47,25 +237,16 @@ class SubmitModel(private val databaseService: DatabaseService, private val refe return databaseService.reviseData(username, revisedData, organism) } - private fun processSubmittedFiles( - metadataFile: MultipartFile, - sequenceFile: MultipartFile, - ): List { - val metadataMap = metadataMap(metadataFile) - val sequenceMap = sequenceMap(sequenceFile) - - validateHeaders(metadataMap, sequenceMap) - - return metadataMap.map { entry -> - SubmittedData(entry.key, OriginalData(entry.value, sequenceMap[entry.key]!!)) - } - } - + // TODO(#604): adapt revisions to the new flow private fun processRevisedData( metadataFile: MultipartFile, sequenceFile: MultipartFile, ): List { - val metadataMap = metadataMap(metadataFile) + if (metadataFile.originalFilename == null || !metadataFile.originalFilename?.endsWith(".tsv")!!) { + throw BadRequestException("Metadata file must have extension .tsv") + } + + val metadataMap = createMetadataMap(metadataFile.inputStream) val metadataMapWithoutAccession = metadataMap.mapValues { it.value.filterKeys { column -> column != ACCESSION_HEADER } } val sequenceMap = sequenceMap(sequenceFile) @@ -83,35 +264,35 @@ class SubmitModel(private val databaseService: DatabaseService, private val refe } private fun validateHeaders( - metadataMap: Map>, - sequenceMap: Map>, + metadataMap: Map, + sequenceMap: Map, ) { val metadataKeysSet = metadataMap.keys.toSet() val sequenceKeysSet = sequenceMap.keys.toSet() + validateSubmissionIdSets(metadataKeysSet, sequenceKeysSet) + } + + private fun validateSubmissionIdSets(metadataKeysSet: Set, sequenceKeysSet: Set) { val metadataKeysNotInSequences = metadataKeysSet.subtract(sequenceKeysSet) if (metadataKeysNotInSequences.isNotEmpty()) { throw UnprocessableEntityException( - "Metadata file contains submissionIds that are not present in the sequence file: " + - metadataKeysNotInSequences, + "Metadata file contains ${metadataKeysNotInSequences.size} submissionIds that are not present " + + "in the sequence file: " + metadataKeysNotInSequences.toList().joinToString(limit = 10), ) } val sequenceKeysNotInMetadata = sequenceKeysSet.subtract(metadataKeysSet) if (sequenceKeysNotInMetadata.isNotEmpty()) { throw UnprocessableEntityException( - "Sequence file contains submissionIds that are not present in the metadata file: " + - sequenceKeysNotInMetadata, + "Sequence file contains ${sequenceKeysNotInMetadata.size} submissionIds that are not present " + + "in the metadata file: " + sequenceKeysNotInMetadata.toList().joinToString(limit = 10), ) } } - private fun metadataMap(metadataFile: MultipartFile): Map> { - if (metadataFile.originalFilename == null || !metadataFile.originalFilename?.endsWith(".tsv")!!) { - throw BadRequestException("Metadata file must have extension .tsv") - } - + private fun createMetadataMap(metadataInputStream: InputStream): MetadataMap { val csvParser = CSVParser( - InputStreamReader(metadataFile.inputStream), + InputStreamReader(metadataInputStream), CSVFormat.TDF.builder().setHeader().setSkipHeaderRecord(true).build(), ) @@ -141,6 +322,7 @@ class SubmitModel(private val databaseService: DatabaseService, private val refe .filter { it.value > 1 } .keys .sortedBy { it } + .joinToString(limit = 10) throw UnprocessableEntityException( "Metadata file contains duplicate ${HEADER_TO_CONNECT_METADATA_AND_SEQUENCES}s: $duplicateKeys", @@ -180,8 +362,8 @@ class SubmitModel(private val databaseService: DatabaseService, private val refe val fastaList = FastaReader(sequenceFile.bytes.inputStream()).toList() val sequenceMap = mutableMapOf>() fastaList.forEach { - val (sampleName, segmentName) = parseFastaHeader(it.sampleName) - val segmentMap = sequenceMap.getOrPut(sampleName) { mutableMapOf() } + val (submissionId, segmentName) = parseFastaHeader.parse(it.sampleName) + val segmentMap = sequenceMap.getOrPut(submissionId) { mutableMapOf() } if (segmentMap.containsKey(segmentName)) { throw UnprocessableEntityException("Sequence file contains duplicate submissionIds: ${it.sampleName}") } @@ -189,19 +371,4 @@ class SubmitModel(private val databaseService: DatabaseService, private val refe } return sequenceMap } - - private fun parseFastaHeader(submissionId: String): Pair { - if (referenceGenome.nucleotideSequences.size == 1) { - return Pair(submissionId, "main") - } - - val lastDelimiter = submissionId.lastIndexOf("_") - if (lastDelimiter == -1) { - throw BadRequestException( - "The FASTA header $submissionId does not contain the segment name. Please provide the" + - " segment name in the format _", - ) - } - return Pair(submissionId.substring(0, lastDelimiter), submissionId.substring(lastDelimiter + 1)) - } } diff --git a/backend/src/main/kotlin/org/pathoplexus/backend/service/CompressionService.kt b/backend/src/main/kotlin/org/pathoplexus/backend/service/CompressionService.kt new file mode 100644 index 000000000..313997ca8 --- /dev/null +++ b/backend/src/main/kotlin/org/pathoplexus/backend/service/CompressionService.kt @@ -0,0 +1,111 @@ +package org.pathoplexus.backend.service + +import com.github.luben.zstd.Zstd +import org.pathoplexus.backend.api.Organism +import org.pathoplexus.backend.api.OriginalData +import org.pathoplexus.backend.config.BackendConfig +import org.springframework.stereotype.Service +import java.nio.charset.StandardCharsets +import java.util.Base64 + +enum class CompressionAlgorithm(val extension: String) { + NONE(""), + ZSTD(".zst"), + XZ(".xz"), + GZIP(".gz"), + ZIP(".zip"), + BZIP2(".bz2"), + LZMA(".lzma"), +} + +@Service +class CompressionService(private val backendConfig: BackendConfig) { + + fun compressUnalignedNucleotideSequence( + uncompressedSequence: String, + segmentName: String, + organism: Organism, + ): String = + compress( + uncompressedSequence, + getDictionaryForNucleotideSequenceSegments(segmentName, organism), + ) + + private fun decompressUnalignedNucleotideSequence( + compressedSequence: String, + segmentName: String, + organism: Organism, + ): String = decompress( + compressedSequence, + getDictionaryForNucleotideSequenceSegments(segmentName, organism), + ) + + fun decompressSequencesInOriginalData(originalData: OriginalData, organism: Organism) = + OriginalData( + originalData.metadata, + originalData + .unalignedNucleotideSequences.mapValues { + decompressUnalignedNucleotideSequence( + it.value, + it.key, + organism, + ) + }, + ) + + fun compressSequencesInOriginalData(originalData: OriginalData, organism: Organism) = + OriginalData( + originalData.metadata, + originalData + .unalignedNucleotideSequences.mapValues { (segmentName, sequenceData) -> + compressUnalignedNucleotideSequence( + sequenceData, + segmentName, + organism, + ) + }, + ) + + private fun compress(seq: String, dictionary: ByteArray?): String { + val input = seq.toByteArray(StandardCharsets.UTF_8) + val compressBound = Zstd.compressBound(input.size.toLong()).toInt() + val outputBuffer = ByteArray(compressBound) + + val compressionReturnCode: Long = if (dictionary == null) { + Zstd.compress(outputBuffer, input, 3) + } else { + Zstd.compress(outputBuffer, input, dictionary, 3) + } + + if (Zstd.isError(compressionReturnCode)) { + throw RuntimeException("Zstd compression failed: error code $compressionReturnCode") + } + + return Base64.getEncoder().encodeToString(outputBuffer.copyOfRange(0, compressionReturnCode.toInt())) + } + + private fun decompress(compressedSequenceString: String, dictionary: ByteArray?): String { + val compressed = Base64.getDecoder().decode(compressedSequenceString) + val decompressedSize = Zstd.decompressedSize(compressed).toInt() + val decompressedBuffer = ByteArray(decompressedSize) + val decompressionReturnCode: Long = if (dictionary == null) { + Zstd.decompress(decompressedBuffer, compressed) + } else { + Zstd.decompress(decompressedBuffer, compressed, dictionary) + } + if (Zstd.isError(decompressionReturnCode)) { + throw RuntimeException("Zstd decompression failed: error code $decompressionReturnCode") + } + return String(decompressedBuffer, 0, decompressionReturnCode.toInt(), StandardCharsets.UTF_8) + } + + private fun getDictionaryForNucleotideSequenceSegments( + segmentName: String, + organism: Organism, + ): ByteArray? = backendConfig + .getInstanceConfig(organism) + .referenceGenomes + .getNucleotideSegmentReference( + segmentName, + )?.toByteArray() +} diff --git a/backend/src/main/kotlin/org/pathoplexus/backend/service/DatabaseService.kt b/backend/src/main/kotlin/org/pathoplexus/backend/service/DatabaseService.kt index 5f9c5d773..fc0af57e7 100644 --- a/backend/src/main/kotlin/org/pathoplexus/backend/service/DatabaseService.kt +++ b/backend/src/main/kotlin/org/pathoplexus/backend/service/DatabaseService.kt @@ -9,7 +9,6 @@ import kotlinx.datetime.TimeZone import kotlinx.datetime.toLocalDateTime import mu.KotlinLogging import org.jetbrains.exposed.sql.Database -import org.jetbrains.exposed.sql.NextVal import org.jetbrains.exposed.sql.QueryParameter import org.jetbrains.exposed.sql.SqlExpressionBuilder.plus import org.jetbrains.exposed.sql.and @@ -18,7 +17,6 @@ import org.jetbrains.exposed.sql.deleteWhere import org.jetbrains.exposed.sql.insert import org.jetbrains.exposed.sql.kotlin.datetime.dateTimeParam import org.jetbrains.exposed.sql.max -import org.jetbrains.exposed.sql.nextLongVal import org.jetbrains.exposed.sql.select import org.jetbrains.exposed.sql.stringParam import org.jetbrains.exposed.sql.update @@ -37,7 +35,6 @@ import org.pathoplexus.backend.api.Status.HAS_ERRORS import org.pathoplexus.backend.api.Status.IN_PROCESSING import org.pathoplexus.backend.api.Status.RECEIVED import org.pathoplexus.backend.api.SubmissionIdMapping -import org.pathoplexus.backend.api.SubmittedData import org.pathoplexus.backend.api.SubmittedProcessedData import org.pathoplexus.backend.api.UnprocessedData import org.pathoplexus.backend.config.ReferenceGenome @@ -46,7 +43,9 @@ import org.pathoplexus.backend.controller.ForbiddenException import org.pathoplexus.backend.controller.NotFoundException import org.pathoplexus.backend.controller.ProcessingValidationException import org.pathoplexus.backend.controller.UnprocessableEntityException +import org.pathoplexus.backend.utils.Accession import org.pathoplexus.backend.utils.IteratorStreamer +import org.pathoplexus.backend.utils.Version import org.springframework.stereotype.Service import org.springframework.transaction.annotation.Transactional import java.io.BufferedReader @@ -66,74 +65,50 @@ class DatabaseService( pool: DataSource, private val referenceGenome: ReferenceGenome, private val iteratorStreamer: IteratorStreamer, + private val sequenceEntriesTableProvider: SequenceEntriesTableProvider, ) { + init { Database.connect(pool) } - fun insertSubmissions( - submitter: String, - submittedData: List, - submittedOrganism: Organism, - ): List { - log.info { "submitting ${submittedData.size} new sequence entries by $submitter" } - - val now = Clock.System.now().toLocalDateTime(TimeZone.UTC) - - return submittedData.map { data -> - val insert = SequenceEntriesTable.insert { - it[accession] = accessionSequence.nextLongVal() as NextVal - it[SequenceEntriesTable.submitter] = submitter - it[submittedAt] = now - it[version] = 1 - it[status] = RECEIVED.name - it[submissionId] = data.submissionId - it[originalData] = data.originalData - it[organism] = submittedOrganism.name - } - SubmissionIdMapping( - insert[SequenceEntriesTable.accession], - insert[SequenceEntriesTable.version], - data.submissionId, - ) - } - } - fun streamUnprocessedSubmissions(numberOfSequenceEntries: Int, outputStream: OutputStream, organism: Organism) { log.info { "streaming unprocessed submissions. Requested $numberOfSequenceEntries sequence entries." } - val sequenceEntryData = SequenceEntriesTable - .slice(SequenceEntriesTable.accession, SequenceEntriesTable.version, SequenceEntriesTable.originalData) - .select( - where = { statusIs(RECEIVED) and isMaxVersion and organismIs(organism) }, - ) - .limit(numberOfSequenceEntries) - .map { - UnprocessedData( - it[SequenceEntriesTable.accession], - it[SequenceEntriesTable.version], - it[SequenceEntriesTable.originalData]!!, + sequenceEntriesTableProvider.get(organism).let { table -> + val sequenceEntryData = table + .slice(table.accessionColumn, table.versionColumn, table.originalDataColumn) + .select( + where = { table.statusIs(RECEIVED) and table.isMaxVersion and table.organismIs(organism) }, ) + .limit(numberOfSequenceEntries) + .map { + UnprocessedData( + it[table.accessionColumn], + it[table.versionColumn], + it[table.originalDataColumn]!!, + ) + } + + log.info { + "streaming ${sequenceEntryData.size} of $numberOfSequenceEntries requested unprocessed submissions" } - log.info { - "streaming ${sequenceEntryData.size} of $numberOfSequenceEntries requested unprocessed submissions" - } - - updateStatusToProcessing(sequenceEntryData) + updateStatusToProcessing(sequenceEntryData, table) - iteratorStreamer.streamAsNdjson(sequenceEntryData, outputStream) + iteratorStreamer.streamAsNdjson(sequenceEntryData, outputStream) + } } - private fun updateStatusToProcessing(sequenceEntries: List) { + private fun updateStatusToProcessing(sequenceEntries: List, table: SequenceEntriesDataTable) { val now = Clock.System.now().toLocalDateTime(TimeZone.UTC) - SequenceEntriesTable - .update( - where = { accessionVersionIsIn(sequenceEntries) }, - ) { - it[status] = IN_PROCESSING.name - it[startedProcessingAt] = now - } + + table.update( + where = { table.accessionVersionIsIn(sequenceEntries) }, + ) { + it[statusColumn] = IN_PROCESSING.name + it[startedProcessingAtColumn] = now + } } fun updateProcessedData(inputStream: InputStream, organism: Organism) { @@ -149,7 +124,7 @@ class DatabaseService( val numInserted = insertProcessedDataWithStatus(submittedProcessedData, organism) if (numInserted != 1) { - throwInsertFailedException(submittedProcessedData) + throwInsertFailedException(submittedProcessedData, organism) } } } @@ -179,18 +154,20 @@ class DatabaseService( else -> HAS_ERRORS } - return SequenceEntriesTable.update( - where = { - accessionVersionEquals(submittedProcessedDataWithAllKeysForInsertions) and - statusIs(IN_PROCESSING) and - organismIs(organism) - }, - ) { - it[status] = newStatus.name - it[processedData] = submittedProcessedDataWithAllKeysForInsertions.data - it[errors] = submittedErrors - it[warnings] = submittedWarnings - it[finishedProcessingAt] = now + return sequenceEntriesTableProvider.get(organism).let { table -> + table.update( + where = { + table.accessionVersionEquals(submittedProcessedDataWithAllKeysForInsertions) and + table.statusIs(IN_PROCESSING) and + table.organismIs(organism) + }, + ) { + it[statusColumn] = newStatus.name + it[processedDataColumn] = submittedProcessedDataWithAllKeysForInsertions.data + it[errorsColumn] = submittedErrors + it[warningsColumn] = submittedWarnings + it[finishedProcessingAtColumn] = now + } } } @@ -198,15 +175,17 @@ class DatabaseService( submittedProcessedData: SubmittedProcessedData, organism: Organism, ) { - val resultRow = SequenceEntriesTable.slice(SequenceEntriesTable.organism) - .select(where = { accessionVersionEquals(submittedProcessedData) }) - .firstOrNull() ?: return - - if (resultRow[SequenceEntriesTable.organism] != organism.name) { - throw UnprocessableEntityException( - "Accession version ${submittedProcessedData.displayAccessionVersion()} is for organism " + - "${resultRow[SequenceEntriesTable.organism]}, but submitted data is for organism ${organism.name}", - ) + sequenceEntriesTableProvider.get(organism).let { table -> + val resultRow = table.slice(table.organismColumn) + .select(where = { table.accessionVersionEquals(submittedProcessedData) }) + .firstOrNull() ?: return + + if (resultRow[table.organismColumn] != organism.name) { + throw UnprocessableEntityException( + "Accession version ${submittedProcessedData.displayAccessionVersion()} is for organism " + + "${resultRow[table.organismColumn]}, but submitted data is for organism ${organism.name}", + ) + } } } @@ -237,29 +216,31 @@ class DatabaseService( ) } - private fun throwInsertFailedException(submittedProcessedData: SubmittedProcessedData): String { - val selectedSequenceEntries = SequenceEntriesTable - .slice( - SequenceEntriesTable.accession, - SequenceEntriesTable.version, - SequenceEntriesTable.status, - ) - .select(where = { accessionVersionEquals(submittedProcessedData) }) + private fun throwInsertFailedException(submittedProcessedData: SubmittedProcessedData, organism: Organism): String { + sequenceEntriesTableProvider.get(organism).let { table -> + val selectedSequenceEntries = table + .slice( + table.accessionColumn, + table.versionColumn, + table.statusColumn, + ) + .select(where = { table.accessionVersionEquals(submittedProcessedData) }) - val accessionVersion = submittedProcessedData.displayAccessionVersion() - if (selectedSequenceEntries.count() == 0L) { - throw UnprocessableEntityException("Accession version $accessionVersion does not exist") - } + val accessionVersion = submittedProcessedData.displayAccessionVersion() + if (selectedSequenceEntries.count() == 0L) { + throw UnprocessableEntityException("Accession version $accessionVersion does not exist") + } - val selectedSequence = selectedSequenceEntries.first() - if (selectedSequence[SequenceEntriesTable.status] != IN_PROCESSING.name) { - throw UnprocessableEntityException( - "Accession version $accessionVersion is in not in state $IN_PROCESSING " + - "(was ${selectedSequence[SequenceEntriesTable.status]})", - ) - } + val selectedSequence = selectedSequenceEntries.first() + if (selectedSequence[table.statusColumn] != IN_PROCESSING.name) { + throw UnprocessableEntityException( + "Accession version $accessionVersion is in not in state $IN_PROCESSING " + + "(was ${selectedSequence[table.statusColumn]})", + ) + } - throw RuntimeException("Update processed data: Unexpected error for accession versions $accessionVersion") + throw RuntimeException("Update processed data: Unexpected error for accession versions $accessionVersion") + } } fun approveProcessedData(submitter: String, accessionVersions: List, organism: Organism) { @@ -272,69 +253,76 @@ class DatabaseService( organism, ) - SequenceEntriesTable.update( - where = { - accessionVersionIsIn(accessionVersions) and statusIs(AWAITING_APPROVAL) - }, - ) { - it[status] = APPROVED_FOR_RELEASE.name + sequenceEntriesTableProvider.get(organism).let { table -> + table.update( + where = { + table.accessionVersionIsIn(accessionVersions) and table.statusIs(AWAITING_APPROVAL) + }, + ) { + it[statusColumn] = APPROVED_FOR_RELEASE.name + } } } fun getLatestVersions(organism: Organism): Map { - val maxVersionExpression = SequenceEntriesTable.version.max() - return SequenceEntriesTable - .slice(SequenceEntriesTable.accession, maxVersionExpression) - .select( - where = { statusIs(APPROVED_FOR_RELEASE) and organismIs(organism) }, - ) - .groupBy(SequenceEntriesTable.accession) - .associate { it[SequenceEntriesTable.accession] to it[maxVersionExpression]!! } + sequenceEntriesTableProvider.get(organism).let { table -> + val maxVersionExpression = table.versionColumn.max() + return table + .slice(table.accessionColumn, maxVersionExpression) + .select( + where = { table.statusIs(APPROVED_FOR_RELEASE) and table.organismIs(organism) }, + ) + .groupBy(table.accessionColumn) + .associate { it[table.accessionColumn] to it[maxVersionExpression]!! } + } } fun getLatestRevocationVersions(organism: Organism): Map { - val maxVersionExpression = SequenceEntriesTable.version.max() - return SequenceEntriesTable - .slice(SequenceEntriesTable.accession, maxVersionExpression) - .select( - where = { - statusIs(APPROVED_FOR_RELEASE) and - (SequenceEntriesTable.isRevocation eq true) and - organismIs(organism) - }, - ) - .groupBy(SequenceEntriesTable.accession) - .associate { it[SequenceEntriesTable.accession] to it[maxVersionExpression]!! } + sequenceEntriesTableProvider.get(organism).let { table -> + val maxVersionExpression = table.versionColumn.max() + return table + .slice(table.accessionColumn, maxVersionExpression) + .select( + where = { + table.statusIs(APPROVED_FOR_RELEASE) and + (table.isRevocationColumn eq true) and + table.organismIs(organism) + }, + ) + .groupBy(table.accessionColumn) + .associate { it[table.accessionColumn] to it[maxVersionExpression]!! } + } } fun streamReleasedSubmissions(organism: Organism): Sequence { - return SequenceEntriesTable - .slice( - SequenceEntriesTable.accession, - SequenceEntriesTable.version, - SequenceEntriesTable.isRevocation, - SequenceEntriesTable.processedData, - SequenceEntriesTable.submitter, - SequenceEntriesTable.submittedAt, - SequenceEntriesTable.submissionId, - ) - .select( - where = { statusIs(APPROVED_FOR_RELEASE) and organismIs(organism) }, + return sequenceEntriesTableProvider.get(organism).let { table -> + table.slice( + table.accessionColumn, + table.versionColumn, + table.isRevocationColumn, + table.processedDataColumn, + table.submitterColumn, + table.submittedAtColumn, + table.submissionIdColumn, ) - // TODO(#429): This needs clarification of how to handle revocations. Until then, revocations are filtered out. - .filter { !it[SequenceEntriesTable.isRevocation] } - .map { - RawProcessedData( - accession = it[SequenceEntriesTable.accession], - version = it[SequenceEntriesTable.version], - isRevocation = it[SequenceEntriesTable.isRevocation], - submitter = it[SequenceEntriesTable.submitter], - submissionId = it[SequenceEntriesTable.submissionId], - processedData = it[SequenceEntriesTable.processedData]!!, - submittedAt = it[SequenceEntriesTable.submittedAt], + .select( + where = { table.statusIs(APPROVED_FOR_RELEASE) and table.organismIs(organism) }, ) - } - .asSequence() + // TODO(#429): This needs clarification of how to handle revocations. Until then, revocations are filtered out. + .filter { !it[table.isRevocationColumn] } + .map { + RawProcessedData( + accession = it[table.accessionColumn], + version = it[table.versionColumn], + isRevocation = it[table.isRevocationColumn], + submitter = it[table.submitterColumn], + submissionId = it[table.submissionIdColumn], + processedData = it[table.processedDataColumn]!!, + submittedAt = it[table.submittedAtColumn], + ) + } + .asSequence() + } } fun streamDataToEdit( @@ -344,34 +332,35 @@ class DatabaseService( organism: Organism, ) { log.info { "streaming $numberOfSequenceEntries submissions that need edit by $submitter" } - val sequencesData = SequenceEntriesTable - .slice( - SequenceEntriesTable.accession, - SequenceEntriesTable.version, - SequenceEntriesTable.status, - SequenceEntriesTable.processedData, - SequenceEntriesTable.originalData, - SequenceEntriesTable.errors, - SequenceEntriesTable.warnings, + val sequencesData = sequenceEntriesTableProvider.get(organism).let { table -> + table.slice( + table.accessionColumn, + table.versionColumn, + table.statusColumn, + table.processedDataColumn, + table.originalDataColumn, + table.errorsColumn, + table.warningsColumn, ) - .select( - where = { - statusIs(HAS_ERRORS) and - isMaxVersion and - submitterIs(submitter) and - organismIs(organism) - }, - ).limit(numberOfSequenceEntries).map { row -> - SequenceEntryVersionToEdit( - row[SequenceEntriesTable.accession], - row[SequenceEntriesTable.version], - Status.fromString(row[SequenceEntriesTable.status]), - row[SequenceEntriesTable.processedData]!!, - row[SequenceEntriesTable.originalData]!!, - row[SequenceEntriesTable.errors], - row[SequenceEntriesTable.warnings], - ) - } + .select( + where = { + table.statusIs(HAS_ERRORS) and + table.isMaxVersion and + table.submitterIs(submitter) and + table.organismIs(organism) + }, + ).limit(numberOfSequenceEntries).map { row -> + SequenceEntryVersionToEdit( + row[table.accessionColumn], + row[table.versionColumn], + Status.fromString(row[table.statusColumn]), + row[table.processedDataColumn]!!, + row[table.originalDataColumn]!!, + row[table.errorsColumn], + row[table.warningsColumn], + ) + } + } iteratorStreamer.streamAsNdjson(sequencesData, outputStream) } @@ -379,49 +368,51 @@ class DatabaseService( fun getActiveSequencesSubmittedBy(username: String, organism: Organism): List { log.info { "getting active sequence entries submitted by $username" } - val subTableSequenceStatus = SequenceEntriesTable - .slice( - SequenceEntriesTable.accession, - SequenceEntriesTable.version, - SequenceEntriesTable.status, - SequenceEntriesTable.isRevocation, - SequenceEntriesTable.organism, - ) + sequenceEntriesTableProvider.get(organism).let { table -> + val subTableSequenceStatus = table + .slice( + table.accessionColumn, + table.versionColumn, + table.statusColumn, + table.isRevocationColumn, + table.organismColumn, + ) - val releasedSequenceEntries = subTableSequenceStatus - .select( + val releasedSequenceEntries = subTableSequenceStatus + .select( + where = { + table.statusIs(APPROVED_FOR_RELEASE) and + table.submitterIs(username) and + table.isMaxReleasedVersion and + table.organismIs(organism) + }, + ).map { row -> + SequenceEntryStatus( + row[table.accessionColumn], + row[table.versionColumn], + APPROVED_FOR_RELEASE, + row[table.isRevocationColumn], + ) + } + + val unreleasedSequenceEntries = subTableSequenceStatus.select( where = { - statusIs(APPROVED_FOR_RELEASE) and - submitterIs(username) and - isMaxReleasedVersion and - organismIs(organism) + (table.statusColumn neq APPROVED_FOR_RELEASE.name) and + table.submitterIs(username) and + table.isMaxVersion and + table.organismIs(organism) }, ).map { row -> SequenceEntryStatus( - row[SequenceEntriesTable.accession], - row[SequenceEntriesTable.version], - APPROVED_FOR_RELEASE, - row[SequenceEntriesTable.isRevocation], + row[table.accessionColumn], + row[table.versionColumn], + Status.fromString(row[table.statusColumn]), + row[table.isRevocationColumn], ) } - val unreleasedSequenceEntries = subTableSequenceStatus.select( - where = { - (SequenceEntriesTable.status neq APPROVED_FOR_RELEASE.name) and - submitterIs(username) and - isMaxVersion and - organismIs(organism) - }, - ).map { row -> - SequenceEntryStatus( - row[SequenceEntriesTable.accession], - row[SequenceEntriesTable.version], - Status.fromString(row[SequenceEntriesTable.status]), - row[SequenceEntriesTable.isRevocation], - ) + return releasedSequenceEntries + unreleasedSequenceEntries } - - return releasedSequenceEntries + unreleasedSequenceEntries } fun reviseData(submitter: String, revisedData: List, organism: Organism): List { @@ -438,37 +429,39 @@ class DatabaseService( ).associateBy { it.accession } revisedData.map { data -> - SequenceEntriesTable.insert( - SequenceEntriesTable.slice( - SequenceEntriesTable.accession, - SequenceEntriesTable.version.plus(1), - SequenceEntriesTable.submissionId, - SequenceEntriesTable.submitter, - dateTimeParam(now), - stringParam(RECEIVED.name), - booleanParam(false), - QueryParameter(data.originalData, SequenceEntriesTable.originalData.columnType), - SequenceEntriesTable.organism, - ).select( - where = { - (SequenceEntriesTable.accession eq data.accession) and - isMaxVersion and - statusIs(APPROVED_FOR_RELEASE) and - submitterIs(submitter) - }, - ), - columns = listOf( - SequenceEntriesTable.accession, - SequenceEntriesTable.version, - SequenceEntriesTable.submissionId, - SequenceEntriesTable.submitter, - SequenceEntriesTable.submittedAt, - SequenceEntriesTable.status, - SequenceEntriesTable.isRevocation, - SequenceEntriesTable.originalData, - SequenceEntriesTable.organism, - ), - ) + sequenceEntriesTableProvider.get(organism).let { table -> + table.insert( + table.slice( + table.accessionColumn, + table.versionColumn.plus(1), + table.submissionIdColumn, + table.submitterColumn, + dateTimeParam(now), + stringParam(RECEIVED.name), + booleanParam(false), + QueryParameter(data.originalData, table.originalDataColumn.columnType), + table.organismColumn, + ).select( + where = { + (table.accessionColumn eq data.accession) and + table.isMaxVersion and + table.statusIs(APPROVED_FOR_RELEASE) and + table.submitterIs(submitter) + }, + ), + columns = listOf( + table.accessionColumn, + table.versionColumn, + table.submissionIdColumn, + table.submitterColumn, + table.submittedAtColumn, + table.statusColumn, + table.isRevocationColumn, + table.originalDataColumn, + table.organismColumn, + ), + ) + } } return revisedData.map { @@ -487,56 +480,57 @@ class DatabaseService( ) val now = Clock.System.now().toLocalDateTime(TimeZone.UTC) - - SequenceEntriesTable.insert( - SequenceEntriesTable.slice( - SequenceEntriesTable.accession, - SequenceEntriesTable.version.plus(1), - SequenceEntriesTable.submissionId, - SequenceEntriesTable.submitter, - dateTimeParam(now), - stringParam(AWAITING_APPROVAL_FOR_REVOCATION.name), - booleanParam(true), - SequenceEntriesTable.organism, - ).select( - where = { - (SequenceEntriesTable.accession inList accessions) and - isMaxVersion and - statusIs(APPROVED_FOR_RELEASE) - }, - ), - columns = listOf( - SequenceEntriesTable.accession, - SequenceEntriesTable.version, - SequenceEntriesTable.submissionId, - SequenceEntriesTable.submitter, - SequenceEntriesTable.submittedAt, - SequenceEntriesTable.status, - SequenceEntriesTable.isRevocation, - SequenceEntriesTable.organism, - ), - ) - - return SequenceEntriesTable - .slice( - SequenceEntriesTable.accession, - SequenceEntriesTable.version, - SequenceEntriesTable.isRevocation, + sequenceEntriesTableProvider.get(organism).let { table -> + table.insert( + table.slice( + table.accessionColumn, + table.versionColumn.plus(1), + table.submissionIdColumn, + table.submitterColumn, + dateTimeParam(now), + stringParam(AWAITING_APPROVAL_FOR_REVOCATION.name), + booleanParam(true), + table.organismColumn, + ).select( + where = { + (table.accessionColumn inList accessions) and + table.isMaxVersion and + table.statusIs(APPROVED_FOR_RELEASE) + }, + ), + columns = listOf( + table.accessionColumn, + table.versionColumn, + table.submissionIdColumn, + table.submitterColumn, + table.submittedAtColumn, + table.statusColumn, + table.isRevocationColumn, + table.organismColumn, + ), ) - .select( - where = { - (SequenceEntriesTable.accession inList accessions) and - isMaxVersion and - statusIs(AWAITING_APPROVAL_FOR_REVOCATION) - }, - ).map { - SequenceEntryStatus( - it[SequenceEntriesTable.accession], - it[SequenceEntriesTable.version], - AWAITING_APPROVAL_FOR_REVOCATION, - it[SequenceEntriesTable.isRevocation], + + return table + .slice( + table.accessionColumn, + table.versionColumn, + table.isRevocationColumn, ) - } + .select( + where = { + (table.accessionColumn inList accessions) and + table.isMaxVersion and + table.statusIs(AWAITING_APPROVAL_FOR_REVOCATION) + }, + ).map { + SequenceEntryStatus( + it[table.accessionColumn], + it[table.versionColumn], + AWAITING_APPROVAL_FOR_REVOCATION, + it[table.isRevocationColumn], + ) + } + } } fun confirmRevocation(accessionVersions: List, username: String, organism: Organism) { @@ -549,12 +543,16 @@ class DatabaseService( organism, ) - SequenceEntriesTable.update( - where = { - accessionVersionIsIn(accessionVersions) and statusIs(AWAITING_APPROVAL_FOR_REVOCATION) - }, - ) { - it[status] = APPROVED_FOR_RELEASE.name + sequenceEntriesTableProvider.get(organism).let { table -> + table.update( + where = { + table.accessionVersionIsIn(accessionVersions) and table.statusIs( + AWAITING_APPROVAL_FOR_REVOCATION, + ) + }, + ) { + it[statusColumn] = APPROVED_FOR_RELEASE.name + } } } @@ -568,7 +566,7 @@ class DatabaseService( organism, ) - SequenceEntriesTable.deleteWhere { + sequenceEntriesTableProvider.get(organism).deleteWhere { accessionVersionIsIn(accessionVersions) } } @@ -576,25 +574,27 @@ class DatabaseService( fun submitEditedData(submitter: String, editedAccessionVersion: UnprocessedData, organism: Organism) { log.info { "edited sequence entry submitted $editedAccessionVersion" } - val sequencesEdited = SequenceEntriesTable.update( - where = { - accessionVersionEquals(editedAccessionVersion) and - submitterIs(submitter) and - statusIsOneOf(AWAITING_APPROVAL, HAS_ERRORS) and - organismIs(organism) - }, - ) { - it[status] = RECEIVED.name - it[originalData] = editedAccessionVersion.data - it[errors] = null - it[warnings] = null - it[startedProcessingAt] = null - it[finishedProcessingAt] = null - it[processedData] = null - } + sequenceEntriesTableProvider.get(organism).let { table -> + val sequencesEdited = table.update( + where = { + table.accessionVersionEquals(editedAccessionVersion) and + table.submitterIs(submitter) and + table.statusIsOneOf(AWAITING_APPROVAL, HAS_ERRORS) and + table.organismIs(organism) + }, + ) { + it[statusColumn] = RECEIVED.name + it[originalDataColumn] = editedAccessionVersion.data + it[errorsColumn] = null + it[warningsColumn] = null + it[startedProcessingAtColumn] = null + it[finishedProcessingAtColumn] = null + it[processedDataColumn] = null + } - if (sequencesEdited != 1) { - handleEditedSubmissionError(editedAccessionVersion, submitter, organism) + if (sequencesEdited != 1) { + handleEditedSubmissionError(editedAccessionVersion, submitter, organism) + } } } @@ -603,48 +603,51 @@ class DatabaseService( submitter: String, organism: Organism, ) { - val selectedSequences = SequenceEntriesTable - .slice( - SequenceEntriesTable.accession, - SequenceEntriesTable.version, - SequenceEntriesTable.status, - SequenceEntriesTable.submitter, - SequenceEntriesTable.organism, - ) - .select(where = { accessionVersionEquals(editedAccessionVersion) }) + sequenceEntriesTableProvider.get(organism).let { table -> + val selectedSequences = table + .slice( + table.accessionColumn, + table.versionColumn, + table.statusColumn, + table.submitterColumn, + table.organismColumn, + ) + .select(where = { table.accessionVersionEquals(editedAccessionVersion) }) - val accessionVersionString = editedAccessionVersion.displayAccessionVersion() + val accessionVersionString = editedAccessionVersion.displayAccessionVersion() - if (selectedSequences.count().toInt() == 0) { - throw UnprocessableEntityException("Sequence entry $accessionVersionString does not exist") - } + if (selectedSequences.count().toInt() == 0) { + throw UnprocessableEntityException("Sequence entry $accessionVersionString does not exist") + } - val queriedSequence = selectedSequences.first() + val queriedSequence = selectedSequences.first() - val hasCorrectStatus = queriedSequence[SequenceEntriesTable.status] == AWAITING_APPROVAL.name || - queriedSequence[SequenceEntriesTable.status] == HAS_ERRORS.name - if (!hasCorrectStatus) { - val status = queriedSequence[SequenceEntriesTable.status] - throw UnprocessableEntityException( - "Sequence entry $accessionVersionString is in status $status, not in $AWAITING_APPROVAL or $HAS_ERRORS", - ) - } + val hasCorrectStatus = queriedSequence[table.statusColumn] == AWAITING_APPROVAL.name || + queriedSequence[table.statusColumn] == HAS_ERRORS.name + if (!hasCorrectStatus) { + val status = queriedSequence[table.statusColumn] + throw UnprocessableEntityException( + "Sequence entry $accessionVersionString is in status $status, " + + "not in $AWAITING_APPROVAL or $HAS_ERRORS", + ) + } - if (queriedSequence[SequenceEntriesTable.submitter] != submitter) { - throw ForbiddenException( - "Sequence entry $accessionVersionString is not owned by user $submitter", - ) - } + if (queriedSequence[table.submitterColumn] != submitter) { + throw ForbiddenException( + "Sequence entry $accessionVersionString is not owned by user $submitter", + ) + } - if (queriedSequence[SequenceEntriesTable.organism] != organism.name) { - throw UnprocessableEntityException( - "Sequence entry $accessionVersionString is for organism " + - "${queriedSequence[SequenceEntriesTable.organism]}, but submitted data is for organism " + - organism.name, - ) - } + if (queriedSequence[table.organismColumn] != organism.name) { + throw UnprocessableEntityException( + "Sequence entry $accessionVersionString is for organism " + + "${queriedSequence[table.organismColumn]}, but submitted data is for organism " + + organism.name, + ) + } - throw Exception("SequenceEdit: Unknown error") + throw Exception("SequenceEdit: Unknown error") + } } fun getSequenceEntryVersionToEdit( @@ -656,39 +659,41 @@ class DatabaseService( "Getting sequence entry ${accessionVersion.displayAccessionVersion()} by $submitter to edit" } - val selectedSequenceEntries = SequenceEntriesTable - .slice( - SequenceEntriesTable.accession, - SequenceEntriesTable.version, - SequenceEntriesTable.processedData, - SequenceEntriesTable.originalData, - SequenceEntriesTable.errors, - SequenceEntriesTable.warnings, - SequenceEntriesTable.status, - ) - .select( - where = { - statusIsOneOf(HAS_ERRORS, AWAITING_APPROVAL) and - accessionVersionEquals(accessionVersion) and - submitterIs(submitter) and - organismIs(organism) - }, + val dataTable = sequenceEntriesTableProvider.get(organism) + dataTable.let { table -> + val selectedSequenceEntries = table.slice( + table.accessionColumn, + table.versionColumn, + table.statusColumn, + table.processedDataColumn, + table.originalDataColumn, + table.errorsColumn, + table.warningsColumn, ) + .select( + where = { + table.statusIsOneOf(HAS_ERRORS, AWAITING_APPROVAL) and + table.accessionVersionEquals(accessionVersion) and + table.submitterIs(submitter) and + table.organismIs(organism) + }, + ) - if (selectedSequenceEntries.count().toInt() != 1) { - handleGetSequenceEntryVersionWithErrorsDataError(submitter, accessionVersion, organism) - } + if (selectedSequenceEntries.count().toInt() != 1) { + handleGetSequenceEntryVersionWithErrorsDataError(submitter, accessionVersion, organism) + } - return selectedSequenceEntries.first().let { - SequenceEntryVersionToEdit( - it[SequenceEntriesTable.accession], - it[SequenceEntriesTable.version], - Status.fromString(it[SequenceEntriesTable.status]), - it[SequenceEntriesTable.processedData]!!, - it[SequenceEntriesTable.originalData]!!, - it[SequenceEntriesTable.errors], - it[SequenceEntriesTable.warnings], - ) + return selectedSequenceEntries.first().let { + SequenceEntryVersionToEdit( + it[table.accessionColumn], + it[table.versionColumn], + Status.fromString(it[table.statusColumn]), + it[table.processedDataColumn]!!, + it[table.originalDataColumn]!!, + it[table.errorsColumn], + it[table.warningsColumn], + ) + } } } @@ -697,57 +702,65 @@ class DatabaseService( accessionVersion: AccessionVersion, organism: Organism, ): Nothing { - val selectedSequences = SequenceEntriesTable - .slice( - SequenceEntriesTable.accession, - SequenceEntriesTable.version, - SequenceEntriesTable.status, - SequenceEntriesTable.organism, - ) - .select(where = { accessionVersionEquals(accessionVersion) }) + sequenceEntriesTableProvider.get(organism).let { table -> + val selectedSequences = table + .slice( + table.accessionColumn, + table.versionColumn, + table.statusColumn, + table.organismColumn, + ) + .select(where = { table.accessionVersionEquals(accessionVersion) }) - if (selectedSequences.count().toInt() == 0) { - throw NotFoundException("Accession version ${accessionVersion.displayAccessionVersion()} does not exist") - } + if (selectedSequences.count().toInt() == 0) { + throw NotFoundException( + "Accession version ${accessionVersion.displayAccessionVersion()} does not exist", + ) + } - val selectedSequence = selectedSequences.first() + val selectedSequence = selectedSequences.first() - val hasCorrectStatus = - (selectedSequence[SequenceEntriesTable.status] == AWAITING_APPROVAL.name) || - (selectedSequence[SequenceEntriesTable.status] == HAS_ERRORS.name) + val hasCorrectStatus = + (selectedSequence[table.statusColumn] == AWAITING_APPROVAL.name) || + (selectedSequence[table.statusColumn] == HAS_ERRORS.name) - if (!hasCorrectStatus) { - throw UnprocessableEntityException( - "Accession version ${accessionVersion.displayAccessionVersion()} is in not in state " + - "${HAS_ERRORS.name} or ${AWAITING_APPROVAL.name} " + - "(was ${selectedSequence[SequenceEntriesTable.status]})", - ) - } + if (!hasCorrectStatus) { + throw UnprocessableEntityException( + "Accession version ${accessionVersion.displayAccessionVersion()} is in not in state " + + "${HAS_ERRORS.name} or ${AWAITING_APPROVAL.name} " + + "(was ${selectedSequence[table.statusColumn]})", + ) + } - if (!hasPermissionToChange(submitter, accessionVersion)) { - throw ForbiddenException( - "Sequence entry ${accessionVersion.displayAccessionVersion()} is not owned by user $submitter", - ) - } + if (!hasPermissionToChange(submitter, accessionVersion, table)) { + throw ForbiddenException( + "Sequence entry ${accessionVersion.displayAccessionVersion()} is not owned by user $submitter", + ) + } + + if (selectedSequence[table.organismColumn] != organism.name) { + throw UnprocessableEntityException( + "Accession version ${accessionVersion.displayAccessionVersion()} is for organism " + + selectedSequence[table.organismColumn] + + ", but requested data for organism ${organism.name}", + ) + } - if (selectedSequence[SequenceEntriesTable.organism] != organism.name) { - throw UnprocessableEntityException( - "Accession version ${accessionVersion.displayAccessionVersion()} is for organism " + - selectedSequence[SequenceEntriesTable.organism] + - ", but requested data for organism ${organism.name}", + throw RuntimeException( + "Get edited data: Unexpected error for accession version ${accessionVersion.displayAccessionVersion()}", ) } - - throw RuntimeException( - "Get edited data: Unexpected error for accession version ${accessionVersion.displayAccessionVersion()}", - ) } - private fun hasPermissionToChange(user: String, accessionVersion: AccessionVersion): Boolean { - val sequencesOwnedByUser = SequenceEntriesTable - .slice(SequenceEntriesTable.accession, SequenceEntriesTable.version, SequenceEntriesTable.submitter) + private fun hasPermissionToChange( + user: String, + accessionVersion: AccessionVersion, + table: SequenceEntriesDataTable, + ): Boolean { + val sequencesOwnedByUser = table + .slice(table.accessionColumn, table.versionColumn, table.submitterColumn) .select( - where = { accessionVersionEquals(accessionVersion) and submitterIs(user) }, + where = { table.accessionVersionEquals(accessionVersion) and table.submitterIs(user) }, ) return sequencesOwnedByUser.count() == 1L diff --git a/backend/src/main/kotlin/org/pathoplexus/backend/service/JacksonSerializableJsonb.kt b/backend/src/main/kotlin/org/pathoplexus/backend/service/JacksonSerializableJsonb.kt new file mode 100644 index 000000000..3715f0c8e --- /dev/null +++ b/backend/src/main/kotlin/org/pathoplexus/backend/service/JacksonSerializableJsonb.kt @@ -0,0 +1,14 @@ +package org.pathoplexus.backend.service + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import com.fasterxml.jackson.module.kotlin.readValue +import org.jetbrains.exposed.sql.Table +import org.jetbrains.exposed.sql.json.jsonb + +val jacksonObjectMapper: ObjectMapper = jacksonObjectMapper().findAndRegisterModules() +inline fun Table.jacksonSerializableJsonb(columnName: String) = jsonb( + columnName, + { value -> jacksonObjectMapper.writeValueAsString(value) }, + { string -> jacksonObjectMapper.readValue(string) }, +) diff --git a/backend/src/main/kotlin/org/pathoplexus/backend/service/MetadataUploadAuxTable.kt b/backend/src/main/kotlin/org/pathoplexus/backend/service/MetadataUploadAuxTable.kt new file mode 100644 index 000000000..5cffb3d7f --- /dev/null +++ b/backend/src/main/kotlin/org/pathoplexus/backend/service/MetadataUploadAuxTable.kt @@ -0,0 +1,17 @@ +package org.pathoplexus.backend.service + +import org.jetbrains.exposed.sql.Table +import org.jetbrains.exposed.sql.kotlin.datetime.datetime + +const val METADATA_UPLOAD_TABLE_NAME = "metadata_upload_aux_table" + +object MetadataUploadAuxTable : Table(METADATA_UPLOAD_TABLE_NAME) { + val uploadIdColumn = varchar("upload_id", 255) + val organismColumn = varchar("organism", 255) + val submissionIdColumn = varchar("submission_id", 255) + val submitterColumn = varchar("submitter", 255) + val uploadedAtColumn = datetime("uploaded_at") + val metadataColumn = + jacksonSerializableJsonb>("metadata").nullable() + override val primaryKey = PrimaryKey(uploadIdColumn, submissionIdColumn) +} diff --git a/backend/src/main/kotlin/org/pathoplexus/backend/service/QueryPreconditionValidator.kt b/backend/src/main/kotlin/org/pathoplexus/backend/service/QueryPreconditionValidator.kt index a01146eba..fa769fec9 100644 --- a/backend/src/main/kotlin/org/pathoplexus/backend/service/QueryPreconditionValidator.kt +++ b/backend/src/main/kotlin/org/pathoplexus/backend/service/QueryPreconditionValidator.kt @@ -8,10 +8,15 @@ import org.pathoplexus.backend.api.Organism import org.pathoplexus.backend.api.Status import org.pathoplexus.backend.controller.ForbiddenException import org.pathoplexus.backend.controller.UnprocessableEntityException +import org.pathoplexus.backend.utils.Accession +import org.pathoplexus.backend.utils.AccessionComparator +import org.pathoplexus.backend.utils.AccessionVersionComparator import org.springframework.stereotype.Component @Component -class QueryPreconditionValidator { +class QueryPreconditionValidator( + private val sequenceEntriesTableProvider: SequenceEntriesTableProvider, +) { fun validateAccessionVersions( submitter: String, @@ -19,20 +24,22 @@ class QueryPreconditionValidator { statuses: List, organism: Organism, ) { - val sequenceEntries = SequenceEntriesTable - .slice( - SequenceEntriesTable.accession, - SequenceEntriesTable.version, - SequenceEntriesTable.submitter, - SequenceEntriesTable.status, - SequenceEntriesTable.organism, - ) - .select(where = { accessionVersionIsIn(accessionVersions) }) + sequenceEntriesTableProvider.get(organism).let { table -> + val sequenceEntries = table + .slice( + table.accessionColumn, + table.versionColumn, + table.submitterColumn, + table.statusColumn, + table.organismColumn, + ) + .select(where = { table.accessionVersionIsIn(accessionVersions) }) - validateAccessionVersionsExist(sequenceEntries, accessionVersions) - validateSequenceEntriesAreInStates(sequenceEntries, statuses) - validateUserIsAllowedToEditSequenceEntries(sequenceEntries, submitter) - validateOrganism(sequenceEntries, organism) + validateAccessionVersionsExist(sequenceEntries, accessionVersions, table) + validateSequenceEntriesAreInStates(sequenceEntries, statuses, table) + validateUserIsAllowedToEditSequenceEntries(sequenceEntries, submitter, table) + validateOrganism(sequenceEntries, organism, table) + } } fun validateAccessions( @@ -41,32 +48,38 @@ class QueryPreconditionValidator { statuses: List, organism: Organism, ): List { - val sequenceEntries = SequenceEntriesTable - .slice( - SequenceEntriesTable.accession, - SequenceEntriesTable.version, - SequenceEntriesTable.submitter, - SequenceEntriesTable.status, - SequenceEntriesTable.organism, - ) - .select( - where = { (SequenceEntriesTable.accession inList accessions) and isMaxVersion }, - ) + sequenceEntriesTableProvider.get(organism).let { table -> + val sequenceEntries = table + .slice( + table.accessionColumn, + table.versionColumn, + table.submitterColumn, + table.statusColumn, + table.organismColumn, + ) + .select( + where = { (table.accessionColumn inList accessions) and table.isMaxVersion }, + ) - validateAccessionsExist(sequenceEntries, accessions) - validateSequenceEntriesAreInStates(sequenceEntries, statuses) - validateUserIsAllowedToEditSequenceEntries(sequenceEntries, submitter) - validateOrganism(sequenceEntries, organism) + validateAccessionsExist(sequenceEntries, accessions, table) + validateSequenceEntriesAreInStates(sequenceEntries, statuses, table) + validateUserIsAllowedToEditSequenceEntries(sequenceEntries, submitter, table) + validateOrganism(sequenceEntries, organism, table) - return sequenceEntries.map { - AccessionVersion( - it[SequenceEntriesTable.accession], - it[SequenceEntriesTable.version], - ) + return sequenceEntries.map { + AccessionVersion( + it[table.accessionColumn], + it[table.versionColumn], + ) + } } } - private fun validateAccessionVersionsExist(sequenceEntries: Query, accessionVersions: List) { + private fun validateAccessionVersionsExist( + sequenceEntries: Query, + accessionVersions: List, + table: SequenceEntriesDataTable, + ) { if (sequenceEntries.count() == accessionVersions.size.toLong()) { return } @@ -74,8 +87,8 @@ class QueryPreconditionValidator { val accessionVersionsNotFound = accessionVersions .filter { accessionVersion -> sequenceEntries.none { - it[SequenceEntriesTable.accession] == accessionVersion.accession && - it[SequenceEntriesTable.version] == accessionVersion.version + it[table.accessionColumn] == accessionVersion.accession && + it[table.versionColumn] == accessionVersion.version } } .sortedWith(AccessionVersionComparator) @@ -84,20 +97,24 @@ class QueryPreconditionValidator { throw UnprocessableEntityException("Accession versions $accessionVersionsNotFound do not exist") } - private fun validateSequenceEntriesAreInStates(sequenceEntries: Query, statuses: List) { + private fun validateSequenceEntriesAreInStates( + sequenceEntries: Query, + statuses: List, + table: SequenceEntriesDataTable, + ) { val sequenceEntriesNotInStatuses = sequenceEntries .filter { - statuses.none { status -> it[SequenceEntriesTable.status] == status.name } + statuses.none { status -> it[table.statusColumn] == status.name } } .sortedWith { left, right -> AccessionComparator.compare( - left[SequenceEntriesTable.accession], - right[SequenceEntriesTable.accession], + left[table.accessionColumn], + right[table.accessionColumn], ) } .map { - "${it[SequenceEntriesTable.accession]}.${it[SequenceEntriesTable.version]} - " + - it[SequenceEntriesTable.status] + "${it[table.accessionColumn]}.${it[table.versionColumn]} - " + + it[table.statusColumn] } if (sequenceEntriesNotInStatuses.isNotEmpty()) { @@ -108,10 +125,14 @@ class QueryPreconditionValidator { } } - private fun validateUserIsAllowedToEditSequenceEntries(sequenceEntries: Query, submitter: String) { + private fun validateUserIsAllowedToEditSequenceEntries( + sequenceEntries: Query, + submitter: String, + table: SequenceEntriesDataTable, + ) { val sequenceEntriesNotSubmittedByUser = sequenceEntries - .filter { it[SequenceEntriesTable.submitter] != submitter } - .map { AccessionVersion(it[SequenceEntriesTable.accession], it[SequenceEntriesTable.version]) } + .filter { it[table.submitterColumn] != submitter } + .map { AccessionVersion(it[table.accessionColumn], it[table.versionColumn]) } if (sequenceEntriesNotSubmittedByUser.isNotEmpty()) { val accessionVersionString = sequenceEntriesNotSubmittedByUser.sortedWith(AccessionVersionComparator) @@ -123,7 +144,11 @@ class QueryPreconditionValidator { } } - private fun validateAccessionsExist(sequenceEntries: Query, accessions: List) { + private fun validateAccessionsExist( + sequenceEntries: Query, + accessions: List, + table: SequenceEntriesDataTable, + ) { if (sequenceEntries.count() == accessions.size.toLong()) { return } @@ -131,7 +156,7 @@ class QueryPreconditionValidator { val accessionsNotFound = accessions .filter { accession -> sequenceEntries.none { - it[SequenceEntriesTable.accession] == accession + it[table.accessionColumn] == accession } } .sortedWith(AccessionComparator) @@ -140,12 +165,17 @@ class QueryPreconditionValidator { throw UnprocessableEntityException("Accessions $accessionsNotFound do not exist") } - private fun validateOrganism(sequenceEntryVersions: Query, organism: Organism) { + private fun validateOrganism(sequenceEntryVersions: Query, organism: Organism, table: SequenceEntriesDataTable) { val accessionVersionsByOtherOrganisms = - sequenceEntryVersions.filter { it[SequenceEntriesTable.organism] != organism.name } + sequenceEntryVersions.filter { it[table.organismColumn] != organism.name } .groupBy( - { it[SequenceEntriesTable.organism] }, - { AccessionVersion(it[SequenceEntriesTable.accession], it[SequenceEntriesTable.version]) }, + { it[table.organismColumn] }, + { + AccessionVersion( + it[table.accessionColumn], + it[table.versionColumn], + ) + }, ) if (accessionVersionsByOtherOrganisms.isEmpty()) { diff --git a/backend/src/main/kotlin/org/pathoplexus/backend/service/SequenceEntriesTable.kt b/backend/src/main/kotlin/org/pathoplexus/backend/service/SequenceEntriesTable.kt deleted file mode 100644 index d7a9a1965..000000000 --- a/backend/src/main/kotlin/org/pathoplexus/backend/service/SequenceEntriesTable.kt +++ /dev/null @@ -1,115 +0,0 @@ -package org.pathoplexus.backend.service - -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.fasterxml.jackson.module.kotlin.readValue -import org.jetbrains.exposed.sql.Expression -import org.jetbrains.exposed.sql.Sequence -import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq -import org.jetbrains.exposed.sql.SqlExpressionBuilder.inList -import org.jetbrains.exposed.sql.Table -import org.jetbrains.exposed.sql.alias -import org.jetbrains.exposed.sql.and -import org.jetbrains.exposed.sql.json.jsonb -import org.jetbrains.exposed.sql.kotlin.datetime.datetime -import org.jetbrains.exposed.sql.max -import org.jetbrains.exposed.sql.select -import org.jetbrains.exposed.sql.wrapAsExpression -import org.pathoplexus.backend.api.AccessionVersionInterface -import org.pathoplexus.backend.api.Organism -import org.pathoplexus.backend.api.OriginalData -import org.pathoplexus.backend.api.PreprocessingAnnotation -import org.pathoplexus.backend.api.ProcessedData -import org.pathoplexus.backend.api.Status -import org.pathoplexus.backend.api.toPairs - -private val jacksonObjectMapper = jacksonObjectMapper().findAndRegisterModules() - -private inline fun Table.jacksonSerializableJsonb(columnName: String) = jsonb( - columnName, - { value -> jacksonObjectMapper.writeValueAsString(value) }, - { string -> jacksonObjectMapper.readValue(string) }, -) - -typealias Accession = String -typealias Version = Long - -object AccessionComparator : Comparator { - override fun compare(left: Accession, right: Accession): Int { - return left.toInt().compareTo(right.toInt()) - } -} - -object AccessionVersionComparator : Comparator { - override fun compare(left: AccessionVersionInterface, right: AccessionVersionInterface): Int { - return when (val accessionResult = left.accession.toInt().compareTo(right.accession.toInt())) { - 0 -> left.version.compareTo(right.version) - else -> accessionResult - } - } -} - -const val ACCESSION_SEQUENCE_NAME = "accession_sequence" - -val accessionSequence = Sequence(ACCESSION_SEQUENCE_NAME) - -const val TABLE_NAME = "sequence_entries" - -object SequenceEntriesTable : Table(TABLE_NAME) { - val accession = varchar("accession", 255) - val version = long("version") - val organism = varchar("organism", 255) - val submissionId = varchar("submission_id", 255) - val submitter = varchar("submitter", 255) - val submittedAt = datetime("submitted_at") - val startedProcessingAt = datetime("started_processing_at").nullable() - val finishedProcessingAt = datetime("finished_processing_at").nullable() - val status = varchar("status", 255) - val isRevocation = bool("is_revocation").default(false) - val originalData = - jacksonSerializableJsonb("original_data").nullable() - val processedData = jacksonSerializableJsonb("processed_data").nullable() - val errors = jacksonSerializableJsonb>("errors").nullable() - val warnings = jacksonSerializableJsonb>("warnings").nullable() - - override val primaryKey = PrimaryKey(accession, version) -} - -val isMaxVersion = SequenceEntriesTable.version eq maxVersionQuery() - -private fun maxVersionQuery(): Expression { - val subQueryTable = SequenceEntriesTable.alias("subQueryTable") - return wrapAsExpression( - subQueryTable - .slice(subQueryTable[SequenceEntriesTable.version].max()) - .select { subQueryTable[SequenceEntriesTable.accession] eq SequenceEntriesTable.accession }, - ) -} - -val isMaxReleasedVersion = SequenceEntriesTable.version eq maxReleasedVersionQuery() - -private fun maxReleasedVersionQuery(): Expression { - val subQueryTable = SequenceEntriesTable.alias("subQueryTable") - return wrapAsExpression( - subQueryTable - .slice(subQueryTable[SequenceEntriesTable.version].max()) - .select { - (subQueryTable[SequenceEntriesTable.accession] eq SequenceEntriesTable.accession) and - (subQueryTable[SequenceEntriesTable.status] eq Status.APPROVED_FOR_RELEASE.name) - }, - ) -} - -fun accessionVersionIsIn(accessionVersions: List) = - Pair(SequenceEntriesTable.accession, SequenceEntriesTable.version) inList accessionVersions.toPairs() - -fun organismIs(organism: Organism) = SequenceEntriesTable.organism eq organism.name - -fun statusIs(status: Status) = SequenceEntriesTable.status eq status.name - -fun statusIsOneOf(vararg status: Status) = SequenceEntriesTable.status inList status.map { it.name } - -fun accessionVersionEquals(accessionVersion: AccessionVersionInterface) = - (SequenceEntriesTable.accession eq accessionVersion.accession) and - (SequenceEntriesTable.version eq accessionVersion.version) - -fun submitterIs(submitter: String) = SequenceEntriesTable.submitter eq submitter diff --git a/backend/src/main/kotlin/org/pathoplexus/backend/service/SequenceEntriesTableProvider.kt b/backend/src/main/kotlin/org/pathoplexus/backend/service/SequenceEntriesTableProvider.kt new file mode 100644 index 000000000..c547f2b6d --- /dev/null +++ b/backend/src/main/kotlin/org/pathoplexus/backend/service/SequenceEntriesTableProvider.kt @@ -0,0 +1,125 @@ +package org.pathoplexus.backend.service + +import com.fasterxml.jackson.module.kotlin.readValue +import org.jetbrains.exposed.sql.Column +import org.jetbrains.exposed.sql.Expression +import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq +import org.jetbrains.exposed.sql.SqlExpressionBuilder.inList +import org.jetbrains.exposed.sql.Table +import org.jetbrains.exposed.sql.alias +import org.jetbrains.exposed.sql.and +import org.jetbrains.exposed.sql.json.jsonb +import org.jetbrains.exposed.sql.kotlin.datetime.datetime +import org.jetbrains.exposed.sql.max +import org.jetbrains.exposed.sql.select +import org.jetbrains.exposed.sql.wrapAsExpression +import org.pathoplexus.backend.api.AccessionVersionInterface +import org.pathoplexus.backend.api.Organism +import org.pathoplexus.backend.api.OriginalData +import org.pathoplexus.backend.api.PreprocessingAnnotation +import org.pathoplexus.backend.api.ProcessedData +import org.pathoplexus.backend.api.Status +import org.pathoplexus.backend.api.toPairs +import org.springframework.stereotype.Service + +@Service +class SequenceEntriesTableProvider(private val compressionService: CompressionService) { + + private val cachedTables: MutableMap = mutableMapOf() + + fun get(organism: Organism): SequenceEntriesDataTable { + return cachedTables.getOrPut(organism) { + SequenceEntriesDataTable(compressionService, organism) + } + } +} + +const val SEQUENCE_ENTRIES_TABLE_NAME = "sequence_entries" + +class SequenceEntriesDataTable( + compressionService: CompressionService, + organism: Organism, +) : Table( + SEQUENCE_ENTRIES_TABLE_NAME, +) { + val originalDataColumn = serializeOriginalData(compressionService, organism).nullable() + + val accessionColumn = varchar("accession", 255) + val versionColumn = long("version") + val organismColumn = varchar("organism", 255) + val submissionIdColumn = varchar("submission_id", 255) + val submitterColumn = varchar("submitter", 255) + val submittedAtColumn = datetime("submitted_at") + val startedProcessingAtColumn = datetime("started_processing_at").nullable() + val finishedProcessingAtColumn = datetime("finished_processing_at").nullable() + val statusColumn = varchar("status", 255) + val isRevocationColumn = bool("is_revocation").default(false) + val processedDataColumn = jacksonSerializableJsonb("processed_data").nullable() + val errorsColumn = jacksonSerializableJsonb>("errors").nullable() + val warningsColumn = jacksonSerializableJsonb>("warnings").nullable() + + override val primaryKey = PrimaryKey(accessionColumn, versionColumn) + + val isMaxVersion = versionColumn eq maxVersionQuery() + + private fun maxVersionQuery(): Expression { + val subQueryTable = alias("subQueryTable") + return wrapAsExpression( + subQueryTable + .slice(subQueryTable[versionColumn].max()) + .select { + subQueryTable[accessionColumn] eq accessionColumn + }, + ) + } + + val isMaxReleasedVersion = versionColumn eq maxReleasedVersionQuery() + + private fun maxReleasedVersionQuery(): Expression { + val subQueryTable = alias("subQueryTable") + return wrapAsExpression( + subQueryTable + .slice(subQueryTable[versionColumn].max()) + .select { + (subQueryTable[accessionColumn] eq accessionColumn) and + (subQueryTable[statusColumn] eq Status.APPROVED_FOR_RELEASE.name) + }, + ) + } + + fun accessionVersionIsIn(accessionVersions: List) = + Pair(accessionColumn, versionColumn) inList accessionVersions.toPairs() + + fun organismIs(organism: Organism) = organismColumn eq organism.name + + fun statusIs(status: Status) = statusColumn eq status.name + + fun statusIsOneOf(vararg status: Status) = statusColumn inList status.map { it.name } + + fun accessionVersionEquals(accessionVersion: AccessionVersionInterface) = + (accessionColumn eq accessionVersion.accession) and + (versionColumn eq accessionVersion.version) + + fun submitterIs(submitter: String) = submitterColumn eq submitter + + private fun serializeOriginalData( + compressionService: CompressionService, + organism: Organism, + ): Column = jsonb( + "original_data", + { originalData -> + jacksonObjectMapper.writeValueAsString( + compressionService.compressSequencesInOriginalData( + originalData, + organism, + ), + ) + }, + { string -> + compressionService.decompressSequencesInOriginalData( + jacksonObjectMapper.readValue(string) as OriginalData, + organism, + ) + }, + ) +} diff --git a/backend/src/main/kotlin/org/pathoplexus/backend/service/SequenceUploadAuxTable.kt b/backend/src/main/kotlin/org/pathoplexus/backend/service/SequenceUploadAuxTable.kt new file mode 100644 index 000000000..83435412b --- /dev/null +++ b/backend/src/main/kotlin/org/pathoplexus/backend/service/SequenceUploadAuxTable.kt @@ -0,0 +1,14 @@ +package org.pathoplexus.backend.service + +import org.jetbrains.exposed.sql.Table + +const val SEQUENCE_UPLOAD_TABLE_NAME = "sequence_upload_aux_table" + +object SequenceUploadAuxTable : Table(SEQUENCE_UPLOAD_TABLE_NAME) { + val sequenceUploadIdColumn = varchar("upload_id", 255) + val sequenceSubmissionIdColumn = varchar("submission_id", 255) + val segmentNameColumn = varchar("segment_name", 255) + val compressedSequenceDataColumn = text("compressed_sequence_data") + + override val primaryKey = PrimaryKey(sequenceUploadIdColumn, sequenceSubmissionIdColumn, segmentNameColumn) +} diff --git a/backend/src/main/kotlin/org/pathoplexus/backend/service/UploadDatabaseService.kt b/backend/src/main/kotlin/org/pathoplexus/backend/service/UploadDatabaseService.kt new file mode 100644 index 000000000..2bf608892 --- /dev/null +++ b/backend/src/main/kotlin/org/pathoplexus/backend/service/UploadDatabaseService.kt @@ -0,0 +1,155 @@ +package org.pathoplexus.backend.service + +import kotlinx.datetime.LocalDateTime +import mu.KotlinLogging +import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq +import org.jetbrains.exposed.sql.VarCharColumnType +import org.jetbrains.exposed.sql.batchInsert +import org.jetbrains.exposed.sql.deleteWhere +import org.jetbrains.exposed.sql.select +import org.jetbrains.exposed.sql.statements.StatementType +import org.jetbrains.exposed.sql.transactions.transaction +import org.pathoplexus.backend.api.Organism +import org.pathoplexus.backend.api.Status +import org.pathoplexus.backend.api.SubmissionIdMapping +import org.pathoplexus.backend.model.SubmissionId +import org.pathoplexus.backend.service.MetadataUploadAuxTable.metadataColumn +import org.pathoplexus.backend.service.MetadataUploadAuxTable.organismColumn +import org.pathoplexus.backend.service.MetadataUploadAuxTable.submissionIdColumn +import org.pathoplexus.backend.service.MetadataUploadAuxTable.submitterColumn +import org.pathoplexus.backend.service.MetadataUploadAuxTable.uploadIdColumn +import org.pathoplexus.backend.service.MetadataUploadAuxTable.uploadedAtColumn +import org.pathoplexus.backend.service.SequenceUploadAuxTable.compressedSequenceDataColumn +import org.pathoplexus.backend.service.SequenceUploadAuxTable.segmentNameColumn +import org.pathoplexus.backend.service.SequenceUploadAuxTable.sequenceSubmissionIdColumn +import org.pathoplexus.backend.service.SequenceUploadAuxTable.sequenceUploadIdColumn +import org.pathoplexus.backend.utils.FastaEntry +import org.pathoplexus.backend.utils.MetadataEntry +import org.pathoplexus.backend.utils.ParseFastaHeader +import org.springframework.stereotype.Service +import org.springframework.transaction.annotation.Transactional + +private val log = KotlinLogging.logger { } + +@Service +@Transactional +class UploadDatabaseService( + private val parseFastaHeader: ParseFastaHeader, + private val compressor: CompressionService, +) { + + fun batchInsertMetadataInAuxTable( + submitter: String, + uploadId: String, + submittedOrganism: Organism, + uploadedMetadataBatch: List, + uploadedAt: LocalDateTime, + ) { + MetadataUploadAuxTable.batchInsert(uploadedMetadataBatch) { + this[submitterColumn] = submitter + this[uploadedAtColumn] = uploadedAt + this[submissionIdColumn] = it.submissionId + this[metadataColumn] = it.metadata + this[organismColumn] = submittedOrganism.name + this[uploadIdColumn] = uploadId + } + } + + fun batchInsertSequencesInAuxTable( + uploadId: String, + submittedOrganism: Organism, + uploadedSequencesBatch: List, + ) { + SequenceUploadAuxTable.batchInsert(uploadedSequencesBatch) { + val (submissionId, segmentName) = parseFastaHeader.parse(it.sampleName) + this[sequenceSubmissionIdColumn] = submissionId + this[segmentNameColumn] = segmentName + this[sequenceUploadIdColumn] = uploadId + // this can be handled better; creating named sequences in the first place; it is possible to de-compress when serializing. Issue now, segment name is not known. + this[compressedSequenceDataColumn] = compressor.compressUnalignedNucleotideSequence( + it.sequence, + segmentName, + submittedOrganism, + ) + } + } + + fun getUploadSubmissionIds(uploadId: String): Pair, List> = Pair( + MetadataUploadAuxTable + .select { uploadIdColumn eq uploadId } + .map { it[submissionIdColumn] }, + + SequenceUploadAuxTable + .select { sequenceUploadIdColumn eq uploadId } + .map { + it[sequenceSubmissionIdColumn] + }, + ) + + fun mapAndCopy(uploadId: String): List { + log.debug { "mapping and copying sequences with UploadId $uploadId" } + + val sql = + """ + INSERT INTO sequence_entries ( + accession, + organism, + submission_id, + submitter, + submitted_at, + original_data, + status + ) + SELECT + nextval('accession_sequence'), + m.organism, + m.submission_id, + m.submitter, + m.uploaded_at, + jsonb_build_object( + 'metadata', m.metadata, + 'unalignedNucleotideSequences', jsonb_object_agg(s.segment_name, s.compressed_sequence_data) + ), + '${Status.RECEIVED.name}' + FROM + metadata_upload_aux_table m + JOIN + sequence_upload_aux_table s ON m.upload_id = s.upload_id AND m.submission_id = s.submission_id + WHERE m.upload_id = ? + GROUP BY + m.upload_id, + m.organism, + m.submission_id, + m.submitter, + m.uploaded_at + RETURNING accession, version, submission_id; + """ + + return transaction { + exec( + sql, + listOf( + Pair(VarCharColumnType(), uploadId), + ), + explicitStatementType = StatementType.SELECT, + ) { rs -> + val result = mutableListOf() + while (rs.next()) { + result += SubmissionIdMapping( + rs.getString("accession"), + rs.getLong("version"), + rs.getString("submission_id"), + ) + } + result.toList() + } ?: emptyList() + } + } + + fun deleteUploadData(uploadId: String) { + log.debug { "deleting upload data with UploadId $uploadId" } + + MetadataUploadAuxTable.deleteWhere { uploadIdColumn eq uploadId } + SequenceUploadAuxTable.deleteWhere { sequenceUploadIdColumn eq uploadId } + } +} diff --git a/backend/src/main/kotlin/org/pathoplexus/backend/utils/AccessionComparators.kt b/backend/src/main/kotlin/org/pathoplexus/backend/utils/AccessionComparators.kt new file mode 100644 index 000000000..f21754b61 --- /dev/null +++ b/backend/src/main/kotlin/org/pathoplexus/backend/utils/AccessionComparators.kt @@ -0,0 +1,21 @@ +package org.pathoplexus.backend.utils + +import org.pathoplexus.backend.api.AccessionVersionInterface + +typealias Accession = String +typealias Version = Long + +object AccessionComparator : Comparator { + override fun compare(left: Accession, right: Accession): Int { + return left.toInt().compareTo(right.toInt()) + } +} + +object AccessionVersionComparator : Comparator { + override fun compare(left: AccessionVersionInterface, right: AccessionVersionInterface): Int { + return when (val accessionResult = left.accession.toInt().compareTo(right.accession.toInt())) { + 0 -> left.version.compareTo(right.version) + else -> accessionResult + } + } +} diff --git a/backend/src/main/kotlin/org/pathoplexus/backend/utils/FastaEntry.kt b/backend/src/main/kotlin/org/pathoplexus/backend/utils/FastaEntry.kt deleted file mode 100644 index 671a589ad..000000000 --- a/backend/src/main/kotlin/org/pathoplexus/backend/utils/FastaEntry.kt +++ /dev/null @@ -1,19 +0,0 @@ -package org.pathoplexus.backend.utils - -import java.io.IOException -import java.io.OutputStream -import java.nio.charset.StandardCharsets - -data class FastaEntry(val sampleName: String, val sequence: String) { - fun writeToStream(outputStream: OutputStream) { - try { - outputStream.write(">".toByteArray(StandardCharsets.UTF_8)) - outputStream.write(sampleName.toByteArray(StandardCharsets.UTF_8)) - outputStream.write("\n".toByteArray(StandardCharsets.UTF_8)) - outputStream.write(sequence.toByteArray(StandardCharsets.UTF_8)) - outputStream.write("\n".toByteArray(StandardCharsets.UTF_8)) - } catch (e: IOException) { - throw RuntimeException(e) - } - } -} diff --git a/backend/src/main/kotlin/org/pathoplexus/backend/utils/FastaReader.kt b/backend/src/main/kotlin/org/pathoplexus/backend/utils/FastaReader.kt index 7dca71667..f9ac6f6f6 100644 --- a/backend/src/main/kotlin/org/pathoplexus/backend/utils/FastaReader.kt +++ b/backend/src/main/kotlin/org/pathoplexus/backend/utils/FastaReader.kt @@ -3,7 +3,8 @@ package org.pathoplexus.backend.utils import java.io.BufferedReader import java.io.InputStream import java.io.InputStreamReader -import java.util.NoSuchElementException + +data class FastaEntry(val sampleName: String, val sequence: String) class FastaReader(inputStream: InputStream) : Iterator, Iterable, AutoCloseable { private val reader: BufferedReader @@ -60,4 +61,10 @@ class FastaReader(inputStream: InputStream) : Iterator, Iterable { return this } + + fun asSequence(): Sequence = sequence { + while (hasNext()) { + yield(next()) + } + } } diff --git a/backend/src/main/kotlin/org/pathoplexus/backend/utils/MetadataEntry.kt b/backend/src/main/kotlin/org/pathoplexus/backend/utils/MetadataEntry.kt new file mode 100644 index 000000000..e381d614c --- /dev/null +++ b/backend/src/main/kotlin/org/pathoplexus/backend/utils/MetadataEntry.kt @@ -0,0 +1,46 @@ +package org.pathoplexus.backend.utils + +import org.apache.commons.csv.CSVFormat +import org.apache.commons.csv.CSVParser +import org.pathoplexus.backend.controller.UnprocessableEntityException +import org.pathoplexus.backend.model.HEADER_TO_CONNECT_METADATA_AND_SEQUENCES +import org.pathoplexus.backend.model.SubmissionId +import java.io.InputStream +import java.io.InputStreamReader + +data class MetadataEntry(val submissionId: SubmissionId, val metadata: Map) + +fun metadataEntryStreamAsSequence(metadataInputStream: InputStream): Sequence { + val csvParser = CSVParser( + InputStreamReader(metadataInputStream), + CSVFormat.TDF.builder().setHeader().setSkipHeaderRecord(true).build(), + ) + + if (!csvParser.headerNames.contains(HEADER_TO_CONNECT_METADATA_AND_SEQUENCES)) { + throw UnprocessableEntityException( + "The metadata file does not contain the header '$HEADER_TO_CONNECT_METADATA_AND_SEQUENCES'", + ) + } + + return csvParser.asSequence().map { record -> + val submissionId = record[HEADER_TO_CONNECT_METADATA_AND_SEQUENCES] + + if (submissionId.isNullOrEmpty()) { + throw UnprocessableEntityException( + "A row in metadata file contains no $HEADER_TO_CONNECT_METADATA_AND_SEQUENCES: $record", + ) + } + + val metadata = record.toMap().filterKeys { column -> + column != HEADER_TO_CONNECT_METADATA_AND_SEQUENCES + } + + MetadataEntry(submissionId, metadata) + }.onEach { entry -> + if (entry.metadata.isEmpty()) { + throw UnprocessableEntityException( + "A row in metadata file contains no metadata columns: $entry", + ) + } + } +} diff --git a/backend/src/main/kotlin/org/pathoplexus/backend/utils/ParseFastaHeader.kt b/backend/src/main/kotlin/org/pathoplexus/backend/utils/ParseFastaHeader.kt new file mode 100644 index 000000000..fb75a490b --- /dev/null +++ b/backend/src/main/kotlin/org/pathoplexus/backend/utils/ParseFastaHeader.kt @@ -0,0 +1,25 @@ +package org.pathoplexus.backend.utils + +import org.pathoplexus.backend.config.ReferenceGenome +import org.pathoplexus.backend.controller.BadRequestException +import org.pathoplexus.backend.model.SegmentName +import org.pathoplexus.backend.model.SubmissionId +import org.springframework.stereotype.Service + +@Service +class ParseFastaHeader(private val referenceGenome: ReferenceGenome) { + fun parse(submissionId: String): Pair { + if (referenceGenome.nucleotideSequences.size == 1) { + return Pair(submissionId, "main") + } + + val lastDelimiter = submissionId.lastIndexOf("_") + if (lastDelimiter == -1) { + throw BadRequestException( + "The FASTA header $submissionId does not contain the segment name. Please provide the" + + " segment name in the format _", + ) + } + return Pair(submissionId.substring(0, lastDelimiter), submissionId.substring(lastDelimiter + 1)) + } +} diff --git a/backend/src/main/resources/application.properties b/backend/src/main/resources/application.properties index c84ee2aa8..9c4a8b1cf 100644 --- a/backend/src/main/resources/application.properties +++ b/backend/src/main/resources/application.properties @@ -6,8 +6,8 @@ springdoc.default-consumes-media-type=application/json springdoc.api-docs.path=/api-docs springdoc.swagger-ui.operationsSorter=alpha server.forward-headers-strategy=framework -spring.servlet.multipart.max-file-size=100MB -spring.servlet.multipart.max-request-size=100MB +spring.servlet.multipart.max-file-size=5000MB +spring.servlet.multipart.max-request-size=5000MB spring.datasource.driverClassName=org.postgresql.Driver diff --git a/backend/src/main/resources/db/migration/V1__init.sql b/backend/src/main/resources/db/migration/V1__init.sql index 0a8f3fb7c..3bb157a8e 100644 --- a/backend/src/main/resources/db/migration/V1__init.sql +++ b/backend/src/main/resources/db/migration/V1__init.sql @@ -20,3 +20,22 @@ create table sequence_entries ( create index on sequence_entries (submitter); create index on sequence_entries (status); + + +create table metadata_upload_aux_table ( + upload_id text not null, + organism text not null, + submission_id text not null, + submitter text not null, + uploaded_at timestamp not null, + metadata jsonb not null, + primary key (upload_id,submission_id) +); + +create table sequence_upload_aux_table ( + upload_id text not null, + submission_id text not null, + segment_name text not null, + compressed_sequence_data text not null, + primary key (upload_id,submission_id,segment_name) +); diff --git a/backend/src/main/resources/logback.xml b/backend/src/main/resources/logback.xml index 201ef8ee3..c2de4cde3 100644 --- a/backend/src/main/resources/logback.xml +++ b/backend/src/main/resources/logback.xml @@ -12,7 +12,7 @@ - + diff --git a/backend/src/test/kotlin/org/pathoplexus/backend/controller/DeleteSequencesEndpointTest.kt b/backend/src/test/kotlin/org/pathoplexus/backend/controller/DeleteSequencesEndpointTest.kt index 77bf118e6..9b3d33672 100644 --- a/backend/src/test/kotlin/org/pathoplexus/backend/controller/DeleteSequencesEndpointTest.kt +++ b/backend/src/test/kotlin/org/pathoplexus/backend/controller/DeleteSequencesEndpointTest.kt @@ -8,7 +8,7 @@ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.MethodSource import org.pathoplexus.backend.api.AccessionVersion import org.pathoplexus.backend.api.Status -import org.pathoplexus.backend.service.AccessionVersionComparator +import org.pathoplexus.backend.utils.AccessionVersionComparator import org.springframework.beans.factory.annotation.Autowired import org.springframework.http.MediaType import org.springframework.test.web.servlet.result.MockMvcResultMatchers.content diff --git a/backend/src/test/kotlin/org/pathoplexus/backend/controller/EndpointTestExtension.kt b/backend/src/test/kotlin/org/pathoplexus/backend/controller/EndpointTestExtension.kt index a6e4c8fe2..3bff5ed2d 100644 --- a/backend/src/test/kotlin/org/pathoplexus/backend/controller/EndpointTestExtension.kt +++ b/backend/src/test/kotlin/org/pathoplexus/backend/controller/EndpointTestExtension.kt @@ -5,8 +5,7 @@ import org.junit.jupiter.api.extension.BeforeAllCallback import org.junit.jupiter.api.extension.BeforeEachCallback import org.junit.jupiter.api.extension.ExtendWith import org.junit.jupiter.api.extension.ExtensionContext -import org.pathoplexus.backend.service.ACCESSION_SEQUENCE_NAME -import org.pathoplexus.backend.service.TABLE_NAME +import org.pathoplexus.backend.service.SEQUENCE_ENTRIES_TABLE_NAME import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc import org.springframework.boot.test.context.SpringBootTest import org.springframework.context.annotation.Import @@ -35,6 +34,8 @@ private const val SPRING_DATASOURCE_URL = "spring.datasource.url" private const val SPRING_DATASOURCE_USERNAME = "spring.datasource.username" private const val SPRING_DATASOURCE_PASSWORD = "spring.datasource.password" +const val ACCESSION_SEQUENCE_NAME = "accession_sequence" + class EndpointTestExtension : BeforeEachCallback, AfterAllCallback, BeforeAllCallback { companion object { private val postgres: PostgreSQLContainer<*> = PostgreSQLContainer("postgres:latest") @@ -56,7 +57,7 @@ class EndpointTestExtension : BeforeEachCallback, AfterAllCallback, BeforeAllCal "-d", postgres.databaseName, "-c", - "truncate table $TABLE_NAME; alter sequence $ACCESSION_SEQUENCE_NAME restart with 1;", + "truncate table $SEQUENCE_ENTRIES_TABLE_NAME; alter sequence $ACCESSION_SEQUENCE_NAME restart with 1;", ) } diff --git a/backend/src/test/kotlin/org/pathoplexus/backend/controller/ExceptionHandlerTest.kt b/backend/src/test/kotlin/org/pathoplexus/backend/controller/ExceptionHandlerTest.kt index d31295fdc..9b68ebaef 100644 --- a/backend/src/test/kotlin/org/pathoplexus/backend/controller/ExceptionHandlerTest.kt +++ b/backend/src/test/kotlin/org/pathoplexus/backend/controller/ExceptionHandlerTest.kt @@ -110,7 +110,7 @@ class ExceptionHandlerWithMockedModelTest(@Autowired val mockMvc: MockMvc) { @Test fun `WHEN I submit a request with invalid organism THEN it should return a descriptive error message`() { - every { submitModel.processSubmission(any(), any(), any(), any()) } returns validResponse + every { submitModel.processSubmission(any(), any(), any(), any(), any()) } returns validResponse mockMvc.perform( multipart("/unknownOrganism/submit") diff --git a/backend/src/test/kotlin/org/pathoplexus/backend/controller/GetReleasedDataEndpointTest.kt b/backend/src/test/kotlin/org/pathoplexus/backend/controller/GetReleasedDataEndpointTest.kt index 14d5491aa..d891c7b0e 100644 --- a/backend/src/test/kotlin/org/pathoplexus/backend/controller/GetReleasedDataEndpointTest.kt +++ b/backend/src/test/kotlin/org/pathoplexus/backend/controller/GetReleasedDataEndpointTest.kt @@ -15,8 +15,8 @@ import org.pathoplexus.backend.api.ProcessedData import org.pathoplexus.backend.api.SiloVersionStatus import org.pathoplexus.backend.controller.SubmitFiles.DefaultFiles import org.pathoplexus.backend.controller.SubmitFiles.DefaultFiles.firstAccession -import org.pathoplexus.backend.service.Accession -import org.pathoplexus.backend.service.Version +import org.pathoplexus.backend.utils.Accession +import org.pathoplexus.backend.utils.Version import org.springframework.beans.factory.annotation.Autowired import org.springframework.test.web.servlet.result.MockMvcResultMatchers.status import java.time.LocalDateTime diff --git a/backend/src/test/kotlin/org/pathoplexus/backend/controller/PreparedProcessedData.kt b/backend/src/test/kotlin/org/pathoplexus/backend/controller/PreparedProcessedData.kt index 85b0540e0..e2a39b921 100644 --- a/backend/src/test/kotlin/org/pathoplexus/backend/controller/PreparedProcessedData.kt +++ b/backend/src/test/kotlin/org/pathoplexus/backend/controller/PreparedProcessedData.kt @@ -16,7 +16,7 @@ import org.pathoplexus.backend.api.ProcessedData import org.pathoplexus.backend.api.SegmentName import org.pathoplexus.backend.api.SubmittedProcessedData import org.pathoplexus.backend.controller.SubmitFiles.DefaultFiles -import org.pathoplexus.backend.service.Accession +import org.pathoplexus.backend.utils.Accession val defaultProcessedData = ProcessedData( metadata = mapOf( diff --git a/backend/src/test/kotlin/org/pathoplexus/backend/controller/ReviseEndpointTest.kt b/backend/src/test/kotlin/org/pathoplexus/backend/controller/ReviseEndpointTest.kt index 86a5eaff3..22bc8596e 100644 --- a/backend/src/test/kotlin/org/pathoplexus/backend/controller/ReviseEndpointTest.kt +++ b/backend/src/test/kotlin/org/pathoplexus/backend/controller/ReviseEndpointTest.kt @@ -20,7 +20,6 @@ import org.springframework.test.web.servlet.ResultMatcher import org.springframework.test.web.servlet.result.MockMvcResultMatchers.content import org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath import org.springframework.test.web.servlet.result.MockMvcResultMatchers.status - @EndpointTest class ReviseEndpointTest( @Autowired val client: SubmissionControllerClient, @@ -246,7 +245,7 @@ class ReviseEndpointTest( SubmitFiles.sequenceFileWith(), status().isUnprocessableEntity, "Unprocessable Entity", - "Metadata file contains duplicate submissionIds: [sameHeader]", + "Metadata file contains duplicate submissionIds", ), Arguments.of( "duplicate headers in sequence file", @@ -261,7 +260,7 @@ class ReviseEndpointTest( ), status().isUnprocessableEntity, "Unprocessable Entity", - "Sequence file contains duplicate submissionIds: sameHeader_main", + "Sequence file contains duplicate submissionIds", ), Arguments.of( "metadata file misses headers", @@ -281,7 +280,7 @@ class ReviseEndpointTest( ), status().isUnprocessableEntity, "Unprocessable Entity", - "Sequence file contains submissionIds that are not present in the metadata file: [notInMetadata]", + "Sequence file contains 1 submissionIds that are not present in the metadata file: notInMetadata", ), Arguments.of( "sequence file misses submissionIds", @@ -300,7 +299,7 @@ class ReviseEndpointTest( ), status().isUnprocessableEntity, "Unprocessable Entity", - "Metadata file contains submissionIds that are not present in the sequence file: [notInSequences]", + "Metadata file contains 1 submissionIds that are not present in the sequence file: notInSequences", ), Arguments.of( "metadata file misses accession header", diff --git a/backend/src/test/kotlin/org/pathoplexus/backend/controller/SingleSegmentedSubmitEndpointTest.kt b/backend/src/test/kotlin/org/pathoplexus/backend/controller/SingleSegmentedSubmitEndpointTest.kt index 6ae651b73..5c1c33da0 100644 --- a/backend/src/test/kotlin/org/pathoplexus/backend/controller/SingleSegmentedSubmitEndpointTest.kt +++ b/backend/src/test/kotlin/org/pathoplexus/backend/controller/SingleSegmentedSubmitEndpointTest.kt @@ -58,7 +58,7 @@ class SingleSegmentedSubmitEndpointTest( @Test fun `GIVEN input data with explicit default segment name THEN data is rejected`() { - val expectedDetail = "Metadata file contains submissionIds that are not present in the sequence file: [header1]" + val expectedDetail = "Metadata file contains 1 submissionIds that are not present in the sequence file: header1" submissionControllerClient.submit( SubmitFiles.metadataFileWith( diff --git a/backend/src/test/kotlin/org/pathoplexus/backend/controller/SubmissionControllerClient.kt b/backend/src/test/kotlin/org/pathoplexus/backend/controller/SubmissionControllerClient.kt index f5ab4705d..1792f0bab 100644 --- a/backend/src/test/kotlin/org/pathoplexus/backend/controller/SubmissionControllerClient.kt +++ b/backend/src/test/kotlin/org/pathoplexus/backend/controller/SubmissionControllerClient.kt @@ -4,7 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper import org.pathoplexus.backend.api.AccessionVersion import org.pathoplexus.backend.api.SubmittedProcessedData import org.pathoplexus.backend.api.UnprocessedData -import org.pathoplexus.backend.service.Accession +import org.pathoplexus.backend.utils.Accession import org.springframework.http.MediaType import org.springframework.mock.web.MockMultipartFile import org.springframework.test.web.servlet.MockMvc diff --git a/backend/src/test/kotlin/org/pathoplexus/backend/controller/SubmissionConvenienceClient.kt b/backend/src/test/kotlin/org/pathoplexus/backend/controller/SubmissionConvenienceClient.kt index e1348974a..473a8a108 100644 --- a/backend/src/test/kotlin/org/pathoplexus/backend/controller/SubmissionConvenienceClient.kt +++ b/backend/src/test/kotlin/org/pathoplexus/backend/controller/SubmissionConvenienceClient.kt @@ -12,7 +12,7 @@ import org.pathoplexus.backend.api.SubmissionIdMapping import org.pathoplexus.backend.api.SubmittedProcessedData import org.pathoplexus.backend.api.UnprocessedData import org.pathoplexus.backend.controller.SubmitFiles.DefaultFiles -import org.pathoplexus.backend.service.Accession +import org.pathoplexus.backend.utils.Accession import org.springframework.http.MediaType import org.springframework.test.web.servlet.ResultActions import org.springframework.test.web.servlet.result.MockMvcResultMatchers.content diff --git a/backend/src/test/kotlin/org/pathoplexus/backend/controller/SubmitEndpointTest.kt b/backend/src/test/kotlin/org/pathoplexus/backend/controller/SubmitEndpointTest.kt index 139959299..0c97216f6 100644 --- a/backend/src/test/kotlin/org/pathoplexus/backend/controller/SubmitEndpointTest.kt +++ b/backend/src/test/kotlin/org/pathoplexus/backend/controller/SubmitEndpointTest.kt @@ -7,6 +7,8 @@ import org.junit.jupiter.params.provider.Arguments import org.junit.jupiter.params.provider.MethodSource import org.pathoplexus.backend.controller.SubmitFiles.DefaultFiles import org.pathoplexus.backend.controller.SubmitFiles.DefaultFiles.NUMBER_OF_SEQUENCES +import org.pathoplexus.backend.model.SubmitModel +import org.pathoplexus.backend.service.CompressionAlgorithm import org.springframework.beans.factory.annotation.Autowired import org.springframework.http.MediaType.APPLICATION_JSON_VALUE import org.springframework.mock.web.MockMultipartFile @@ -29,8 +31,13 @@ class SubmitEndpointTest(@Autowired val submissionControllerClient: SubmissionCo } } - @Test - fun `GIVEN valid input data THEN returns mapping of provided custom ids to generated ids`() { + @ParameterizedTest(name = "GIVEN {0} THEN data is accepted and submitted") + @MethodSource("compressionForSubmit") + fun `GIVEN valid input data THEN returns mapping of provided custom ids to generated ids`( + title: String, + metadataFile: MockMultipartFile, + sequencesFile: MockMultipartFile, + ) { submissionControllerClient.submit( DefaultFiles.metadataFile, DefaultFiles.sequencesFile, @@ -81,6 +88,34 @@ class SubmitEndpointTest(@Autowired val submissionControllerClient: SubmissionCo } companion object { + @JvmStatic + fun compressionForSubmit(): List { + return listOf( + Arguments.of( + "uncompressed files", + DefaultFiles.metadataFile, + DefaultFiles.sequencesFile, + ), + Arguments.of( + "ZSTD compressed metadata file", + DefaultFiles.metadataFiles[CompressionAlgorithm.ZSTD], + DefaultFiles.sequencesFile, + ), + Arguments.of( + "ZSTD compressed sequences file", + DefaultFiles.metadataFile, + DefaultFiles.sequencesFiles[CompressionAlgorithm.ZSTD], + ), + ) + + CompressionAlgorithm.entries.map { compression -> + Arguments.of( + "${compression.name} compressed metadata file and sequences file", + DefaultFiles.metadataFiles[compression], + DefaultFiles.sequencesFiles[compression], + ) + } + } + @JvmStatic fun badRequestForSubmit(): List { return listOf( @@ -106,7 +141,10 @@ class SubmitEndpointTest(@Autowired val submissionControllerClient: SubmissionCo DefaultFiles.sequencesFile, status().isBadRequest, "Bad Request", - "Metadata file must have extension .tsv", + "${SubmitModel.AcceptedFileTypes.metadataFile.displayName} has wrong extension. Must be " + + ".${SubmitModel.AcceptedFileTypes.metadataFile.validExtensions} for uncompressed " + + "submissions or .${SubmitModel.AcceptedFileTypes.metadataFile.getCompressedExtensions()} " + + "for compressed submissions", ), Arguments.of( "wrong extension for sequences file", @@ -114,7 +152,10 @@ class SubmitEndpointTest(@Autowired val submissionControllerClient: SubmissionCo SubmitFiles.sequenceFileWith(originalFilename = "sequences.wrongExtension"), status().isBadRequest, "Bad Request", - "Sequence file must have extension .fasta", + "${SubmitModel.AcceptedFileTypes.sequenceFile.displayName} has wrong extension. Must be " + + ".${SubmitModel.AcceptedFileTypes.sequenceFile.validExtensions} for uncompressed " + + "submissions or .${SubmitModel.AcceptedFileTypes.sequenceFile.getCompressedExtensions()} " + + "for compressed submissions", ), Arguments.of( "metadata file where one row has a blank header", @@ -155,7 +196,7 @@ class SubmitEndpointTest(@Autowired val submissionControllerClient: SubmissionCo DefaultFiles.sequencesFile, status().isUnprocessableEntity, "Unprocessable Entity", - "Metadata file contains duplicate submissionIds: [sameHeader]", + "Metadata file contains at least one duplicate submissionId", ), Arguments.of( "duplicate headers in sequence file", @@ -170,7 +211,7 @@ class SubmitEndpointTest(@Autowired val submissionControllerClient: SubmissionCo ), status().isUnprocessableEntity, "Unprocessable Entity", - "Sequence file contains duplicate submissionIds: sameHeader", + "Sequence file contains at least one duplicate submissionId", ), Arguments.of( "metadata file misses headers", @@ -190,7 +231,7 @@ class SubmitEndpointTest(@Autowired val submissionControllerClient: SubmissionCo ), status().isUnprocessableEntity, "Unprocessable Entity", - "Sequence file contains submissionIds that are not present in the metadata file: [notInMetadata]", + "Sequence file contains 1 submissionIds that are not present in the metadata file: notInMetadata", ), Arguments.of( "sequence file misses headers", @@ -209,7 +250,7 @@ class SubmitEndpointTest(@Autowired val submissionControllerClient: SubmissionCo ), status().isUnprocessableEntity, "Unprocessable Entity", - "Metadata file contains submissionIds that are not present in the sequence file: [notInSequences]", + "Metadata file contains 1 submissionIds that are not present in the sequence file: notInSequences", ), Arguments.of( "FASTA header misses segment name", diff --git a/backend/src/test/kotlin/org/pathoplexus/backend/controller/SubmitFiles.kt b/backend/src/test/kotlin/org/pathoplexus/backend/controller/SubmitFiles.kt index 0198583d6..c98754747 100644 --- a/backend/src/test/kotlin/org/pathoplexus/backend/controller/SubmitFiles.kt +++ b/backend/src/test/kotlin/org/pathoplexus/backend/controller/SubmitFiles.kt @@ -1,17 +1,40 @@ package org.pathoplexus.backend.controller +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream +import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream +import org.apache.commons.compress.compressors.xz.XZCompressorOutputStream +import org.apache.commons.compress.compressors.zstandard.ZstdCompressorOutputStream +import org.pathoplexus.backend.service.CompressionAlgorithm import org.springframework.http.MediaType.TEXT_PLAIN_VALUE import org.springframework.mock.web.MockMultipartFile +import org.tukaani.xz.LZMA2Options +import org.tukaani.xz.XZOutputStream +import java.io.ByteArrayOutputStream +import java.util.zip.ZipEntry +import java.util.zip.ZipOutputStream private const val DEFAULT_METADATA_FILE_NAME = "metadata.tsv" private const val REVISED_METADATA_FILE_NAME = "revised_metadata.tsv" private const val DEFAULT_SEQUENCES_FILE_NAME = "sequences.fasta" object SubmitFiles { + object DefaultFiles { - val metadataFile = metadataFileWith(content = getFileContent(DEFAULT_METADATA_FILE_NAME)) val revisedMetadataFile = metadataFileWith(content = getFileContent(REVISED_METADATA_FILE_NAME)) - val sequencesFile = sequenceFileWith(content = getFileContent(DEFAULT_SEQUENCES_FILE_NAME)) + val metadataFiles = CompressionAlgorithm.entries.associateWith { + metadataFileWith( + content = getFileContent(DEFAULT_METADATA_FILE_NAME), + compression = it, + ) + } + val sequencesFiles = CompressionAlgorithm.entries.associateWith { + sequenceFileWith( + content = getFileContent(DEFAULT_SEQUENCES_FILE_NAME), + compression = it, + ) + } + val metadataFile = metadataFiles[CompressionAlgorithm.NONE] ?: error("No metadata file") + val sequencesFile = sequencesFiles[CompressionAlgorithm.NONE] ?: error("No sequences file") const val NUMBER_OF_SEQUENCES = 10 val allAccessions = (1L..NUMBER_OF_SEQUENCES).toList().map { it.toString() } @@ -31,12 +54,14 @@ object SubmitFiles { originalFilename: String = "metadata.tsv", mediaType: String = TEXT_PLAIN_VALUE, content: String = "submissionId\tfirstColumn\nsomeHeader\tsomeValue\nsomeHeader2\tsomeValue2", + compression: CompressionAlgorithm = CompressionAlgorithm.NONE, ): MockMultipartFile { + val contentStream = compressString(content, compression) return MockMultipartFile( name, - originalFilename, + originalFilename + compression.extension, mediaType, - content.byteInputStream(), + contentStream, ) } @@ -59,10 +84,77 @@ object SubmitFiles { originalFilename: String = "sequences.fasta", mediaType: String = TEXT_PLAIN_VALUE, content: String = ">someHeader_main\nAC\n>someHeader2_main\nAC", - ) = MockMultipartFile( - name, - originalFilename, - mediaType, - content.byteInputStream(), - ) + compression: CompressionAlgorithm = CompressionAlgorithm.NONE, + ): MockMultipartFile { + val contentStream = compressString(content, compression) + return MockMultipartFile( + name, + originalFilename + compression.extension, + mediaType, + contentStream, + ) + } +} + +fun compressString(input: String, compressionAlgorithm: CompressionAlgorithm): ByteArray { + return try { + when (compressionAlgorithm) { + CompressionAlgorithm.ZSTD -> compressZstd(input) + CompressionAlgorithm.XZ -> compressXZ(input) + CompressionAlgorithm.GZIP -> compressGzip(input) + CompressionAlgorithm.ZIP -> compressZip(input) + CompressionAlgorithm.BZIP2 -> compressBzip2(input) + CompressionAlgorithm.LZMA -> compressLzma(input) + CompressionAlgorithm.NONE -> input.toByteArray() + } + } catch (e: Exception) { + throw RuntimeException("Error compressing the string with $compressionAlgorithm") + } +} + +fun compressBzip2(input: String): ByteArray = ByteArrayOutputStream().use { byteArrayOutputStream -> + BZip2CompressorOutputStream(byteArrayOutputStream).use { bzip2Out -> + bzip2Out.write(input.toByteArray()) + } + byteArrayOutputStream.toByteArray() +} + +fun compressGzip(input: String): ByteArray = ByteArrayOutputStream().use { byteArrayOutputStream -> + GzipCompressorOutputStream(byteArrayOutputStream).use { gzipOut -> + gzipOut.write(input.toByteArray()) + } + byteArrayOutputStream.toByteArray() +} + +fun compressXZ(input: String): ByteArray = ByteArrayOutputStream().use { byteArrayOutputStream -> + XZCompressorOutputStream(byteArrayOutputStream).use { xzOut -> + xzOut.write(input.toByteArray()) + } + byteArrayOutputStream.toByteArray() +} + +fun compressZstd(input: String): ByteArray = ByteArrayOutputStream().use { byteArrayOutputStream -> + ZstdCompressorOutputStream(byteArrayOutputStream).use { zstdOut -> + zstdOut.write(input.toByteArray()) + } + byteArrayOutputStream.toByteArray() +} + +fun compressZip(input: String): ByteArray { + return ByteArrayOutputStream().use { byteArrayOutputStream -> + ZipOutputStream(byteArrayOutputStream).use { zipOut -> + zipOut.putNextEntry(ZipEntry("data.txt")) + zipOut.write(input.toByteArray()) + zipOut.closeEntry() + } + byteArrayOutputStream.toByteArray() + } +} + +fun compressLzma(input: String): ByteArray { + val byteArrayOutputStream = ByteArrayOutputStream() + XZOutputStream(byteArrayOutputStream, LZMA2Options()).use { lzmaOut -> + lzmaOut.write(input.toByteArray()) + } + return byteArrayOutputStream.toByteArray() } diff --git a/backend/src/test/kotlin/org/pathoplexus/backend/controller/SubmitProcessedDataEndpointTest.kt b/backend/src/test/kotlin/org/pathoplexus/backend/controller/SubmitProcessedDataEndpointTest.kt index dad985240..370a243b1 100644 --- a/backend/src/test/kotlin/org/pathoplexus/backend/controller/SubmitProcessedDataEndpointTest.kt +++ b/backend/src/test/kotlin/org/pathoplexus/backend/controller/SubmitProcessedDataEndpointTest.kt @@ -12,9 +12,9 @@ import org.pathoplexus.backend.api.Status import org.pathoplexus.backend.api.SubmittedProcessedData import org.pathoplexus.backend.api.UnprocessedData import org.pathoplexus.backend.controller.SubmitFiles.DefaultFiles.firstAccession -import org.pathoplexus.backend.service.Accession import org.pathoplexus.backend.service.AminoAcidSymbols import org.pathoplexus.backend.service.NucleotideSymbols +import org.pathoplexus.backend.utils.Accession import org.springframework.beans.factory.annotation.Autowired import org.springframework.http.MediaType import org.springframework.test.web.servlet.result.MockMvcResultMatchers.content diff --git a/backend/src/test/kotlin/org/pathoplexus/backend/service/CompressionServiceTest.kt b/backend/src/test/kotlin/org/pathoplexus/backend/service/CompressionServiceTest.kt new file mode 100644 index 000000000..5665b4107 --- /dev/null +++ b/backend/src/test/kotlin/org/pathoplexus/backend/service/CompressionServiceTest.kt @@ -0,0 +1,33 @@ +package org.pathoplexus.backend.service + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.pathoplexus.backend.SpringBootTestWithoutDatabase +import org.pathoplexus.backend.api.Organism +import org.pathoplexus.backend.api.OriginalData +import org.pathoplexus.backend.config.BackendConfig +import org.springframework.beans.factory.annotation.Autowired + +@SpringBootTestWithoutDatabase +class CompressionServiceTest( + @Autowired private val compressor: CompressionService, + @Autowired private val backendConfig: BackendConfig, +) { + + @Test + fun `Compress and decompress sequence`() { + val input = + "NNACTGACTGACTGACTGATCGATCGATCGATCGATCGATCGATC----NNNNATCGCGATCGATCGATCGATCGGGATCGTAGC--NNNNATGC" + + val segmentName = "main" + val testData = OriginalData( + mapOf("test" to "test"), + mapOf(segmentName to input), + ) + val organism = Organism(backendConfig.instances.keys.first()) + val compressed = compressor.compressSequencesInOriginalData(testData, organism) + val decompressed = compressor.decompressSequencesInOriginalData(compressed, organism) + + assertEquals(testData, decompressed) + } +} diff --git a/preprocessing/specification.md b/preprocessing/specification.md index 336c3cd1c..4102019da 100644 --- a/preprocessing/specification.md +++ b/preprocessing/specification.md @@ -14,7 +14,7 @@ In following, we list a series of tasks that the preprocessing pipeline would us **Validation:** The preprocessing pipeline checks the input data and emits errors or warnings. As mentioned above, the only constraint is that the output of the preprocessing pipeline conforms to the right (technical) format. Otherwise, a pipeline may be generous (e.g., allow every value in the "country" field) or be more restrictive (e.g., only allow a fixed set of values in the "country" field). -**Alignment and translations:** The submitter only provides unaligned nucleotide sequences. To allow searching by nucleotide and amino acid mutations, the preprocessing pipeline must perform the alignment and compute the translations to amino acid sequences. +**Alignment and translations:** The submitter only provides unaligned nucleotide sequences. To allow searching by nucleotide and amino acid mutations, the preprocessing pipeline must perform the alignment and compute the translations to amino acid sequences. **Annotation:** The preprocessing pipeline can add annotations such as clade/lineage classifications. @@ -31,7 +31,7 @@ In following, we list a series of tasks that the preprocessing pipeline would us - **Sequence entry:** A sequence entry consists of a genome sequence (or sequences if the organisms has a segmented genome) and associated metadata. It is the main entity of the Pathoplexus application. Users submit sequence entries and search for sequence entries. Each sequence entry has its own accession. Changes to sequence entries are versioned, meaning that a sequence entry can have multiple versions. - **Unpreprocessed data:** sequence entries as provided by the submitters - **Preprocessed data:** sequence entries after being processed by the preprocessing pipeline. The preprocessed data must be consistent with the organism instance schema and will be passed to LAPIS and SILO. -- **Nucleotide sequence segment:** A nucleotide sequence consists of one or multiple segments. If there is only a single segment (e.g., as in SARS-CoV-2), the segment name should be `main`. +- **Nucleotide sequence segment:** A nucleotide sequence consists of one or multiple segments. If there is only a single segment (e.g., as in SARS-CoV-2), the segment name should be `main`. For multi-segmented sequences, the segment names must match the corresponding reference genomes. ## Workflow overview diff --git a/website/tests/e2e.fixture.ts b/website/tests/e2e.fixture.ts index 88fa05cb9..ab01a91a4 100644 --- a/website/tests/e2e.fixture.ts +++ b/website/tests/e2e.fixture.ts @@ -50,7 +50,9 @@ export const testUser = 'testuser'; export const testUserPassword = 'testuser'; export const metadataTestFile: string = './tests/testData/metadata.tsv'; +export const compressedMetadataTestFile: string = './tests/testData/metadata.tsv.zst'; export const sequencesTestFile: string = './tests/testData/sequences.fasta'; +export const compressedSequencesTestFile: string = './tests/testData/sequences.fasta.zst'; export const testSequenceCount: number = readFileSync(metadataTestFile, 'utf-8') diff --git a/website/tests/pages/submit/index.spec.ts b/website/tests/pages/submit/index.spec.ts index fbae8d101..5dc5522f1 100644 --- a/website/tests/pages/submit/index.spec.ts +++ b/website/tests/pages/submit/index.spec.ts @@ -11,4 +11,15 @@ test.describe('The submit page', () => { await submitPage.submitButton.click(); await expect(submitPage.page.getByText('Response Sequence Headers')).toBeVisible(); }); + + test('should upload compressed files and submit', async ({ submitPage, loginAsTestUser }) => { + await loginAsTestUser(); + await submitPage.goto(); + + await Promise.all([submitPage.uploadCompressedSequenceData(), submitPage.uploadCompressedMetadata()]); + + await expect(submitPage.page.getByText('Response Sequence Headers')).not.toBeVisible(); + await submitPage.submitButton.click(); + await expect(submitPage.page.getByText('Response Sequence Headers')).toBeVisible(); + }); }); diff --git a/website/tests/pages/submit/submit.page.ts b/website/tests/pages/submit/submit.page.ts index b65b13065..5b6fe3b72 100644 --- a/website/tests/pages/submit/submit.page.ts +++ b/website/tests/pages/submit/submit.page.ts @@ -1,7 +1,14 @@ import type { Locator, Page } from '@playwright/test'; -import { baseUrl, dummyOrganism, expect, metadataTestFile, sequencesTestFile } from '../../e2e.fixture'; import { routes } from '../../../src/routes.ts'; +import { + baseUrl, + compressedMetadataTestFile, + compressedSequencesTestFile, + dummyOrganism, + metadataTestFile, + sequencesTestFile, +} from '../../e2e.fixture'; export class SubmitPage { public readonly submitButton: Locator; @@ -16,10 +23,15 @@ export class SubmitPage { public async uploadMetadata() { await this.page.getByPlaceholder('Metadata File:').setInputFiles(metadataTestFile); - expect(this.page.getByText('metadata.tsv')); + } + public async uploadCompressedMetadata() { + await this.page.getByPlaceholder('Metadata File:').setInputFiles(compressedMetadataTestFile); } public async uploadSequenceData() { await this.page.getByPlaceholder('Sequences File:').setInputFiles(sequencesTestFile); } + public async uploadCompressedSequenceData() { + await this.page.getByPlaceholder('Sequences File:').setInputFiles(compressedSequencesTestFile); + } } diff --git a/website/tests/testData/metadata.tsv.zst b/website/tests/testData/metadata.tsv.zst new file mode 100644 index 0000000000000000000000000000000000000000..9f01ccca95e499c7be63bb572b15c9b4e01c0f09 GIT binary patch literal 253 zcmVa?+}<`04%;jWD&NI;!gYj#{5*RGh~%MpAVrGlylZa(U}o8SF+Ah@ zpljb?L$pE(=KTGF{>?B$cB@b&_;+jVG`toiAW$QY9A4|d6=#U=T