Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add uploadS3Key activation parameter to specify upload file name #82

Merged
merged 5 commits into from
Mar 28, 2024

Conversation

vqminh
Copy link
Collaborator

@vqminh vqminh commented Mar 26, 2024

Dear DIL maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

  • My PR addresses the following JIRA issues

Description

  • Currently, data-integration-library does not support specifying the file name (s3key) to upload to but just use the file name of the original file. As the result all folder structure of the source are not maintained, and all files are uploaded to the same folder. We need to change data-integration-library and configure the upload job to do this.

Tests

  • This change create new parameter for backward compatibility.

build and run on holdem (snapshot cdi-core and dil-internal)

https://ltx1-holdemaz05.grid.linkedin.com:8443/executor?execid=102922952&job=uploadMatchedIds_uploadMatchedIdsFileDump&attempt=0

we can see it use: 0.2.112-SNAPSHOT
27-03-2024 12:11:29 PDT uploadMatchedIds_preprocess INFO - -rw-rw-r-- 1 root root 331394 Mar 27 19:11 /export/apps/azkaban/azkaban-exec-server/current/project/lib/cdi-core-0.2.112-SNAPSHOT.jar

27-03-2024 12:42:08 PDT uploadMatchedIds_uploadMatchedIdsFileDump INFO - INFO [MultistageSource] Generating Work Unit: [watermark.system.1711350000000, watermark.activation.{"pathName":"hdfs://ltx1-holdemnn01.grid.linkedin.com:9000/jobs/exttest/acxiom/uploadMatchedIds/encrypted/20240301/FR/part-00000-dd777519-b0cb-4085-86b9-dfb2b811bee1.c000.csv.gpg","uploadS3Key":"20240301/FR/part-00000-dd777519-b0cb-4085-86b9-dfb2b811bee1.c000.csv.gpg"}], watermark: (1711350000000,1711522800000)

27-03-2024 13:11:03 PDT uploadMatchedIds_uploadMatchedIdsFileDump INFO - INFO [S3Connection] writing to bucket bizo-partner-transfer and key partners/acxiom/matched-ids/new/20240325/20240301/FR/part-00006-dd777519-b0cb-4085-86b9-dfb2b811bee1.c000.csv.gpg

we can see that the key is what we specified in uploadS3Key

and the data in s3 is organized:

https://s3.console.aws.amazon.com/s3/buckets/bizo-partner-transfer?region=us-east-1&bucketType=general&prefix=partners/acxiom/matched-ids/new/20240325/20240301/&showversions=false

Running without the config:

https://ltx1-holdemaz05.grid.linkedin.com:8443/executor?execid=102928464

Commits

  1. add uploadS3Key param
  2. upgrade to gradle 6 in order to apply plugin com.jfrog.artifactory

* Get s3 key either from activation parameters named `s3key` or from the source path itself
*/
@NotNull
private String getS3Key(Path path) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the main logic on how we get s3key from the secondary input. The config will be:

[{"path": "${job.dir}/${preceding.table.name}", "fields":["pathName", "s3key"], "category": "activation"},{"path": "{{pathName}}", "category": "payload", "format":"binary"}]

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There may be better way to do this like adding another similar "binary" payload for file name, or make the binary payload a json object with path and name. But it won't be backward compatible. I also try to keep all the parameters for upload (which is a special case in the code) close together. Currently, the "binary" payload is only relevant while uploading.

return path.getName();
}

private ByteArrayInputStream handleUpload(Path path, String fileName) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

file name will be passed as parameter

if (shouldUpload) {
ByteArrayInputStream byteArrayInputStream = handleUpload(finalPrefix);
Path path = new Path(pathStr);
String fileName = finalPrefix + "/" + getS3Key(path);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s3 key will be determined from the activation parameters

@@ -84,43 +80,47 @@ public S3Connection(State state, JobKeys jobKeys, ExtractorKeys extractorKeys) {
public WorkUnitStatus execute(WorkUnitStatus status) {
s3Client = getS3HttpClient(getState());

String finalPrefix = getWorkUnitSpecificString(s3SourceV2Keys.getPrefix(), getExtractorKeys().getDynamicParameters());
JsonObject dynamicParameters = getExtractorKeys().getDynamicParameters();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after running ./gradlew build the file is auto formatted. I extract some variable to improve readability.

@NotNull
private String getS3Key(Path path) {
JsonObject activationParameters = getExtractorKeys().getActivationParameters();
for (Map.Entry<String, JsonElement> entry : activationParameters.entrySet()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you use "has" and "getAsJsonPrimitive" method here
https://www.javadoc.io/doc/com.google.code.gson/gson/2.8.5/com/google/gson/JsonObject.html
instead of using the loop?
And make sure that if the custom key doesn't exist return the original path.getName()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. It's already return path.getName() by default, I change to

@NotNull
private String getS3Key(Path path) {
JsonObject activationParameters = getExtractorKeys().getActivationParameters();
if(activationParameters.has(UPLOAD_S3_KEY)){
return activationParameters.get(UPLOAD_S3_KEY).getAsString();
}
return path.getName();
}

Copy link
Collaborator

@dihu-linkedin dihu-linkedin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you run your flow successfully with this new change? If so please share the job execution link separately in the ask channel for verification

@@ -59,6 +54,7 @@
*/
public class S3Connection extends MultistageConnection {
private static final Logger LOG = LoggerFactory.getLogger(S3Connection.class);
private static final String UPLOAD_S3_KEY = "s3key";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use a more descriptive and unique name here. s3key is very general and could conflict with other use cases which happen to use the same name

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only for this context though, how about ms.target.s3.key? similar to ms.extractor.target.file.name

Copy link
Collaborator Author

@vqminh vqminh Mar 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll change to uploadS3Key, this won't conflict because it's in the context of the upload secondary input only. Not like other parameter

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah uploadS3Key is fine. If you use s3key, it is used by other flows as well so other flow may end up enabling this feature unexpectedly

@vqminh
Copy link
Collaborator Author

vqminh commented Mar 26, 2024

Have you run your flow successfully with this new change? If so please share the job execution link separately in the ask channel for verification

How can I run this with new change? public snapshot? Will try

@vqminh vqminh changed the title add s3key parameter to specify upload file name add upload.s3.key activation parameter to specify upload file name Mar 27, 2024
@vqminh vqminh changed the title add upload.s3.key activation parameter to specify upload file name add uploadS3Key activation parameter to specify upload file name Mar 27, 2024
@vqminh
Copy link
Collaborator Author

vqminh commented Mar 27, 2024

Have you run your flow successfully with this new change? If so please share the job execution link separately in the ask channel for verification

I tested on holdem with snapshot build and it works

Copy link
Collaborator

@dihu-linkedin dihu-linkedin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change lgtm. Please get another approval from dil oncall

@dihu-linkedin dihu-linkedin merged commit a764088 into linkedin:master Mar 28, 2024
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants