From 1c599d3fbfcd12b82a6b93f48c43baf25fa05133 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Wed, 7 Aug 2024 14:50:48 -0400 Subject: [PATCH] Handle reserved characters (colon, dot, white space) in Lineage FQN --- .../org/apache/beam/sdk/io/FileSystem.java | 2 +- .../org/apache/beam/sdk/io/FileSystems.java | 4 +- .../org/apache/beam/sdk/metrics/Lineage.java | 104 +++++++++++++++--- .../apache/beam/sdk/metrics/LineageTest.java | 59 ++++++++++ .../extensions/gcp/storage/GcsFileSystem.java | 4 +- .../beam/sdk/io/aws/s3/S3FileSystem.java | 4 +- .../beam/sdk/io/aws2/s3/S3FileSystem.java | 4 +- .../blobstore/AzureBlobStoreFileSystem.java | 14 +-- .../sdk/io/gcp/bigquery/BigQueryHelpers.java | 5 +- .../io/gcp/bigquery/BigQuerySourceBase.java | 3 +- .../bigquery/BigQueryStorageSourceBase.java | 7 +- .../sdk/io/gcp/bigquery/CreateTables.java | 3 +- .../StorageApiWriteUnshardedRecords.java | 3 +- .../StorageApiWritesShardedRecords.java | 3 +- .../beam/sdk/io/gcp/bigquery/WriteRename.java | 3 +- .../beam/sdk/io/gcp/bigquery/WriteTables.java | 3 +- .../io/gcp/bigtable/BigtableServiceImpl.java | 6 +- .../beam/sdk/io/gcp/pubsub/PubsubClient.java | 15 +-- .../beam/sdk/io/gcp/pubsub/PubsubIO.java | 7 +- .../io/gcp/pubsub/PubsubUnboundedSink.java | 7 +- .../io/gcp/pubsub/PubsubUnboundedSource.java | 4 +- .../sdk/io/gcp/bigtable/BigtableReadIT.java | 5 +- .../sdk/io/gcp/bigtable/BigtableWriteIT.java | 4 +- .../sdk/io/gcp/pubsub/PubsubClientTest.java | 5 +- .../beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 4 +- 25 files changed, 212 insertions(+), 70 deletions(-) create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/LineageTest.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java index 38f51ce9229f1..11314a318b256 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java @@ -162,5 +162,5 @@ protected abstract void rename( * *

Unless override by FileSystem implementations, default to no-op. */ - protected void reportLineage(ResourceIdT unusedId, Lineage.Type unusedType) {} + protected void reportLineage(ResourceIdT unusedId, Lineage unusedLineage) {} } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java index 1e7e81f8af51e..a4ca9b80dce37 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java @@ -398,12 +398,12 @@ public ResourceId apply(@Nonnull Metadata input) { /** Report source {@link Lineage} metrics for resource id. */ public static void reportSourceLineage(ResourceId resourceId) { - getFileSystemInternal(resourceId.getScheme()).reportLineage(resourceId, Lineage.Type.SOURCE); + getFileSystemInternal(resourceId.getScheme()).reportLineage(resourceId, Lineage.getSources()); } /** Report sink {@link Lineage} metrics for resource id. */ public static void reportSinkLineage(ResourceId resourceId) { - getFileSystemInternal(resourceId.getScheme()).reportLineage(resourceId, Lineage.Type.SINK); + getFileSystemInternal(resourceId.getScheme()).reportLineage(resourceId, Lineage.getSinks()); } private static class FilterResult { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java index f0f9055b77643..e9acb20422f27 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java @@ -19,36 +19,108 @@ import java.util.HashSet; import java.util.Set; +import java.util.regex.Pattern; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.checkerframework.checker.nullness.qual.Nullable; /** * Standard collection of metrics used to record source and sinks information for lineage tracking. */ public class Lineage { + public static final String LINEAGE_NAMESPACE = "lineage"; - private static final StringSet SOURCES = - Metrics.stringSet(LINEAGE_NAMESPACE, Type.SOURCE.toString()); - private static final StringSet SINKS = Metrics.stringSet(LINEAGE_NAMESPACE, Type.SINK.toString()); + private static final Lineage SOURCES = new Lineage(Type.SOURCE); + private static final Lineage SINKS = new Lineage(Type.SINK); + + private final StringSet metric; + + private Lineage(Type type) { + this.metric = Metrics.stringSet(LINEAGE_NAMESPACE, type.toString()); + } - /** {@link StringSet} representing sources and optionally side inputs. */ - public static StringSet getSources() { + /** {@link Lineage} representing sources and optionally side inputs. */ + public static Lineage getSources() { return SOURCES; } - /** {@link StringSet} representing sinks. */ - public static StringSet getSinks() { + /** {@link Lineage} representing sinks. */ + public static Lineage getSinks() { return SINKS; } - /** {@link StringSet} representing {@link Type}. */ - public static StringSet get(Type type) { - switch (type) { - case SOURCE: - return getSources(); - case SINK: - return getSinks(); - default: - throw new IllegalArgumentException(String.format("Unsupported Lineage type: %s", type)); + private static final Pattern RESERVED_CHARS = Pattern.compile("[:\\s.]"); + + /** + * Wrap segment to valid segment name. + * + *

Specifically, If there are reserved chars (colon, whitespace, dot), escape with backtick. If + * the segment is already wrapped, return the original. + */ + private static String wrapSegment(String value) { + if (value.startsWith("`") && value.endsWith("`")) { + return value; + } + if (RESERVED_CHARS.matcher(value).find()) { + return String.format("`%s`", value); + } + return value; + } + + /** + * Assemble fully qualified name (FQN). Format: + * + *

+ * + *

This helper method is for internal and testing usage only. + */ + @Internal + public static String getFqName( + String system, @Nullable String routine, Iterable segments) { + StringBuilder builder = new StringBuilder(system); + if (!Strings.isNullOrEmpty(routine)) { + builder.append(":").append(routine); + } + int idx = 0; + for (String segment : segments) { + if (idx == 0) { + builder.append(":"); + } else { + builder.append("."); + } + builder.append(wrapSegment(segment)); + ++idx; } + return builder.toString(); + } + + /** + * Assemble the FQN of given system, and segments. + * + *

This helper method is for internal and testing usage only. + */ + @Internal + public static String getFqName(String system, Iterable segments) { + return getFqName(system, null, segments); + } + + /** + * Add a FQN (fully-qualified name) to Lineage. Segments will be processed via {@link #getFqName}. + */ + public void add(String system, @Nullable String routine, Iterable segments) { + metric.add(getFqName(system, routine, segments)); + } + + /** + * Add a FQN (fully-qualified name) to Lineage. Segments will be processed via {@link #getFqName}. + */ + public void add(String system, Iterable segments) { + add(system, null, segments); } /** Query {@link StringSet} metrics from {@link MetricResults}. */ diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/LineageTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/LineageTest.java new file mode 100644 index 0000000000000..432eb396fe20c --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/LineageTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import static org.junit.Assert.assertEquals; + +import java.util.Map; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link Lineage}. */ +@RunWith(JUnit4.class) +public class LineageTest { + @Test + public void testGetFqName() { + Map testCases = + ImmutableMap.builder() + .put("apache-beam", "apache-beam") + .put("`apache-beam`", "`apache-beam`") + .put("apache.beam", "`apache.beam`") + .put("apache:beam", "`apache:beam`") + .put("apache beam", "`apache beam`") + .put("`apache beam`", "`apache beam`") + .put("apache\tbeam", "`apache\tbeam`") + .put("apache\nbeam", "`apache\nbeam`") + .build(); + testCases.forEach( + (key, value) -> + assertEquals("apache:" + value, Lineage.getFqName("apache", ImmutableList.of(key)))); + testCases.forEach( + (key, value) -> + assertEquals( + "apache:beam:" + value, + Lineage.getFqName("apache", "beam", ImmutableList.of(key)))); + testCases.forEach( + (key, value) -> + assertEquals( + "apache:beam:" + value + "." + value, + Lineage.getFqName("apache", "beam", ImmutableList.of(key, key)))); + } +} diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java index 596b20beb83d6..6332051c0ddc7 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java @@ -216,10 +216,10 @@ protected String getScheme() { } @Override - protected void reportLineage(GcsResourceId resourceId, Lineage.Type type) { + protected void reportLineage(GcsResourceId resourceId, Lineage lineage) { GcsPath path = resourceId.getGcsPath(); if (!path.getBucket().isEmpty()) { - Lineage.get(type).add(String.format("gcs:%s.%s", path.getBucket(), path.getObject())); + lineage.add("gcs", ImmutableList.of(path.getBucket(), path.getObject())); } else { LOG.warn("Report Lineage on relative path {} is unsupported", path.getObject()); } diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java index 9b234865eb918..7ed56efa44bda 100644 --- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java +++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java @@ -626,8 +626,8 @@ protected S3ResourceId matchNewResource(String singleResourceSpec, boolean isDir } @Override - protected void reportLineage(S3ResourceId resourceId, Lineage.Type type) { - Lineage.get(type).add(String.format("s3:%s.%s", resourceId.getBucket(), resourceId.getKey())); + protected void reportLineage(S3ResourceId resourceId, Lineage lineage) { + lineage.add("s3", ImmutableList.of(resourceId.getBucket(), resourceId.getKey())); } /** diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java index cf47133e98ea1..384c8c627ee7f 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java @@ -657,8 +657,8 @@ protected S3ResourceId matchNewResource(String singleResourceSpec, boolean isDir } @Override - protected void reportLineage(S3ResourceId resourceId, Lineage.Type type) { - Lineage.get(type).add(String.format("s3:%s.%s", resourceId.getBucket(), resourceId.getKey())); + protected void reportLineage(S3ResourceId resourceId, Lineage lineage) { + lineage.add("s3", ImmutableList.of(resourceId.getBucket(), resourceId.getKey())); } /** diff --git a/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java b/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java index 8a881cead7f17..5137eaf9bb2dc 100644 --- a/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java +++ b/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java @@ -452,16 +452,14 @@ protected AzfsResourceId matchNewResource(String singleResourceSpec, boolean isD } @Override - protected void reportLineage(AzfsResourceId resourceId, Lineage.Type type) { + protected void reportLineage(AzfsResourceId resourceId, Lineage lineage) { if (!Strings.isNullOrEmpty(resourceId.getBlob())) { - Lineage.get(type) - .add( - String.format( - "abs:%s.%s.%s", - resourceId.getAccount(), resourceId.getContainer(), resourceId.getBlob())); + lineage.add( + "abs", + ImmutableList.of( + resourceId.getAccount(), resourceId.getContainer(), resourceId.getBlob())); } else { - Lineage.get(type) - .add(String.format("abs:%s.%s", resourceId.getAccount(), resourceId.getContainer())); + lineage.add("abs", ImmutableList.of(resourceId.getAccount(), resourceId.getContainer())); } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java index 61bed66a3365a..129c8314fc80f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java @@ -55,6 +55,7 @@ import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; @@ -413,7 +414,7 @@ public static String toTableSpec(TableReference ref) { return sb.toString(); } - public static String dataCatalogName(TableReference ref, BigQueryOptions options) { + public static List dataCatalogSegments(TableReference ref, BigQueryOptions options) { String tableIdBase; int ix = ref.getTableId().indexOf('$'); if (ix == -1) { @@ -429,7 +430,7 @@ public static String dataCatalogName(TableReference ref, BigQueryOptions options } else { projectId = options.getProject(); } - return String.format("bigquery:%s.%s.%s", projectId, ref.getDatasetId(), tableIdBase); + return ImmutableList.of(projectId, ref.getDatasetId(), tableIdBase); } static List getOrCreateMapListValue(Map> map, K key) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java index 998c82ab8d833..a8985775cbe7d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java @@ -121,7 +121,8 @@ protected ExtractResult extractFiles(PipelineOptions options) throws Exception { BigQueryHelpers.toTableSpec(tableToExtract))); } // emit this table ID as a lineage source - Lineage.getSources().add(BigQueryHelpers.dataCatalogName(tableToExtract, bqOptions)); + Lineage.getSources() + .add("bigquery", BigQueryHelpers.dataCatalogSegments(tableToExtract, bqOptions)); TableSchema schema = table.getSchema(); JobService jobService = bqServices.getJobService(bqOptions); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java index 3852d18ec12d4..51a5a8f391a64 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java @@ -35,7 +35,6 @@ import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient; import org.apache.beam.sdk.metrics.Lineage; -import org.apache.beam.sdk.metrics.StringSet; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -109,12 +108,12 @@ public List> split( @Nullable Table targetTable = getTargetTable(bqOptions); ReadSession.Builder readSessionBuilder = ReadSession.newBuilder(); - StringSet lineageSources = Lineage.getSources(); + Lineage lineage = Lineage.getSources(); if (targetTable != null) { TableReference tableReference = targetTable.getTableReference(); readSessionBuilder.setTable(BigQueryHelpers.toTableResourceName(tableReference)); // register the table as lineage source - lineageSources.add(BigQueryHelpers.dataCatalogName(tableReference, bqOptions)); + lineage.add("bigquery", BigQueryHelpers.dataCatalogSegments(tableReference, bqOptions)); } else { // If the table does not exist targetTable will be null. // Construct the table id if we can generate it. For error recording/logging. @@ -123,7 +122,7 @@ public List> split( readSessionBuilder.setTable(tableReferenceId); // register the table as lineage source TableReference tableReference = BigQueryHelpers.parseTableUrn(tableReferenceId); - lineageSources.add(BigQueryHelpers.dataCatalogName(tableReference, bqOptions)); + lineage.add("bigquery", BigQueryHelpers.dataCatalogSegments(tableReference, bqOptions)); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java index a55bcc3fe0252..1bbd4e7560841 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java @@ -121,7 +121,8 @@ public void processElement(ProcessContext context) { BigQueryOptions bqOptions = context.getPipelineOptions().as(BigQueryOptions.class); Lineage.getSinks() .add( - BigQueryHelpers.dataCatalogName( + "bigquery", + BigQueryHelpers.dataCatalogSegments( tableDestination1.getTableReference(), bqOptions)); return CreateTableHelpers.possiblyCreateTable( bqOptions, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 8a902ec6d264e..0671c6da3c61d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -1109,7 +1109,8 @@ public void process( pipelineOptions.as(BigQueryOptions.class))); Lineage.getSinks() .add( - BigQueryHelpers.dataCatalogName( + "bigquery", + BigQueryHelpers.dataCatalogSegments( state.getTableDestination().getTableReference(), pipelineOptions.as(BigQueryOptions.class))); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index a7da19a75f850..05708f3e6abee 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -472,7 +472,8 @@ public void process( Lineage.getSinks() .add( - BigQueryHelpers.dataCatalogName( + "bigquery", + BigQueryHelpers.dataCatalogSegments( tableDestination.getTableReference(), bigQueryOptions)); Coder destinationCoder = dynamicDestinations.getDestinationCoder(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java index 1a6a6a4db70d2..061e66024e297 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java @@ -210,7 +210,8 @@ public void processElement( if (!entry.getValue().isEmpty()) { Lineage.getSinks() .add( - BigQueryHelpers.dataCatalogName( + "bigquery", + BigQueryHelpers.dataCatalogSegments( entry.getKey().getTableReference(), c.getPipelineOptions().as(BigQueryOptions.class))); pendingJobs.add(startWriteRename(entry.getKey(), entry.getValue(), c, window)); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index ace0bc5a74cd5..e374d459af44f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -263,7 +263,8 @@ public void processElement( } else { Lineage.getSinks() .add( - BigQueryHelpers.dataCatalogName( + "bigquery", + BigQueryHelpers.dataCatalogSegments( tableReference, c.getPipelineOptions().as(BigQueryOptions.class))); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java index 6fdf67722bac2..1af9ae4f932dd 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java @@ -216,7 +216,7 @@ public void close() { @Override public void reportLineage() { - Lineage.getSources().add(String.format("bigtable:%s.%s.%s", projectId, instanceId, tableId)); + Lineage.getSources().add("bigtable", ImmutableList.of(projectId, instanceId, tableId)); } } @@ -327,7 +327,7 @@ public void close() {} @Override public void reportLineage() { - Lineage.getSources().add(String.format("bigtable:%s.%s.%s", projectId, instanceId, tableId)); + Lineage.getSources().add("bigtable", ImmutableList.of(projectId, instanceId, tableId)); } @Override @@ -597,7 +597,7 @@ public void writeSingleRecord(KV> record) throws @Override public void reportLineage() { - Lineage.getSinks().add(String.format("bigtable:%s.%s.%s", projectId, instanceId, tableId)); + Lineage.getSinks().add("bigtable", ImmutableList.of(projectId, instanceId, tableId)); } private ServiceCallMetric createServiceCallMetric() { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java index f66ee6e1d8425..2964a29dbb6b6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java @@ -37,6 +37,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; @@ -260,8 +261,8 @@ public String getFullPath() { return String.format("/subscriptions/%s/%s", projectId, subscriptionName); } - public String getDataCatalogName() { - return String.format("pubsub:subscription:%s.%s", projectId, subscriptionName); + public List getDataCatalogSegments() { + return ImmutableList.of(projectId, subscriptionName); } @Override @@ -319,14 +320,14 @@ public String getName() { } /** - * Returns the data catalog name. Format "pubsub:topic:`project`.`topic`" This method is - * fail-safe. If topic path is malformed, it returns an empty string. + * Returns the data catalog segments. This method is fail-safe. If topic path is malformed, it + * returns an empty string. */ - public String getDataCatalogName() { + public List getDataCatalogSegments() { List splits = Splitter.on('/').splitToList(path); if (splits.size() == 4) { // well-formed path - return String.format("pubsub:topic:%s.%s", splits.get(1), splits.get(3)); + return ImmutableList.of(splits.get(1), splits.get(3)); } else { // Mal-formed path. It is either a test fixture or user error and will fail on publish. // We do not throw exception instead return empty string here. @@ -334,7 +335,7 @@ public String getDataCatalogName() { "Cannot get data catalog name for malformed topic path {}. Expected format: " + "projects//topics/", path); - return ""; + return ImmutableList.of(); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 0fd4e9207d81a..8b582c1054f8d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -85,6 +85,7 @@ import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; @@ -513,8 +514,8 @@ public String asPath() { } } - public String dataCatalogName() { - return String.format("pubsub:topic:%s.%s", project, topic); + public List dataCatalogSegments() { + return ImmutableList.of(project, topic); } @Override @@ -1624,7 +1625,7 @@ public void finishBundle() throws IOException { } // Report lineage for all topics seen for (PubsubTopic topic : output.keySet()) { - Lineage.getSinks().add(topic.dataCatalogName()); + Lineage.getSinks().add("pubsub", "topic", topic.dataCatalogSegments()); } output = null; pubsubClient.close(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java index defea87e835a8..38d77aa3aac38 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java @@ -70,7 +70,6 @@ import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Hashing; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -297,9 +296,9 @@ private void publishBatch(List messages, int bytes) throws IOEx byteCounter.inc(bytes); // Report Lineage multiple once for same topic if (!topicPath.equals(reportedLineage)) { - String name = topicPath.getDataCatalogName(); - if (!Strings.isNullOrEmpty(name)) { - Lineage.getSinks().add(topicPath.getDataCatalogName()); + List segments = topicPath.getDataCatalogSegments(); + if (segments.size() != 0) { + Lineage.getSinks().add("pubsub", "topic", segments); } reportedLineage = topicPath; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java index b131b521c067e..95fa5c223419f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java @@ -1045,14 +1045,14 @@ public List split(int desiredNumSplits, PipelineOptions options) TopicPath topic = outer.getTopic(); if (topic != null) { // is initial split on Read.fromTopic, report Lineage based on topic - Lineage.getSources().add(topic.getDataCatalogName()); + Lineage.getSources().add("pubsub", "source", topic.getDataCatalogSegments()); } } else { if (subscriptionPath.equals(outer.getSubscriptionProvider())) { SubscriptionPath sub = subscriptionPath.get(); if (sub != null) { // is a split on Read.fromSubscription - Lineage.getSources().add(sub.getDataCatalogName()); + Lineage.getSources().add("pubsub", "subscription", sub.getDataCatalogSegments()); } } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java index 4ce9ad10b2c06..4faff5ad6bd29 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java @@ -39,6 +39,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -154,7 +155,9 @@ private void checkLineageSourceMetric(PipelineResult r, String tableId) { if (options.getRunner().getName().contains("DirectRunner")) { assertThat( Lineage.query(r.metrics(), Lineage.Type.SOURCE), - hasItem(String.format("bigtable:%s.%s.%s", project, options.getInstanceId(), tableId))); + hasItem( + Lineage.getFqName( + "bigtable", ImmutableList.of(project, options.getInstanceId(), tableId)))); } } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java index 46bb3df836e56..44f3e5a199239 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java @@ -423,7 +423,9 @@ private void checkLineageSinkMetric(PipelineResult r, String tableId) { if (options.getRunner().getName().contains("DirectRunner")) { assertThat( Lineage.query(r.metrics(), Lineage.Type.SINK), - hasItem(String.format("bigtable:%s.%s.%s", project, options.getInstanceId(), tableId))); + hasItem( + Lineage.getFqName( + "bigtable", ImmutableList.of(project, options.getInstanceId(), tableId)))); } } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java index fb007d1171db1..9d7bc65f59545 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SchemaPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.joda.time.Instant; import org.junit.Rule; @@ -171,7 +172,7 @@ public void subscriptionPathFromNameWellFormed() { SubscriptionPath path = PubsubClient.subscriptionPathFromName("test", "something"); assertEquals("projects/test/subscriptions/something", path.getPath()); assertEquals("/subscriptions/test/something", path.getFullPath()); - assertEquals("pubsub:subscription:test.something", path.getDataCatalogName()); + assertEquals(ImmutableList.of("test", "something"), path.getDataCatalogSegments()); } @Test @@ -179,7 +180,7 @@ public void topicPathFromNameWellFormed() { TopicPath path = PubsubClient.topicPathFromName("test", "something"); assertEquals("projects/test/topics/something", path.getPath()); assertEquals("/topics/test/something", path.getFullPath()); - assertEquals("pubsub:topic:test.something", path.getDataCatalogName()); + assertEquals(ImmutableList.of("test", "something"), path.getDataCatalogSegments()); } @Test diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java index 74a98f0b8b438..d4effbae40a4c 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java @@ -238,8 +238,8 @@ public void testValueProviderTopic() { assertThat(pubsubRead.getTopicProvider().isAccessible(), is(true)); assertThat(pubsubRead.getTopicProvider().get().asPath(), equalTo(provider.get())); assertThat( - pubsubRead.getTopicProvider().get().dataCatalogName(), - equalTo("pubsub:topic:project.topic")); + pubsubRead.getTopicProvider().get().dataCatalogSegments(), + equalTo(ImmutableList.of("project", "topic"))); } @Test