Skip to content

Commit

Permalink
Move PubsubIO source Lineage report to MapElements (#32381)
Browse files Browse the repository at this point in the history
* Move PubsubIO source Lineage report to MapElements

* Fix route -> subtype
  • Loading branch information
Abacn committed Sep 9, 2024
1 parent 2a0ce8f commit 5fbfdba
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,18 @@ private static String wrapSegment(String value) {
*
* <ul>
* <li>{@code system:segment1.segment2}
* <li>{@code system:routine:segment1.segment2}
* <li>{@code system:subtype:segment1.segment2}
* <li>{@code system:`segment1.with.dots:clons`.segment2}
* </ul>
*
* <p>This helper method is for internal and testing usage only.
*/
@Internal
public static String getFqName(
String system, @Nullable String routine, Iterable<String> segments) {
String system, @Nullable String subtype, Iterable<String> segments) {
StringBuilder builder = new StringBuilder(system);
if (!Strings.isNullOrEmpty(routine)) {
builder.append(":").append(routine);
if (!Strings.isNullOrEmpty(subtype)) {
builder.append(":").append(subtype);
}
int idx = 0;
for (String segment : segments) {
Expand Down Expand Up @@ -111,8 +111,8 @@ public static String getFqName(String system, Iterable<String> segments) {
/**
* Add a FQN (fully-qualified name) to Lineage. Segments will be processed via {@link #getFqName}.
*/
public void add(String system, @Nullable String routine, Iterable<String> segments) {
metric.add(getFqName(system, routine, segments));
public void add(String system, @Nullable String subtype, Iterable<String> segments) {
metric.add(getFqName(system, subtype, segments));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
Expand Down Expand Up @@ -1164,17 +1163,48 @@ private PCollection<T> expandReadContinued(
@Nullable ValueProvider<SubscriptionPath> subscriptionPath) {

TypeDescriptor<T> typeDescriptor = new TypeDescriptor<T>() {};
SerializableFunction<PubsubMessage, T> parseFnWrapped =
new SerializableFunction<PubsubMessage, T>() {
// flag that reported metrics
private final SerializableFunction<PubsubMessage, T> underlying =
Objects.requireNonNull(getParseFn());
private transient boolean reportedMetrics = false;

// public
@Override
public T apply(PubsubMessage input) {
if (!reportedMetrics) {
LOG.info("reportling lineage...");
// report Lineage once
if (topicPath != null) {
TopicPath topic = topicPath.get();
if (topic != null) {
Lineage.getSources().add("pubsub", "topic", topic.getDataCatalogSegments());
}
}
if (subscriptionPath != null) {
SubscriptionPath sub = subscriptionPath.get();
if (sub != null) {
Lineage.getSources()
.add("pubsub", "subscription", sub.getDataCatalogSegments());
}
}
reportedMetrics = true;
}
return underlying.apply(input);
}
};
PCollection<T> read;
if (getDeadLetterTopicProvider() == null
&& (getBadRecordRouter() instanceof ThrowingBadRecordRouter)) {
read = preParse.apply(MapElements.into(typeDescriptor).via(getParseFn()));
read = preParse.apply(MapElements.into(typeDescriptor).via(parseFnWrapped));
} else {
// parse PubSub messages, separating out exceptions
Result<PCollection<T>, KV<PubsubMessage, EncodableThrowable>> result =
preParse.apply(
"PubsubIO.Read/Map/Parse-Incoming-Messages",
MapElements.into(typeDescriptor)
.via(getParseFn())
.via(parseFnWrapped)
.exceptionsVia(new WithFailures.ThrowableHandler<PubsubMessage>() {}));

// Emit parsed records
Expand Down Expand Up @@ -1230,31 +1260,6 @@ private PCollection<T> expandReadContinued(
.withClientFactory(getPubsubClientFactory()));
}
}
// report Lineage once
preParse
.getPipeline()
.apply(Impulse.create())
.apply(
ParDo.of(
new DoFn<byte[], Void>() {
@ProcessElement
public void process() {
if (topicPath != null) {
TopicPath topic = topicPath.get();
if (topic != null) {
Lineage.getSources()
.add("pubsub", "topic", topic.getDataCatalogSegments());
}
}
if (subscriptionPath != null) {
SubscriptionPath sub = subscriptionPath.get();
if (sub != null) {
Lineage.getSources()
.add("pubsub", "subscription", sub.getDataCatalogSegments());
}
}
}
}));
return read.setCoder(getCoder());
}

Expand Down
14 changes: 7 additions & 7 deletions sdks/python/apache_beam/metrics/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,25 +352,25 @@ def wrap_segment(segment: str) -> str:

@staticmethod
def get_fq_name(
system: str, *segments: str, route: Optional[str] = None) -> str:
system: str, *segments: str, subtype: Optional[str] = None) -> str:
"""Assemble fully qualified name
(`FQN <https://cloud.google.com/data-catalog/docs/fully-qualified-names>`_).
Format:
- `system:segment1.segment2`
- `system:routine:segment1.segment2`
- `system:`segment1.with.dots:clons`.segment2`
- `system:subtype:segment1.segment2`
- `system:`segment1.with.dots:colons`.segment2`
This helper method is for internal and testing usage only.
"""
segs = '.'.join(map(Lineage.wrap_segment, segments))
if route:
return ':'.join((system, route, segs))
if subtype:
return ':'.join((system, subtype, segs))
return ':'.join((system, segs))

def add(
self, system: str, *segments: str, route: Optional[str] = None) -> None:
self.metric.add(self.get_fq_name(system, *segments, route=route))
self, system: str, *segments: str, subtype: Optional[str] = None) -> None:
self.metric.add(self.get_fq_name(system, *segments, subtype=subtype))

@staticmethod
def query(results: MetricResults, label: str) -> Set[str]:
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/metrics/metric_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,10 @@ def test_fq_name(self):
for k, v in test_cases.items():
self.assertEqual("apache:" + v, Lineage.get_fq_name("apache", k))
self.assertEqual(
"apache:beam:" + v, Lineage.get_fq_name("apache", k, route="beam"))
"apache:beam:" + v, Lineage.get_fq_name("apache", k, subtype="beam"))
self.assertEqual(
"apache:beam:" + v + '.' + v,
Lineage.get_fq_name("apache", k, k, route="beam"))
Lineage.get_fq_name("apache", k, k, subtype="beam"))


if __name__ == '__main__':
Expand Down

0 comments on commit 5fbfdba

Please sign in to comment.