Add tags attribute to publishDir directive
This commit adds the ability to associate arbitrary tags with
files created by the publishDir directive. Tags can be provided
as an associative array, e.g.

  publishDir some_path, tags: [FOO: 'this', BAR: 'bar']

or resolved dynamically using a closure that returns the desired
associative array.

This feature requires that the target file system supports
associating tags with files. Currently only AWS S3 is supported.
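For the dynamic case, a minimal sketch reusing the tag names from the example
above (the closure must return a Map<String,String>, otherwise the directive
raises an error) is:

  publishDir some_path, tags: { [FOO: 'this', BAR: 'bar'] }

The closure is evaluated when the file is published, so the returned map can
be built from values that are only known at that point.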
pditommaso committed Dec 20, 2021
1 parent 1202050 commit e78a997
Showing 10 changed files with 204 additions and 16 deletions.
1 change: 1 addition & 0 deletions docs/process.rst
@@ -2045,6 +2045,7 @@ saveAs A closure which, given the name of the file being published, ret
Return the value ``null`` from the closure to *not* publish a file.
This is useful when the process has multiple output files, but you want to publish only some of them.
enabled Enable or disable the publish rule depending on the boolean value specified (default: ``true``).
tags Allows associating tags with the target file, e.g. ``tags: [FOO: 'Hello world']`` (EXPERIMENTAL, currently only supported for files stored on AWS S3, requires version ``21.12.0-edge`` or later).
=============== =================

Table of publish modes:
@@ -37,6 +37,7 @@ import nextflow.Global
import nextflow.Session
import nextflow.extension.FilesEx
import nextflow.file.FileHelper
import nextflow.file.TagAwareFile
import nextflow.util.PathTrie
/**
* Implements the {@code publishDir} directive. It creates links or copies the output
@@ -84,6 +85,11 @@ class PublishDir {
*/
boolean enabled = true

/**
* Tags to be associated with the target file
*/
private def tags

private PathMatcher matcher

private FileSystem sourceFileSystem
@@ -120,6 +126,17 @@ class PublishDir {
this.mode = mode
}

static @PackageScope Map<String,String> resolveTags( tags ) {
def result = tags instanceof Closure
? tags.call()
: tags

if( result instanceof Map<String,String> )
return result

throw new IllegalArgumentException("Invalid publishDir tags attribute: $tags")
}

@PackageScope boolean checkNull(String str) {
( str =~ /\bnull\b/ ).find()
}
@@ -152,11 +169,14 @@ class PublishDir {
result.overwrite = Boolean.parseBoolean(params.overwrite.toString())

if( params.saveAs )
result.saveAs = params.saveAs
result.saveAs = (Closure) params.saveAs

if( params.enabled != null )
result.enabled = Boolean.parseBoolean(params.enabled.toString())

if( params.tags != null )
result.tags = params.tags

return result
}

@@ -217,6 +237,7 @@
this.sourceFileSystem = sourceDir ? sourceDir.fileSystem : null
apply0(files)
}

/**
* Apply the publishing process to the specified {@link TaskRun} instance
*
@@ -258,6 +279,12 @@
}

final destination = resolveDestination(target)

// apply tags
if( this.tags!=null && destination instanceof TagAwareFile ) {
destination.setTags( resolveTags(this.tags) )
}

if( inProcess ) {
safeProcessFile(source, destination)
}
29 changes: 29 additions & 0 deletions modules/nf-commons/src/main/nextflow/file/TagAwareFile.groovy
@@ -0,0 +1,29 @@
/*
* Copyright 2020-2021, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package nextflow.file

/**
* Defines the interface to annotate a file with one or more tags
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
interface TagAwareFile {

void setTags(Map<String,String> tags)

}
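The S3Path changes that implement this interface are not expanded in this view.
Purely as an illustration, a hypothetical implementer could hold the map set by
PublishDir and expose it as the AWS SDK Tag list that the S3 client code below
consumes (class and package names are made up):

// Illustrative sketch only -- not the actual S3Path implementation from this commit.
package example;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import com.amazonaws.services.s3.model.Tag;
import nextflow.file.TagAwareFile;

public class TaggedFile implements TagAwareFile {

    // key/value pairs set through the publishDir `tags` attribute
    private Map<String,String> tags = Collections.emptyMap();

    @Override
    public void setTags(Map<String,String> tags) {
        this.tags = tags;
    }

    // converts the map into the representation expected by ObjectTagging
    public List<Tag> getTagsList() {
        List<Tag> result = new ArrayList<>();
        for( Map.Entry<String,String> entry : tags.entrySet() )
            result.add(new Tag(entry.getKey(), entry.getValue()));
        return result;
    }
}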
@@ -67,16 +67,19 @@
import com.amazonaws.services.s3.model.CopyObjectResult;
import com.amazonaws.services.s3.model.CopyPartRequest;
import com.amazonaws.services.s3.model.CopyPartResult;
import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.ObjectTagging;
import com.amazonaws.services.s3.model.Owner;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.Tag;
import com.upplication.s3fs.util.S3MultipartOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -142,15 +145,19 @@ public PutObjectResult putObject(String bucket, String key, File file) {
}
return client.putObject(req);
}

/**
* @see com.amazonaws.services.s3.AmazonS3Client#putObject(String, String, java.io.InputStream, ObjectMetadata)
*/
public PutObjectResult putObject(String bucket, String keyName, InputStream inputStream, ObjectMetadata metadata) {
public PutObjectResult putObject(String bucket, String keyName, InputStream inputStream, ObjectMetadata metadata, List<Tag> tags) {
PutObjectRequest req = new PutObjectRequest(bucket, keyName, inputStream, metadata);
if( cannedAcl != null ) {
log.trace("Setting canned ACL={}; bucket={} and stream", cannedAcl, bucket);
log.trace("Setting canned ACL={}; bucket={}; tags={} and stream", cannedAcl, bucket, tags);
req.withCannedAcl(cannedAcl);
}
if( tags != null ) {
req.setTagging(new ObjectTagging(tags));
}
return client.putObject(req);
}
/**
@@ -236,14 +243,18 @@ public ObjectMetadata getObjectMetadata(String bucketName, String key) {
return client.getObjectMetadata(bucketName, key);
}

/**
public List<Tag> getObjectTags(String bucketName, String key) {
return client.getObjectTagging(new GetObjectTaggingRequest(bucketName,key)).getTagSet();
}

/**
* @see com.amazonaws.services.s3.AmazonS3Client#listNextBatchOfObjects(com.amazonaws.services.s3.model.ObjectListing)
*/
public ObjectListing listNextBatchOfObjects(ObjectListing objectListing) {
return client.listNextBatchOfObjects(objectListing);
}

public void multipartCopyObject(S3Path s3Source, S3Path s3Target, Long objectSize, S3MultipartOptions opts, ObjectMetadata targetObjectMetadata ) {
public void multipartCopyObject(S3Path s3Source, S3Path s3Target, Long objectSize, S3MultipartOptions opts, ObjectMetadata targetObjectMetadata, List<Tag> tags ) {

final String sourceBucketName = s3Source.getBucket();
final String sourceObjectKey = s3Source.getKey();
@@ -262,6 +273,10 @@ public void multipartCopyObject(S3Path s3Source, S3Path s3Target, Long objectSiz
initiateRequest.withObjectMetadata(targetObjectMetadata);
}

if( tags != null && tags.size()>0 ) {
initiateRequest.setTagging( new ObjectTagging(tags));
}

InitiateMultipartUploadResult initResult = client.initiateMultipartUpload(initiateRequest);


@@ -74,6 +74,7 @@
import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -91,9 +92,11 @@
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.Grant;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.ObjectTagging;
import com.amazonaws.services.s3.model.Owner;
import com.amazonaws.services.s3.model.Permission;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.Tag;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
@@ -354,7 +357,7 @@ private S3OutputStream createUploaderOutputStream( S3Path fileToUpload ) {

S3UploadRequest req = props != null ? new S3UploadRequest(props) : new S3UploadRequest();
req.setObjectId(fileToUpload.toS3ObjectId());

req.setTags(fileToUpload.getTagsList());
S3OutputStream stream = new S3OutputStream(s3.getClient(), req);
stream.setCannedAcl(s3.getCannedAcl());
return stream;
@@ -398,6 +401,7 @@ public SeekableByteChannel newByteChannel(Path path,

// and we can use the File SeekableByteChannel implementation
final SeekableByteChannel seekable = Files.newByteChannel(tempFile, options);
final List<Tag> tags = ((S3Path) path).getTagsList();

return new SeekableByteChannel() {
@Override
@@ -431,7 +435,8 @@ public void close() throws IOException {
.getClient()
.putObject(s3Path.getBucket(), s3Path.getKey(),
stream,
metadata);
metadata,
tags);
}
}
else {
@@ -493,6 +498,7 @@ public void createDirectory(Path dir, FileAttribute<?>... attrs)
Preconditions.checkArgument(attrs.length == 0,
"attrs not yet supported: %s", ImmutableList.copyOf(attrs)); // TODO

List<Tag> tags = s3Path.getTagsList();
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(0);
if( isAES256Enabled() )
@@ -504,7 +510,7 @@ public void createDirectory(Path dir, FileAttribute<?>... attrs)
s3Path.getFileSystem()
.getClient()
.putObject(s3Path.getBucket(), keyName,
new ByteArrayInputStream(new byte[0]), metadata);
new ByteArrayInputStream(new byte[0]), metadata, tags);
}

@Override
@@ -570,18 +576,22 @@ public void copy(Path source, Path target, CopyOption... options)
final S3MultipartOptions opts = props != null ? new S3MultipartOptions<>(props) : new S3MultipartOptions();
final int chunkSize = opts.getChunkSize();
final long length = sourceObjMetadata.getContentLength();

final List<Tag> tags = ((S3Path) target).getTagsList();

if( length <= chunkSize ) {

CopyObjectRequest copyObjRequest = new CopyObjectRequest(s3Source.getBucket(), s3Source.getKey(),s3Target.getBucket(), s3Target.getKey());

if( tags.size()>0 ) {
copyObjRequest.setNewObjectTagging(new ObjectTagging(tags));
}

ObjectMetadata targetObjectMetadata = null;
if( isAES256Enabled() ) {
targetObjectMetadata = new ObjectMetadata();
targetObjectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
copyObjRequest.setNewObjectMetadata(targetObjectMetadata);
}
log.trace("Copy file via copy object - source: source={}, target={}, metadata={}", s3Source, s3Target, targetObjectMetadata);
log.trace("Copy file via copy object - source: source={}, target={}, metadata={}, tags={}", s3Source, s3Target, targetObjectMetadata, tags);
client.copyObject(copyObjRequest);
}
else {
@@ -590,8 +600,8 @@ public void copy(Path source, Path target, CopyOption... options)
targetObjectMetadata = new ObjectMetadata();
targetObjectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
}
log.trace("Copy file via multipart upload - source: source={}, target={}, metadata={}", s3Source, s3Target, targetObjectMetadata);
client.multipartCopyObject(s3Source, s3Target, length, opts, targetObjectMetadata);
log.trace("Copy file via multipart upload - source: source={}, target={}, metadata={}, tags={}", s3Source, s3Target, targetObjectMetadata, tags);
client.multipartCopyObject(s3Source, s3Target, length, opts, targetObjectMetadata, tags);
}
}

@@ -27,6 +27,7 @@
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
@@ -43,10 +44,12 @@
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.ObjectTagging;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectId;
import com.amazonaws.services.s3.model.StorageClass;
import com.amazonaws.services.s3.model.Tag;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.util.Base64;
import com.upplication.s3fs.util.ByteBufferInputStream;
@@ -173,6 +176,8 @@ public boolean offer(E e)

private CannedAccessControlList cannedAcl;

private List<Tag> tags;

/**
* Creates a s3 uploader output stream
* @param s3 The S3 client
@@ -198,6 +203,7 @@ public S3OutputStream(final AmazonS3 s3, S3UploadRequest request) {
this.storageClass = request.getStorageClass();
this.request = request;
this.chunkSize = request.getChunkSize();
this.tags = request.getTags();
}

private ByteBuffer expandBuffer(ByteBuffer byteBuffer) {
Expand Down Expand Up @@ -566,6 +572,10 @@ private void putObject(final InputStream content, final long contentLength, byte
request.setStorageClass(storageClass);
}

if( tags!=null && tags.size()>0 ) {
request.setTagging( new ObjectTagging(tags) );
}

try {
s3.putObject(request);
} catch (final AmazonClientException e) {
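As a quick sanity check after a run, the tags written by the publish rule can be
read back from S3 with the AWS CLI (bucket and key below are placeholders):

  aws s3api get-object-tagging --bucket my-bucket --key results/sample.txt

which should list the key/value pairs set through the publishDir tags attribute.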
