Skip to content

Commit

Permalink
Add Lineage metrics to FileSystems
Browse files Browse the repository at this point in the history
  • Loading branch information
Abacn committed Aug 6, 2024
1 parent e3e4454 commit cfd3e06
Show file tree
Hide file tree
Showing 11 changed files with 80 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,7 @@ protected final List<KV<FileResult<DestinationT>, ResourceId>> finalizeDestinati
distinctFilenames.get(finalFilename));
distinctFilenames.put(finalFilename, result);
outputFilenames.add(KV.of(result, finalFilename));
FileSystems.reportSinkLineage(finalFilename);
}
return outputFilenames;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ public final List<? extends FileBasedSource<T>> split(
splitResults.size());
return splitResults;
} else {
FileSystems.reportSourceLineage(getSingleFileMetadata().resourceId());
if (isSplittable()) {
@SuppressWarnings("unchecked")
List<FileBasedSource<T>> splits =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.metrics.Lineage;

/**
* File system interface in Beam.
Expand Down Expand Up @@ -155,4 +156,11 @@ protected abstract void rename(
* @see <a href="https://www.ietf.org/rfc/rfc2396.txt">RFC 2396</a>
*/
protected abstract String getScheme();

/**
* Report {@link Lineage} metrics for resource id.
*
* <p>Unless override by FileSystem implementations, default to no-op.
*/
protected void reportLineage(ResourceIdT unusedId, Lineage.Type unusedType) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.KV;
Expand Down Expand Up @@ -395,6 +396,16 @@ public ResourceId apply(@Nonnull Metadata input) {
.delete(resourceIdsToDelete);
}

/** Report source {@link Lineage} metrics for resource id. */
public static void reportSourceLineage(ResourceId resourceId) {
getFileSystemInternal(resourceId.getScheme()).reportLineage(resourceId, Lineage.Type.SOURCE);
}

/** Report sink {@link Lineage} metrics for resource id. */
public static void reportSinkLineage(ResourceId resourceId) {
getFileSystemInternal(resourceId.getScheme()).reportLineage(resourceId, Lineage.Type.SINK);
}

private static class FilterResult {
public List<ResourceId> resultSources = new ArrayList();
public List<ResourceId> resultDestinations = new ArrayList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
Expand Down Expand Up @@ -123,8 +124,9 @@ protected abstract T makeOutput(
public void process(ProcessContext c) throws IOException {
FileIO.ReadableFile file = c.element().getKey();
OffsetRange range = c.element().getValue();
ResourceId resourceId = file.getMetadata().resourceId();
FileBasedSource<InT> source =
CompressedSource.from(createSource.apply(file.getMetadata().resourceId().toString()))
CompressedSource.from(createSource.apply(resourceId.toString()))
.withCompression(file.getCompression());
try (BoundedSource.BoundedReader<InT> reader =
source
Expand All @@ -138,6 +140,7 @@ public void process(ProcessContext c) throws IOException {
throw e;
}
}
FileSystems.reportSourceLineage(resourceId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,18 @@ public static StringSet getSinks() {
return SINKS;
}

/** {@link StringSet} representing {@link Type}. */
public static StringSet get(Type type) {
switch (type) {
case SOURCE:
return getSources();
case SINK:
return getSinks();
default:
throw new IllegalArgumentException(String.format("Unsupported Lineage type: %s", type));
}
}

/** Query {@link StringSet} metrics from {@link MetricResults}. */
public static Set<String> query(MetricResults results, Type type) {
MetricsFilter filter =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.beam.sdk.io.fs.MatchResult.Status;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
Expand Down Expand Up @@ -214,6 +215,16 @@ protected String getScheme() {
return "gs";
}

@Override
protected void reportLineage(GcsResourceId resourceId, Lineage.Type type) {
GcsPath path = resourceId.getGcsPath();
if (!path.getBucket().isEmpty()) {
Lineage.get(type).add(String.format("gcs:%s.%s", path.getBucket(), path.getObject()));
} else {
LOG.warn("Report Lineage on relative path {} is unsupported", path.getObject());
}
}

private List<MatchResult> matchGlobs(List<GcsPath> globs) {
// TODO: Executes in parallel, address https://issues.apache.org/jira/browse/BEAM-1503.
return FluentIterable.from(globs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.util.MoreFutures;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
Expand Down Expand Up @@ -624,6 +625,11 @@ protected S3ResourceId matchNewResource(String singleResourceSpec, boolean isDir
return S3ResourceId.fromUri(singleResourceSpec);
}

@Override
protected void reportLineage(S3ResourceId resourceId, Lineage.Type type) {
Lineage.get(type).add(String.format("s3:%s.%s", resourceId.getBucket(), resourceId.getKey()));
}

/**
* Invokes tasks in a thread pool, then unwraps the resulting {@link Future Futures}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.util.MoreFutures;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
Expand Down Expand Up @@ -655,6 +656,11 @@ protected S3ResourceId matchNewResource(String singleResourceSpec, boolean isDir
return S3ResourceId.fromUri(singleResourceSpec);
}

@Override
protected void reportLineage(S3ResourceId resourceId, Lineage.Type type) {
Lineage.get(type).add(String.format("s3:%s.%s", resourceId.getBucket(), resourceId.getKey()));
}

/**
* Invokes tasks in a thread pool, then unwraps the resulting {@link Future Futures}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
Expand Down Expand Up @@ -449,4 +450,18 @@ protected AzfsResourceId matchNewResource(String singleResourceSpec, boolean isD
}
return AzfsResourceId.fromUri(singleResourceSpec);
}

@Override
protected void reportLineage(AzfsResourceId resourceId, Lineage.Type type) {
if (!Strings.isNullOrEmpty(resourceId.getBlob())) {
Lineage.get(type)
.add(
String.format(
"abs:%s.%s.%s",
resourceId.getAccount(), resourceId.getContainer(), resourceId.getBlob()));
} else {
Lineage.get(type)
.add(String.format("abs:%s.%s", resourceId.getAccount(), resourceId.getContainer()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment;
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix;
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.readFileBasedIOITPipelineOptions;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;

import java.time.Instant;
Expand All @@ -36,6 +37,7 @@
import org.apache.beam.sdk.io.common.FileBasedIOITHelper.DeleteFileFn;
import org.apache.beam.sdk.io.common.FileBasedIOTestPipelineOptions;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testutils.NamedTestResult;
Expand Down Expand Up @@ -152,6 +154,9 @@ public void writeThenReadAll() {

PipelineResult result = pipeline.run();
PipelineResult.State pipelineState = result.waitUntilFinish();
assertEquals(
Lineage.query(result.metrics(), Lineage.Type.SOURCE),
Lineage.query(result.metrics(), Lineage.Type.SINK));

collectAndPublishMetrics(result);
// Fail the test if pipeline failed.
Expand Down

0 comments on commit cfd3e06

Please sign in to comment.