diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
index 38f51ce9229f..11314a318b25 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
@@ -162,5 +162,5 @@ protected abstract void rename(
*
*
Unless override by FileSystem implementations, default to no-op.
*/
- protected void reportLineage(ResourceIdT unusedId, Lineage.Type unusedType) {}
+ protected void reportLineage(ResourceIdT unusedId, Lineage unusedLineage) {}
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
index 1e7e81f8af51..a4ca9b80dce3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
@@ -398,12 +398,12 @@ public ResourceId apply(@Nonnull Metadata input) {
/** Report source {@link Lineage} metrics for resource id. */
public static void reportSourceLineage(ResourceId resourceId) {
- getFileSystemInternal(resourceId.getScheme()).reportLineage(resourceId, Lineage.Type.SOURCE);
+ getFileSystemInternal(resourceId.getScheme()).reportLineage(resourceId, Lineage.getSources());
}
/** Report sink {@link Lineage} metrics for resource id. */
public static void reportSinkLineage(ResourceId resourceId) {
- getFileSystemInternal(resourceId.getScheme()).reportLineage(resourceId, Lineage.Type.SINK);
+ getFileSystemInternal(resourceId.getScheme()).reportLineage(resourceId, Lineage.getSinks());
}
private static class FilterResult {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java
index f0f9055b7764..e9acb20422f2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java
@@ -19,36 +19,108 @@
import java.util.HashSet;
import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.checkerframework.checker.nullness.qual.Nullable;
/**
* Standard collection of metrics used to record source and sinks information for lineage tracking.
*/
public class Lineage {
+
public static final String LINEAGE_NAMESPACE = "lineage";
- private static final StringSet SOURCES =
- Metrics.stringSet(LINEAGE_NAMESPACE, Type.SOURCE.toString());
- private static final StringSet SINKS = Metrics.stringSet(LINEAGE_NAMESPACE, Type.SINK.toString());
+ private static final Lineage SOURCES = new Lineage(Type.SOURCE);
+ private static final Lineage SINKS = new Lineage(Type.SINK);
+
+ private final StringSet metric;
+
+ private Lineage(Type type) {
+ this.metric = Metrics.stringSet(LINEAGE_NAMESPACE, type.toString());
+ }
- /** {@link StringSet} representing sources and optionally side inputs. */
- public static StringSet getSources() {
+ /** {@link Lineage} representing sources and optionally side inputs. */
+ public static Lineage getSources() {
return SOURCES;
}
- /** {@link StringSet} representing sinks. */
- public static StringSet getSinks() {
+ /** {@link Lineage} representing sinks. */
+ public static Lineage getSinks() {
return SINKS;
}
- /** {@link StringSet} representing {@link Type}. */
- public static StringSet get(Type type) {
- switch (type) {
- case SOURCE:
- return getSources();
- case SINK:
- return getSinks();
- default:
- throw new IllegalArgumentException(String.format("Unsupported Lineage type: %s", type));
+ private static final Pattern RESERVED_CHARS = Pattern.compile("[:\\s.]");
+
+ /**
+ * Wrap segment to valid segment name.
+ *
+ *
Specifically, If there are reserved chars (colon, whitespace, dot), escape with backtick. If
+ * the segment is already wrapped, return the original.
+ */
+ private static String wrapSegment(String value) {
+ if (value.startsWith("`") && value.endsWith("`")) {
+ return value;
+ }
+ if (RESERVED_CHARS.matcher(value).find()) {
+ return String.format("`%s`", value);
+ }
+ return value;
+ }
+
+ /**
+ * Assemble fully qualified name (FQN). Format:
+ *
+ *
+ * - {@code system:segment1.segment2}
+ *
- {@code system:routine:segment1.segment2}
+ *
- {@code system:`segment1.with.dots:clons`.segment2}
+ *
+ *
+ * This helper method is for internal and testing usage only.
+ */
+ @Internal
+ public static String getFqName(
+ String system, @Nullable String routine, Iterable segments) {
+ StringBuilder builder = new StringBuilder(system);
+ if (!Strings.isNullOrEmpty(routine)) {
+ builder.append(":").append(routine);
+ }
+ int idx = 0;
+ for (String segment : segments) {
+ if (idx == 0) {
+ builder.append(":");
+ } else {
+ builder.append(".");
+ }
+ builder.append(wrapSegment(segment));
+ ++idx;
}
+ return builder.toString();
+ }
+
+ /**
+ * Assemble the FQN of given system, and segments.
+ *
+ * This helper method is for internal and testing usage only.
+ */
+ @Internal
+ public static String getFqName(String system, Iterable segments) {
+ return getFqName(system, null, segments);
+ }
+
+ /**
+ * Add a FQN (fully-qualified name) to Lineage. Segments will be processed via {@link #getFqName}.
+ */
+ public void add(String system, @Nullable String routine, Iterable segments) {
+ metric.add(getFqName(system, routine, segments));
+ }
+
+ /**
+ * Add a FQN (fully-qualified name) to Lineage. Segments will be processed via {@link #getFqName}.
+ */
+ public void add(String system, Iterable segments) {
+ add(system, null, segments);
}
/** Query {@link StringSet} metrics from {@link MetricResults}. */
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/LineageTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/LineageTest.java
new file mode 100644
index 000000000000..432eb396fe20
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/LineageTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link Lineage}. */
+@RunWith(JUnit4.class)
+public class LineageTest {
+ @Test
+ public void testGetFqName() {
+ Map testCases =
+ ImmutableMap.builder()
+ .put("apache-beam", "apache-beam")
+ .put("`apache-beam`", "`apache-beam`")
+ .put("apache.beam", "`apache.beam`")
+ .put("apache:beam", "`apache:beam`")
+ .put("apache beam", "`apache beam`")
+ .put("`apache beam`", "`apache beam`")
+ .put("apache\tbeam", "`apache\tbeam`")
+ .put("apache\nbeam", "`apache\nbeam`")
+ .build();
+ testCases.forEach(
+ (key, value) ->
+ assertEquals("apache:" + value, Lineage.getFqName("apache", ImmutableList.of(key))));
+ testCases.forEach(
+ (key, value) ->
+ assertEquals(
+ "apache:beam:" + value,
+ Lineage.getFqName("apache", "beam", ImmutableList.of(key))));
+ testCases.forEach(
+ (key, value) ->
+ assertEquals(
+ "apache:beam:" + value + "." + value,
+ Lineage.getFqName("apache", "beam", ImmutableList.of(key, key))));
+ }
+}
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
index 596b20beb83d..6332051c0ddc 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
@@ -216,10 +216,10 @@ protected String getScheme() {
}
@Override
- protected void reportLineage(GcsResourceId resourceId, Lineage.Type type) {
+ protected void reportLineage(GcsResourceId resourceId, Lineage lineage) {
GcsPath path = resourceId.getGcsPath();
if (!path.getBucket().isEmpty()) {
- Lineage.get(type).add(String.format("gcs:%s.%s", path.getBucket(), path.getObject()));
+ lineage.add("gcs", ImmutableList.of(path.getBucket(), path.getObject()));
} else {
LOG.warn("Report Lineage on relative path {} is unsupported", path.getObject());
}
diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
index 9b234865eb91..7ed56efa44bd 100644
--- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
+++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
@@ -626,8 +626,8 @@ protected S3ResourceId matchNewResource(String singleResourceSpec, boolean isDir
}
@Override
- protected void reportLineage(S3ResourceId resourceId, Lineage.Type type) {
- Lineage.get(type).add(String.format("s3:%s.%s", resourceId.getBucket(), resourceId.getKey()));
+ protected void reportLineage(S3ResourceId resourceId, Lineage lineage) {
+ lineage.add("s3", ImmutableList.of(resourceId.getBucket(), resourceId.getKey()));
}
/**
diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java
index cf47133e98ea..384c8c627ee7 100644
--- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java
+++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java
@@ -657,8 +657,8 @@ protected S3ResourceId matchNewResource(String singleResourceSpec, boolean isDir
}
@Override
- protected void reportLineage(S3ResourceId resourceId, Lineage.Type type) {
- Lineage.get(type).add(String.format("s3:%s.%s", resourceId.getBucket(), resourceId.getKey()));
+ protected void reportLineage(S3ResourceId resourceId, Lineage lineage) {
+ lineage.add("s3", ImmutableList.of(resourceId.getBucket(), resourceId.getKey()));
}
/**
diff --git a/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java b/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java
index 8a881cead7f1..5137eaf9bb2d 100644
--- a/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java
+++ b/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java
@@ -452,16 +452,14 @@ protected AzfsResourceId matchNewResource(String singleResourceSpec, boolean isD
}
@Override
- protected void reportLineage(AzfsResourceId resourceId, Lineage.Type type) {
+ protected void reportLineage(AzfsResourceId resourceId, Lineage lineage) {
if (!Strings.isNullOrEmpty(resourceId.getBlob())) {
- Lineage.get(type)
- .add(
- String.format(
- "abs:%s.%s.%s",
- resourceId.getAccount(), resourceId.getContainer(), resourceId.getBlob()));
+ lineage.add(
+ "abs",
+ ImmutableList.of(
+ resourceId.getAccount(), resourceId.getContainer(), resourceId.getBlob()));
} else {
- Lineage.get(type)
- .add(String.format("abs:%s.%s", resourceId.getAccount(), resourceId.getContainer()));
+ lineage.add("abs", ImmutableList.of(resourceId.getAccount(), resourceId.getContainer()));
}
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 61bed66a3365..129c8314fc80 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -55,6 +55,7 @@
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -413,7 +414,7 @@ public static String toTableSpec(TableReference ref) {
return sb.toString();
}
- public static String dataCatalogName(TableReference ref, BigQueryOptions options) {
+ public static List dataCatalogSegments(TableReference ref, BigQueryOptions options) {
String tableIdBase;
int ix = ref.getTableId().indexOf('$');
if (ix == -1) {
@@ -429,7 +430,7 @@ public static String dataCatalogName(TableReference ref, BigQueryOptions options
} else {
projectId = options.getProject();
}
- return String.format("bigquery:%s.%s.%s", projectId, ref.getDatasetId(), tableIdBase);
+ return ImmutableList.of(projectId, ref.getDatasetId(), tableIdBase);
}
static List getOrCreateMapListValue(Map> map, K key) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 998c82ab8d83..a8985775cbe7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -121,7 +121,8 @@ protected ExtractResult extractFiles(PipelineOptions options) throws Exception {
BigQueryHelpers.toTableSpec(tableToExtract)));
}
// emit this table ID as a lineage source
- Lineage.getSources().add(BigQueryHelpers.dataCatalogName(tableToExtract, bqOptions));
+ Lineage.getSources()
+ .add("bigquery", BigQueryHelpers.dataCatalogSegments(tableToExtract, bqOptions));
TableSchema schema = table.getSchema();
JobService jobService = bqServices.getJobService(bqOptions);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
index 3852d18ec12d..51a5a8f391a6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
@@ -35,7 +35,6 @@
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient;
import org.apache.beam.sdk.metrics.Lineage;
-import org.apache.beam.sdk.metrics.StringSet;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -109,12 +108,12 @@ public List> split(
@Nullable Table targetTable = getTargetTable(bqOptions);
ReadSession.Builder readSessionBuilder = ReadSession.newBuilder();
- StringSet lineageSources = Lineage.getSources();
+ Lineage lineage = Lineage.getSources();
if (targetTable != null) {
TableReference tableReference = targetTable.getTableReference();
readSessionBuilder.setTable(BigQueryHelpers.toTableResourceName(tableReference));
// register the table as lineage source
- lineageSources.add(BigQueryHelpers.dataCatalogName(tableReference, bqOptions));
+ lineage.add("bigquery", BigQueryHelpers.dataCatalogSegments(tableReference, bqOptions));
} else {
// If the table does not exist targetTable will be null.
// Construct the table id if we can generate it. For error recording/logging.
@@ -123,7 +122,7 @@ public List> split(
readSessionBuilder.setTable(tableReferenceId);
// register the table as lineage source
TableReference tableReference = BigQueryHelpers.parseTableUrn(tableReferenceId);
- lineageSources.add(BigQueryHelpers.dataCatalogName(tableReference, bqOptions));
+ lineage.add("bigquery", BigQueryHelpers.dataCatalogSegments(tableReference, bqOptions));
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
index a55bcc3fe025..1bbd4e756084 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
@@ -121,7 +121,8 @@ public void processElement(ProcessContext context) {
BigQueryOptions bqOptions = context.getPipelineOptions().as(BigQueryOptions.class);
Lineage.getSinks()
.add(
- BigQueryHelpers.dataCatalogName(
+ "bigquery",
+ BigQueryHelpers.dataCatalogSegments(
tableDestination1.getTableReference(), bqOptions));
return CreateTableHelpers.possiblyCreateTable(
bqOptions,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 8a902ec6d264..0671c6da3c61 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -1109,7 +1109,8 @@ public void process(
pipelineOptions.as(BigQueryOptions.class)));
Lineage.getSinks()
.add(
- BigQueryHelpers.dataCatalogName(
+ "bigquery",
+ BigQueryHelpers.dataCatalogSegments(
state.getTableDestination().getTableReference(),
pipelineOptions.as(BigQueryOptions.class)));
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index a7da19a75f85..05708f3e6abe 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -472,7 +472,8 @@ public void process(
Lineage.getSinks()
.add(
- BigQueryHelpers.dataCatalogName(
+ "bigquery",
+ BigQueryHelpers.dataCatalogSegments(
tableDestination.getTableReference(), bigQueryOptions));
Coder destinationCoder = dynamicDestinations.getDestinationCoder();
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index 1a6a6a4db70d..061e66024e29 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -210,7 +210,8 @@ public void processElement(
if (!entry.getValue().isEmpty()) {
Lineage.getSinks()
.add(
- BigQueryHelpers.dataCatalogName(
+ "bigquery",
+ BigQueryHelpers.dataCatalogSegments(
entry.getKey().getTableReference(),
c.getPipelineOptions().as(BigQueryOptions.class)));
pendingJobs.add(startWriteRename(entry.getKey(), entry.getValue(), c, window));
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index ace0bc5a74cd..e374d459af44 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -263,7 +263,8 @@ public void processElement(
} else {
Lineage.getSinks()
.add(
- BigQueryHelpers.dataCatalogName(
+ "bigquery",
+ BigQueryHelpers.dataCatalogSegments(
tableReference, c.getPipelineOptions().as(BigQueryOptions.class)));
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index 6fdf67722bac..1af9ae4f932d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -216,7 +216,7 @@ public void close() {
@Override
public void reportLineage() {
- Lineage.getSources().add(String.format("bigtable:%s.%s.%s", projectId, instanceId, tableId));
+ Lineage.getSources().add("bigtable", ImmutableList.of(projectId, instanceId, tableId));
}
}
@@ -327,7 +327,7 @@ public void close() {}
@Override
public void reportLineage() {
- Lineage.getSources().add(String.format("bigtable:%s.%s.%s", projectId, instanceId, tableId));
+ Lineage.getSources().add("bigtable", ImmutableList.of(projectId, instanceId, tableId));
}
@Override
@@ -597,7 +597,7 @@ public void writeSingleRecord(KV> record) throws
@Override
public void reportLineage() {
- Lineage.getSinks().add(String.format("bigtable:%s.%s.%s", projectId, instanceId, tableId));
+ Lineage.getSinks().add("bigtable", ImmutableList.of(projectId, instanceId, tableId));
}
private ServiceCallMetric createServiceCallMetric() {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
index f66ee6e1d842..2964a29dbb6b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
@@ -37,6 +37,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
@@ -260,8 +261,8 @@ public String getFullPath() {
return String.format("/subscriptions/%s/%s", projectId, subscriptionName);
}
- public String getDataCatalogName() {
- return String.format("pubsub:subscription:%s.%s", projectId, subscriptionName);
+ public List getDataCatalogSegments() {
+ return ImmutableList.of(projectId, subscriptionName);
}
@Override
@@ -319,14 +320,14 @@ public String getName() {
}
/**
- * Returns the data catalog name. Format "pubsub:topic:`project`.`topic`" This method is
- * fail-safe. If topic path is malformed, it returns an empty string.
+ * Returns the data catalog segments. This method is fail-safe. If topic path is malformed, it
+ * returns an empty string.
*/
- public String getDataCatalogName() {
+ public List getDataCatalogSegments() {
List splits = Splitter.on('/').splitToList(path);
if (splits.size() == 4) {
// well-formed path
- return String.format("pubsub:topic:%s.%s", splits.get(1), splits.get(3));
+ return ImmutableList.of(splits.get(1), splits.get(3));
} else {
// Mal-formed path. It is either a test fixture or user error and will fail on publish.
// We do not throw exception instead return empty string here.
@@ -334,7 +335,7 @@ public String getDataCatalogName() {
"Cannot get data catalog name for malformed topic path {}. Expected format: "
+ "projects//topics/",
path);
- return "";
+ return ImmutableList.of();
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 0fd4e9207d81..8b582c1054f8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -85,6 +85,7 @@
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
@@ -513,8 +514,8 @@ public String asPath() {
}
}
- public String dataCatalogName() {
- return String.format("pubsub:topic:%s.%s", project, topic);
+ public List dataCatalogSegments() {
+ return ImmutableList.of(project, topic);
}
@Override
@@ -1624,7 +1625,7 @@ public void finishBundle() throws IOException {
}
// Report lineage for all topics seen
for (PubsubTopic topic : output.keySet()) {
- Lineage.getSinks().add(topic.dataCatalogName());
+ Lineage.getSinks().add("pubsub", "topic", topic.dataCatalogSegments());
}
output = null;
pubsubClient.close();
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index defea87e835a..38d77aa3aac3 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -70,7 +70,6 @@
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Hashing;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
@@ -297,9 +296,9 @@ private void publishBatch(List messages, int bytes) throws IOEx
byteCounter.inc(bytes);
// Report Lineage multiple once for same topic
if (!topicPath.equals(reportedLineage)) {
- String name = topicPath.getDataCatalogName();
- if (!Strings.isNullOrEmpty(name)) {
- Lineage.getSinks().add(topicPath.getDataCatalogName());
+ List segments = topicPath.getDataCatalogSegments();
+ if (segments.size() != 0) {
+ Lineage.getSinks().add("pubsub", "topic", segments);
}
reportedLineage = topicPath;
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index b131b521c067..95fa5c223419 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -1045,14 +1045,14 @@ public List split(int desiredNumSplits, PipelineOptions options)
TopicPath topic = outer.getTopic();
if (topic != null) {
// is initial split on Read.fromTopic, report Lineage based on topic
- Lineage.getSources().add(topic.getDataCatalogName());
+ Lineage.getSources().add("pubsub", "source", topic.getDataCatalogSegments());
}
} else {
if (subscriptionPath.equals(outer.getSubscriptionProvider())) {
SubscriptionPath sub = subscriptionPath.get();
if (sub != null) {
// is a split on Read.fromSubscription
- Lineage.getSources().add(sub.getDataCatalogName());
+ Lineage.getSources().add("pubsub", "subscription", sub.getDataCatalogSegments());
}
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
index 4ce9ad10b2c0..4faff5ad6bd2 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
@@ -39,6 +39,7 @@
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -154,7 +155,9 @@ private void checkLineageSourceMetric(PipelineResult r, String tableId) {
if (options.getRunner().getName().contains("DirectRunner")) {
assertThat(
Lineage.query(r.metrics(), Lineage.Type.SOURCE),
- hasItem(String.format("bigtable:%s.%s.%s", project, options.getInstanceId(), tableId)));
+ hasItem(
+ Lineage.getFqName(
+ "bigtable", ImmutableList.of(project, options.getInstanceId(), tableId))));
}
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
index 46bb3df836e5..44f3e5a19923 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
@@ -423,7 +423,9 @@ private void checkLineageSinkMetric(PipelineResult r, String tableId) {
if (options.getRunner().getName().contains("DirectRunner")) {
assertThat(
Lineage.query(r.metrics(), Lineage.Type.SINK),
- hasItem(String.format("bigtable:%s.%s.%s", project, options.getInstanceId(), tableId)));
+ hasItem(
+ Lineage.getFqName(
+ "bigtable", ImmutableList.of(project, options.getInstanceId(), tableId))));
}
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java
index fb007d1171db..9d7bc65f5954 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java
@@ -27,6 +27,7 @@
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SchemaPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Instant;
import org.junit.Rule;
@@ -171,7 +172,7 @@ public void subscriptionPathFromNameWellFormed() {
SubscriptionPath path = PubsubClient.subscriptionPathFromName("test", "something");
assertEquals("projects/test/subscriptions/something", path.getPath());
assertEquals("/subscriptions/test/something", path.getFullPath());
- assertEquals("pubsub:subscription:test.something", path.getDataCatalogName());
+ assertEquals(ImmutableList.of("test", "something"), path.getDataCatalogSegments());
}
@Test
@@ -179,7 +180,7 @@ public void topicPathFromNameWellFormed() {
TopicPath path = PubsubClient.topicPathFromName("test", "something");
assertEquals("projects/test/topics/something", path.getPath());
assertEquals("/topics/test/something", path.getFullPath());
- assertEquals("pubsub:topic:test.something", path.getDataCatalogName());
+ assertEquals(ImmutableList.of("test", "something"), path.getDataCatalogSegments());
}
@Test
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index 74a98f0b8b43..d4effbae40a4 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -238,8 +238,8 @@ public void testValueProviderTopic() {
assertThat(pubsubRead.getTopicProvider().isAccessible(), is(true));
assertThat(pubsubRead.getTopicProvider().get().asPath(), equalTo(provider.get()));
assertThat(
- pubsubRead.getTopicProvider().get().dataCatalogName(),
- equalTo("pubsub:topic:project.topic"));
+ pubsubRead.getTopicProvider().get().dataCatalogSegments(),
+ equalTo(ImmutableList.of("project", "topic")));
}
@Test