From e78a997b59541e9119f9cb5baab3384ca9d6857b Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Mon, 20 Dec 2021 14:58:36 +0100 Subject: [PATCH] Add tags attribute to publishDir directive This commit adds the abiltity to associate arbitrary tags to files created by the publishDir directory. Tags a can be provided with a associated array e.g. publishDir some_path, tags: [FOO: 'this', BAR: 'bar'] or can be resolved dynamically using a closure returning the desidered associative array representing the path. This feature requires the target file system supporting the ability to associate tags to files. Currently only supported by AWS S3. --- docs/process.rst | 1 + .../nextflow/processor/PublishDir.groovy | 29 ++++++++++++++- .../main/nextflow/file/TagAwareFile.groovy | 29 +++++++++++++++ .../com/upplication/s3fs/AmazonS3Client.java | 23 +++++++++--- .../s3fs/S3FileSystemProvider.java | 26 +++++++++----- .../com/upplication/s3fs/S3OutputStream.java | 10 ++++++ .../src/main/com/upplication/s3fs/S3Path.java | 25 ++++++++++++- .../s3fs/util/S3UploadRequest.java | 13 ++++++- .../com/upplication/s3fs/AwsS3NioTest.groovy | 35 ++++++++++++++++++- .../processor/PublishDirS3Test.groovy | 29 +++++++++++++++ 10 files changed, 204 insertions(+), 16 deletions(-) create mode 100644 modules/nf-commons/src/main/nextflow/file/TagAwareFile.groovy diff --git a/docs/process.rst b/docs/process.rst index 45a514007b..8a34492aa1 100644 --- a/docs/process.rst +++ b/docs/process.rst @@ -2045,6 +2045,7 @@ saveAs A closure which, given the name of the file being published, ret Return the value ``null`` from the closure to *not* publish a file. This is useful when the process has multiple output files, but you want to publish only some of them. enabled Enable or disable the publish rule depending on the boolean value specified (default: ``true``). +tags Allow to associate tags with the target file e.g. ``tag: [FOO: 'Hello world']`` (EXPERIMENTAL, currently only supported by files stored on AWS S3, requires version ``21.12.0-edge`` or later). =============== ================= Table of publish modes: diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy index dabe9c4806..46af546607 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy @@ -37,6 +37,7 @@ import nextflow.Global import nextflow.Session import nextflow.extension.FilesEx import nextflow.file.FileHelper +import nextflow.file.TagAwareFile import nextflow.util.PathTrie /** * Implements the {@code publishDir} directory. It create links or copies the output @@ -84,6 +85,11 @@ class PublishDir { */ boolean enabled = true + /** + * Tags to be associated to the target file + */ + private def tags + private PathMatcher matcher private FileSystem sourceFileSystem @@ -120,6 +126,17 @@ class PublishDir { this.mode = mode } + static @PackageScope Map resolveTags( tags ) { + def result = tags instanceof Closure + ? tags.call() + : tags + + if( result instanceof Map ) + return result + + throw new IllegalArgumentException("Invalid publishDir tags attribute: $tags") + } + @PackageScope boolean checkNull(String str) { ( str =~ /\bnull\b/ ).find() } @@ -152,11 +169,14 @@ class PublishDir { result.overwrite = Boolean.parseBoolean(params.overwrite.toString()) if( params.saveAs ) - result.saveAs = params.saveAs + result.saveAs = (Closure) params.saveAs if( params.enabled != null ) result.enabled = Boolean.parseBoolean(params.enabled.toString()) + if( params.tags != null ) + result.tags = params.tags + return result } @@ -217,6 +237,7 @@ class PublishDir { this.sourceFileSystem = sourceDir ? sourceDir.fileSystem : null apply0(files) } + /** * Apply the publishing process to the specified {@link TaskRun} instance * @@ -258,6 +279,12 @@ class PublishDir { } final destination = resolveDestination(target) + + // apply tags + if( this.tags!=null && destination instanceof TagAwareFile ) { + destination.setTags( resolveTags(this.tags) ) + } + if( inProcess ) { safeProcessFile(source, destination) } diff --git a/modules/nf-commons/src/main/nextflow/file/TagAwareFile.groovy b/modules/nf-commons/src/main/nextflow/file/TagAwareFile.groovy new file mode 100644 index 0000000000..c0803ed8f1 --- /dev/null +++ b/modules/nf-commons/src/main/nextflow/file/TagAwareFile.groovy @@ -0,0 +1,29 @@ +/* + * Copyright 2020-2021, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package nextflow.file + +/** + * Defines the interface for annotate a file with one ore more tags + * + * @author Paolo Di Tommaso + */ +interface TagAwareFile { + + void setTags(Map tags) + +} diff --git a/plugins/nf-amazon/src/main/com/upplication/s3fs/AmazonS3Client.java b/plugins/nf-amazon/src/main/com/upplication/s3fs/AmazonS3Client.java index 362cbd2e66..68daa9d1c5 100644 --- a/plugins/nf-amazon/src/main/com/upplication/s3fs/AmazonS3Client.java +++ b/plugins/nf-amazon/src/main/com/upplication/s3fs/AmazonS3Client.java @@ -67,16 +67,19 @@ import com.amazonaws.services.s3.model.CopyObjectResult; import com.amazonaws.services.s3.model.CopyPartRequest; import com.amazonaws.services.s3.model.CopyPartResult; +import com.amazonaws.services.s3.model.GetObjectTaggingRequest; import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; import com.amazonaws.services.s3.model.ListObjectsRequest; import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.ObjectTagging; import com.amazonaws.services.s3.model.Owner; import com.amazonaws.services.s3.model.PartETag; import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.PutObjectResult; import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.Tag; import com.upplication.s3fs.util.S3MultipartOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -142,15 +145,19 @@ public PutObjectResult putObject(String bucket, String key, File file) { } return client.putObject(req); } + /** * @see com.amazonaws.services.s3.AmazonS3Client#putObject(String, String, java.io.InputStream, ObjectMetadata) */ - public PutObjectResult putObject(String bucket, String keyName, InputStream inputStream, ObjectMetadata metadata) { + public PutObjectResult putObject(String bucket, String keyName, InputStream inputStream, ObjectMetadata metadata, List tags) { PutObjectRequest req = new PutObjectRequest(bucket, keyName, inputStream, metadata); if( cannedAcl != null ) { - log.trace("Setting canned ACL={}; bucket={} and stream", cannedAcl, bucket); + log.trace("Setting canned ACL={}; bucket={}; tags={} and stream", cannedAcl, bucket, tags); req.withCannedAcl(cannedAcl); } + if( tags != null ) { + req.setTagging(new ObjectTagging(tags)); + } return client.putObject(req); } /** @@ -236,14 +243,18 @@ public ObjectMetadata getObjectMetadata(String bucketName, String key) { return client.getObjectMetadata(bucketName, key); } - /** + public List getObjectTags(String bucketName, String key) { + return client.getObjectTagging(new GetObjectTaggingRequest(bucketName,key)).getTagSet(); + } + + /** * @see com.amazonaws.services.s3.AmazonS3Client#listNextBatchOfObjects(com.amazonaws.services.s3.model.ObjectListing) */ public ObjectListing listNextBatchOfObjects(ObjectListing objectListing) { return client.listNextBatchOfObjects(objectListing); } - public void multipartCopyObject(S3Path s3Source, S3Path s3Target, Long objectSize, S3MultipartOptions opts, ObjectMetadata targetObjectMetadata ) { + public void multipartCopyObject(S3Path s3Source, S3Path s3Target, Long objectSize, S3MultipartOptions opts, ObjectMetadata targetObjectMetadata, List tags ) { final String sourceBucketName = s3Source.getBucket(); final String sourceObjectKey = s3Source.getKey(); @@ -262,6 +273,10 @@ public void multipartCopyObject(S3Path s3Source, S3Path s3Target, Long objectSiz initiateRequest.withObjectMetadata(targetObjectMetadata); } + if( tags != null && tags.size()>0 ) { + initiateRequest.setTagging( new ObjectTagging(tags)); + } + InitiateMultipartUploadResult initResult = client.initiateMultipartUpload(initiateRequest); diff --git a/plugins/nf-amazon/src/main/com/upplication/s3fs/S3FileSystemProvider.java b/plugins/nf-amazon/src/main/com/upplication/s3fs/S3FileSystemProvider.java index 1425d9d876..89aea70701 100644 --- a/plugins/nf-amazon/src/main/com/upplication/s3fs/S3FileSystemProvider.java +++ b/plugins/nf-amazon/src/main/com/upplication/s3fs/S3FileSystemProvider.java @@ -74,6 +74,7 @@ import java.util.EnumSet; import java.util.Iterator; import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; @@ -91,9 +92,11 @@ import com.amazonaws.services.s3.model.CopyObjectRequest; import com.amazonaws.services.s3.model.Grant; import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.ObjectTagging; import com.amazonaws.services.s3.model.Owner; import com.amazonaws.services.s3.model.Permission; import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.s3.model.Tag; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; @@ -354,7 +357,7 @@ private S3OutputStream createUploaderOutputStream( S3Path fileToUpload ) { S3UploadRequest req = props != null ? new S3UploadRequest(props) : new S3UploadRequest(); req.setObjectId(fileToUpload.toS3ObjectId()); - + req.setTags(fileToUpload.getTagsList()); S3OutputStream stream = new S3OutputStream(s3.getClient(), req); stream.setCannedAcl(s3.getCannedAcl()); return stream; @@ -398,6 +401,7 @@ public SeekableByteChannel newByteChannel(Path path, // and we can use the File SeekableByteChannel implementation final SeekableByteChannel seekable = Files .newByteChannel(tempFile, options); + final List tags = ((S3Path) path).getTagsList(); return new SeekableByteChannel() { @Override @@ -431,7 +435,8 @@ public void close() throws IOException { .getClient() .putObject(s3Path.getBucket(), s3Path.getKey(), stream, - metadata); + metadata, + tags); } } else { @@ -493,6 +498,7 @@ public void createDirectory(Path dir, FileAttribute... attrs) Preconditions.checkArgument(attrs.length == 0, "attrs not yet supported: %s", ImmutableList.copyOf(attrs)); // TODO + List tags = s3Path.getTagsList(); ObjectMetadata metadata = new ObjectMetadata(); metadata.setContentLength(0); if( isAES256Enabled() ) @@ -504,7 +510,7 @@ public void createDirectory(Path dir, FileAttribute... attrs) s3Path.getFileSystem() .getClient() .putObject(s3Path.getBucket(), keyName, - new ByteArrayInputStream(new byte[0]), metadata); + new ByteArrayInputStream(new byte[0]), metadata, tags); } @Override @@ -570,18 +576,22 @@ public void copy(Path source, Path target, CopyOption... options) final S3MultipartOptions opts = props != null ? new S3MultipartOptions<>(props) : new S3MultipartOptions(); final int chunkSize = opts.getChunkSize(); final long length = sourceObjMetadata.getContentLength(); - + final List tags = ((S3Path) target).getTagsList(); + if( length <= chunkSize ) { CopyObjectRequest copyObjRequest = new CopyObjectRequest(s3Source.getBucket(), s3Source.getKey(),s3Target.getBucket(), s3Target.getKey()); - + if( tags.size()>0 ) { + copyObjRequest.setNewObjectTagging(new ObjectTagging(tags)); + } + ObjectMetadata targetObjectMetadata = null; if( isAES256Enabled() ) { targetObjectMetadata = new ObjectMetadata(); targetObjectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); copyObjRequest.setNewObjectMetadata(targetObjectMetadata); } - log.trace("Copy file via copy object - source: source={}, target={}, metadata={}", s3Source, s3Target, targetObjectMetadata); + log.trace("Copy file via copy object - source: source={}, target={}, metadata={}, tags={}", s3Source, s3Target, targetObjectMetadata, tags); client.copyObject(copyObjRequest); } else { @@ -590,8 +600,8 @@ public void copy(Path source, Path target, CopyOption... options) targetObjectMetadata = new ObjectMetadata(); targetObjectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); } - log.trace("Copy file via multipart upload - source: source={}, target={}, metadata={}", s3Source, s3Target, targetObjectMetadata); - client.multipartCopyObject(s3Source, s3Target, length, opts, targetObjectMetadata); + log.trace("Copy file via multipart upload - source: source={}, target={}, metadata={}, tags={}", s3Source, s3Target, targetObjectMetadata, tags); + client.multipartCopyObject(s3Source, s3Target, length, opts, targetObjectMetadata, tags); } } diff --git a/plugins/nf-amazon/src/main/com/upplication/s3fs/S3OutputStream.java b/plugins/nf-amazon/src/main/com/upplication/s3fs/S3OutputStream.java index bea7d6142d..8c04015912 100644 --- a/plugins/nf-amazon/src/main/com/upplication/s3fs/S3OutputStream.java +++ b/plugins/nf-amazon/src/main/com/upplication/s3fs/S3OutputStream.java @@ -27,6 +27,7 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; +import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; @@ -43,10 +44,12 @@ import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.ObjectTagging; import com.amazonaws.services.s3.model.PartETag; import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.S3ObjectId; import com.amazonaws.services.s3.model.StorageClass; +import com.amazonaws.services.s3.model.Tag; import com.amazonaws.services.s3.model.UploadPartRequest; import com.amazonaws.util.Base64; import com.upplication.s3fs.util.ByteBufferInputStream; @@ -173,6 +176,8 @@ public boolean offer(E e) private CannedAccessControlList cannedAcl; + private List tags; + /** * Creates a s3 uploader output stream * @param s3 The S3 client @@ -198,6 +203,7 @@ public S3OutputStream(final AmazonS3 s3, S3UploadRequest request) { this.storageClass = request.getStorageClass(); this.request = request; this.chunkSize = request.getChunkSize(); + this.tags = request.getTags(); } private ByteBuffer expandBuffer(ByteBuffer byteBuffer) { @@ -566,6 +572,10 @@ private void putObject(final InputStream content, final long contentLength, byte request.setStorageClass(storageClass); } + if( tags!=null && tags.size()>0 ) { + request.setTagging( new ObjectTagging(tags) ); + } + try { s3.putObject(request); } catch (final AmazonClientException e) { diff --git a/plugins/nf-amazon/src/main/com/upplication/s3fs/S3Path.java b/plugins/nf-amazon/src/main/com/upplication/s3fs/S3Path.java index bfbaa9743f..9c9f162dde 100644 --- a/plugins/nf-amazon/src/main/com/upplication/s3fs/S3Path.java +++ b/plugins/nf-amazon/src/main/com/upplication/s3fs/S3Path.java @@ -50,12 +50,15 @@ import java.nio.file.WatchKey; import java.nio.file.WatchService; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; import javax.annotation.Nullable; import com.amazonaws.services.s3.model.S3ObjectId; import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.s3.model.Tag; import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -63,12 +66,13 @@ import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import nextflow.file.TagAwareFile; import static com.google.common.collect.Iterables.concat; import static com.google.common.collect.Iterables.filter; import static com.google.common.collect.Iterables.transform; import static java.lang.String.format; -public class S3Path implements Path { +public class S3Path implements Path, TagAwareFile { public static final String PATH_SEPARATOR = "/"; /** @@ -86,6 +90,8 @@ public class S3Path implements Path { private S3ObjectSummary objectSummary; + private Map tags; + /** * path must be a string of the form "/{bucket}", "/{bucket}/{key}" or just * "{key}". @@ -537,6 +543,23 @@ void setObjectSummary(S3ObjectSummary objectSummary) { this.objectSummary = objectSummary; } + @Override + public void setTags(Map tags) { + this.tags = tags; + } + + public List getTagsList() { + // nothing found, just return + if( tags==null ) + return Collections.emptyList(); + // create a list of Tag out of the Map + List result = new ArrayList<>(); + for( Map.Entry entry : tags.entrySet()) { + result.add( new Tag(entry.getKey(), entry.getValue()) ); + } + return result; + } + // ~ helpers methods private static Function strip(final String ... strs) { diff --git a/plugins/nf-amazon/src/main/com/upplication/s3fs/util/S3UploadRequest.java b/plugins/nf-amazon/src/main/com/upplication/s3fs/util/S3UploadRequest.java index a9260f3182..c336b4f27a 100644 --- a/plugins/nf-amazon/src/main/com/upplication/s3fs/util/S3UploadRequest.java +++ b/plugins/nf-amazon/src/main/com/upplication/s3fs/util/S3UploadRequest.java @@ -17,11 +17,13 @@ package com.upplication.s3fs.util; +import java.util.List; import java.util.Properties; import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.S3ObjectId; import com.amazonaws.services.s3.model.StorageClass; +import com.amazonaws.services.s3.model.Tag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +49,7 @@ public class S3UploadRequest extends S3MultipartOptions { */ private ObjectMetadata metadata; - + private List tags; public S3UploadRequest() { @@ -115,6 +117,15 @@ public S3UploadRequest setMetadata(ObjectMetadata metadata) { return this; } + public List getTags() { + return tags; + } + + public S3UploadRequest setTags(List tags) { + this.tags = tags; + return this; + } + public String toString() { return "objectId=" + objectId + "storageClass=" + storageClass + diff --git a/plugins/nf-amazon/src/test/com/upplication/s3fs/AwsS3NioTest.groovy b/plugins/nf-amazon/src/test/com/upplication/s3fs/AwsS3NioTest.groovy index 49eaf39efb..bb5bd56af6 100644 --- a/plugins/nf-amazon/src/test/com/upplication/s3fs/AwsS3NioTest.groovy +++ b/plugins/nf-amazon/src/test/com/upplication/s3fs/AwsS3NioTest.groovy @@ -907,6 +907,39 @@ class AwsS3NioTest extends Specification implements AwsS3BaseSpec { cleanup: deleteBucket(bucketName) } - + + def 'should tag a file' () { + given: + def bucketName = createBucket() + and: + def path = (S3Path) Paths.get(new URI("s3:///$bucketName/alpha.txt")) + def copy = (S3Path) Paths.get(new URI("s3:///$bucketName/omega.txt")) + and: + def client = path.getFileSystem().getClient() + + when: + path.setTags(FOO: 'Hello world', BAR: 'xyz') + Files.createFile(path) + then: + Files.exists(path) + and: + def tags = client .getObjectTags(path.getBucket(), path.getKey()) + tags.find { it.key=='FOO' }.value == 'Hello world' + tags.find { it.key=='BAR' }.value == 'xyz' + + when: + copy.setTags(FOO: 'Hola mundo', BAZ: '123') + Files.copy(path, copy) + then: + Files.exists(copy) + and: + def copyTags = client .getObjectTags(copy.getBucket(), copy.getKey()) + copyTags.find { it.key=='FOO' }.value == 'Hola mundo' + copyTags.find { it.key=='BAZ' }.value == '123' + copyTags.find { it.key=='BAR' } == null + + cleanup: + deleteBucket(bucketName) + } } diff --git a/plugins/nf-amazon/src/test/nextflow/processor/PublishDirS3Test.groovy b/plugins/nf-amazon/src/test/nextflow/processor/PublishDirS3Test.groovy index e7eeb84311..af9f7ff629 100644 --- a/plugins/nf-amazon/src/test/nextflow/processor/PublishDirS3Test.groovy +++ b/plugins/nf-amazon/src/test/nextflow/processor/PublishDirS3Test.groovy @@ -17,6 +17,9 @@ package nextflow.processor +import java.nio.file.Files + +import com.upplication.s3fs.S3Path import spock.lang.Specification import java.nio.file.FileSystems @@ -44,5 +47,31 @@ class PublishDirS3Test extends Specification { publisher.mode == PublishDir.Mode.COPY } + def 'should tag files' () { + + given: + def folder = Files.createTempDirectory('test') + def source = folder.resolve('hello.txt'); source.text = 'Hello' + + and: + def processor = [:] as TaskProcessor + processor.name = 'foo' + and: + def targetDir = FileHelper.asPath( 's3://bucket/work' ) + def publisher = new PublishDir(tags: [FOO:'this',BAR:'that'], path: targetDir, sourceFileSystem: FileSystems.default) + def spy = Spy(publisher) + + when: + spy.apply1(source, true) + then: + 1 * spy.safeProcessFile(source, _) >> { sourceFile, s3File -> + assert s3File instanceof S3Path + assert (s3File as S3Path).getTagsList().find{ it.getKey()=='FOO'}.value == 'this' + assert (s3File as S3Path).getTagsList().find{ it.getKey()=='BAR'}.value == 'that' + } + + cleanup: + folder?.deleteDir() + } }