Add Lineage metrics to Python PubsubIO, BigtableIO, FileIO #32430

Merged: 1 commit, Sep 13, 2024
8 changes: 8 additions & 0 deletions sdks/python/apache_beam/io/aws/s3filesystem.py
@@ -314,3 +314,11 @@ def delete(self, paths):
}
if exceptions:
raise BeamIOError("Delete operation failed", exceptions)

def report_lineage(self, path, lineage):
try:
components = s3io.parse_s3_path(path, get_account=True)
except ValueError:
# Reporting lineage is fail-safe: ignore paths that cannot be parsed.
return
lineage.add('s3', *components)
8 changes: 8 additions & 0 deletions sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
@@ -316,3 +316,11 @@ def delete(self, paths):

if exceptions:
raise BeamIOError("Delete operation failed", exceptions)

def report_lineage(self, path, lineage):
try:
components = blobstorageio.parse_azfs_path(path, get_account=True)
except ValueError:
# Reporting lineage is fail-safe: ignore paths that cannot be parsed.
return
lineage.add('abs', *components)
1 change: 1 addition & 0 deletions sdks/python/apache_beam/io/filebasedsink.py
@@ -280,6 +280,7 @@ def _check_state_for_finalize_write(self, writer_results, num_shards):

src_files.append(src)
dst_files.append(dst)
FileSystems.report_sink_lineage(dst)
return src_files, dst_files, delete_files, num_skipped

@check_accessible(['file_path_prefix'])
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/io/filebasedsource.py
@@ -168,6 +168,7 @@ def _get_concat_source(self) -> concat_source.ConcatSource:
min_bundle_size=self._min_bundle_size,
splittable=splittable)
single_file_sources.append(single_file_source)
FileSystems.report_source_lineage(file_name)
self._concat_source = concat_source.ConcatSource(single_file_sources)
return self._concat_source

@@ -351,6 +352,7 @@ def process(self, element: Union[str, FileMetadata], *args,
match_results = FileSystems.match([element])
metadata_list = match_results[0].metadata_list
for metadata in metadata_list:
FileSystems.report_source_lineage(metadata.path)
splittable = (
self._splittable and _determine_splittability_from_compression_type(
metadata.path, self._compression_type))
8 changes: 8 additions & 0 deletions sdks/python/apache_beam/io/filesystem.py
@@ -933,3 +933,11 @@ def delete(self, paths):
``BeamIOError``: if any of the delete operations fail
"""
raise NotImplementedError

def report_lineage(self, path, unused_lineage):
"""
Report Lineage metrics for path.

Unless overridden by a FileSystem implementation, this defaults to a no-op.
"""
pass
13 changes: 13 additions & 0 deletions sdks/python/apache_beam/io/filesystems.py
@@ -26,6 +26,7 @@
from apache_beam.io.filesystem import BeamIOError
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystem import FileSystem
from apache_beam.metrics.metric import Lineage
from apache_beam.options.value_provider import RuntimeValueProvider

_LOGGER = logging.getLogger(__name__)
@@ -388,3 +389,15 @@ def get_chunk_size(path):
"""
filesystem = FileSystems.get_filesystem(path)
return filesystem.CHUNK_SIZE

@staticmethod
def report_source_lineage(path):
"""Report source :class:`~apache_beam.metrics.metric.Lineage`."""
filesystem = FileSystems.get_filesystem(path)
filesystem.report_lineage(path, Lineage.sources())

@staticmethod
def report_sink_lineage(path):
"""Report sink :class:`~apache_beam.metrics.metric.Lineage`."""
filesystem = FileSystems.get_filesystem(path)
filesystem.report_lineage(path, Lineage.sinks())
7 changes: 7 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigtableio.py
@@ -47,6 +47,7 @@
from apache_beam.io.gcp import resource_identifiers
from apache_beam.metrics import Metrics
from apache_beam.metrics import monitoring_infos
from apache_beam.metrics.metric import Lineage
from apache_beam.transforms import PTransform
from apache_beam.transforms.display import DisplayDataItem
from apache_beam.transforms.external import BeamJarExpansionService
@@ -162,6 +163,12 @@ def finish_bundle(self):
if self.batcher:
self.batcher.close()
self.batcher = None
# Report Lineage metrics on write
Lineage.sinks().add(
'bigtable',
self.beam_options['project_id'],
self.beam_options['instance_id'],
self.beam_options['table_id'])

def display_data(self):
return {
15 changes: 15 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigtableio_test.py
@@ -35,12 +35,15 @@
from apache_beam.io.gcp import resource_identifiers
from apache_beam.metrics import monitoring_infos
from apache_beam.metrics.execution import MetricsEnvironment
from apache_beam.metrics.metric import Lineage
from apache_beam.testing.test_pipeline import TestPipeline

_LOGGER = logging.getLogger(__name__)

# Protect against environments where bigtable library is not available.
try:
from google.cloud.bigtable import client
from google.cloud.bigtable.batcher import MutationsBatcher
from google.cloud.bigtable.row_filters import TimestampRange
from google.cloud.bigtable.instance import Instance
from google.cloud.bigtable.row import DirectRow, PartialRowData, Cell
@@ -266,6 +269,18 @@ def setUp(self):
instance = Instance(self._INSTANCE_ID, client)
self.table = Table(self._TABLE_ID, instance)

def test_write(self):
direct_rows = [self.generate_row(i) for i in range(5)]
with patch.object(MutationsBatcher, 'mutate'), \
patch.object(MutationsBatcher, 'close'), TestPipeline() as p:
_ = p | beam.Create(direct_rows) | bigtableio.WriteToBigTable(
self._PROJECT_ID, self._INSTANCE_ID, self._TABLE_ID)
self.assertSetEqual(
Lineage.query(p.result.metrics(), Lineage.SINK),
set([
f"bigtable:{self._PROJECT_ID}.{self._INSTANCE_ID}.{self._TABLE_ID}"
]))

def test_write_metrics(self):
MetricsEnvironment.process_wide_container().reset()
write_fn = bigtableio._BigTableWriteFn(
8 changes: 8 additions & 0 deletions sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -365,3 +365,11 @@ def delete(self, paths):

if exceptions:
raise BeamIOError("Delete operation failed", exceptions)

def report_lineage(self, path, lineage):
try:
bucket, blob = gcsio.parse_gcs_path(path)
except ValueError:
# Reporting lineage is fail-safe: ignore paths that cannot be parsed.
return
lineage.add('gcs', bucket, blob)
67 changes: 65 additions & 2 deletions sdks/python/apache_beam/io/gcp/pubsub.py
@@ -43,8 +43,11 @@
from apache_beam.io import iobase
from apache_beam.io.iobase import Read
from apache_beam.io.iobase import Write
from apache_beam.metrics.metric import Lineage
from apache_beam.transforms import DoFn
from apache_beam.transforms import Flatten
from apache_beam.transforms import Map
from apache_beam.transforms import ParDo
from apache_beam.transforms import PTransform
from apache_beam.transforms.display import DisplayDataItem
from apache_beam.utils.annotations import deprecated
@@ -257,7 +260,16 @@ def __init__(
def expand(self, pvalue):
# TODO(BEAM-27443): Apply a proper transform rather than Read.
pcoll = pvalue.pipeline | Read(self._source)
# An explicit element_type is required after the native Read, otherwise a coder error occurs.
pcoll.element_type = bytes
return self.expand_continued(pcoll)

def expand_continued(self, pcoll):
pcoll = pcoll | ParDo(
_AddMetricsPassThrough(
project=self._source.project,
topic=self._source.topic_name,
sub=self._source.subscription_name)).with_output_types(bytes)
if self.with_attributes:
pcoll = pcoll | Map(PubsubMessage._from_proto_str)
pcoll.element_type = PubsubMessage
@@ -269,6 +281,31 @@ def to_runner_api_parameter(self, context):
return self.to_runner_api_pickled(context)


class _AddMetricsPassThrough(DoFn):
def __init__(self, project, topic=None, sub=None):
self.project = project
self.topic = topic
self.sub = sub
self.reported_lineage = False

def setup(self):
self.reported_lineage = False

def process(self, element: bytes):
self.report_lineage_once()
yield element

def report_lineage_once(self):
if not self.reported_lineage:
self.reported_lineage = True
if self.topic is not None:
Lineage.sources().add(
'pubsub', self.project, self.topic, subtype='topic')
elif self.sub is not None:
Lineage.sources().add(
'pubsub', self.project, self.sub, subtype='subscription')


@deprecated(since='2.7.0', extra_message='Use ReadFromPubSub instead.')
def ReadStringsFromPubSub(topic=None, subscription=None, id_label=None):
return _ReadStringsFromPubSub(topic, subscription, id_label)
@@ -314,6 +351,26 @@ def expand(self, pcoll):
return pcoll | WriteToPubSub(self.topic)


class _AddMetricsAndMap(DoFn):
def __init__(self, fn, project, topic=None):
self.project = project
self.topic = topic
self.fn = fn
self.reported_lineage = False

def setup(self):
self.reported_lineage = False

def process(self, element):
self.report_lineage_once()
yield self.fn(element)

def report_lineage_once(self):
Review thread on report_lineage_once:

Contributor: Do we need to ensure that reads and writes of reported_lineage are atomic? In other words, is it possible that multiple threads read and write reported_lineage at the same time, leading to a race condition?

Contributor Author (@Abacn, Sep 12, 2024): It does not matter, because Lineage is backed by a string set, so reporting once or multiple times gives the same result (it is idempotent). The local flag just reduces overhead a little by avoiding a set-add operation on every element. (A short standalone sketch of this idempotency appears at the end of this file's diff, after WriteToPubSub.expand.)

Contributor Author (@Abacn): This closely mirrors the Java SDK changes in #32068 (BigtableIO), #32090 (FileIO), and #32381 (PubSubIO, the final version after feedback).

Contributor: OK, I see.

if not self.reported_lineage:
self.reported_lineage = True
Lineage.sinks().add('pubsub', self.project, self.topic, subtype='topic')


class WriteToPubSub(PTransform):
"""A ``PTransform`` for writing messages to Cloud Pub/Sub."""

@@ -364,9 +421,15 @@ def bytes_to_proto_str(element: Union[bytes, str]) -> bytes:

def expand(self, pcoll):
if self.with_attributes:
pcoll = pcoll | 'ToProtobufX' >> Map(self.message_to_proto_str)
pcoll = pcoll | 'ToProtobufX' >> ParDo(
_AddMetricsAndMap(
self.message_to_proto_str, self.project,
self.topic_name)).with_input_types(PubsubMessage)
else:
pcoll = pcoll | 'ToProtobufY' >> Map(self.bytes_to_proto_str)
pcoll = pcoll | 'ToProtobufY' >> ParDo(
_AddMetricsAndMap(
self.bytes_to_proto_str, self.project,
self.topic_name)).with_input_types(Union[bytes, str])
pcoll.element_type = bytes
return pcoll | Write(self._sink)
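
As noted in the review thread in _AddMetricsAndMap above, reporting lineage more than once is harmless because the metric behind Lineage is a string set. Below is a minimal, standalone sketch (plain Python, not the Beam API; the FQN string and thread count are illustrative only) of why the unsynchronized reported_lineage flag is acceptable:

import threading

lineage_string_set = set()  # stands in for the string-set metric behind Lineage
reported = False            # mirrors the per-DoFn reported_lineage flag

def report_once():
    global reported
    if not reported:  # unsynchronized check, as in report_lineage_once above
        reported = True
        lineage_string_set.add("pubsub:topic:fakeprj.a_topic")

threads = [threading.Thread(target=report_once) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# However many threads slipped past the flag check, the set holds one entry.
assert lineage_string_set == {"pubsub:topic:fakeprj.a_topic"}

The worst case under a race is a redundant add, which the set deduplicates, so no locking is needed.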

57 changes: 57 additions & 0 deletions sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -38,6 +38,7 @@
from apache_beam.io.gcp.pubsub import WriteToPubSub
from apache_beam.io.gcp.pubsub import _PubSubSink
from apache_beam.io.gcp.pubsub import _PubSubSource
from apache_beam.metrics.metric import Lineage
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.portability import common_urns
@@ -819,6 +820,30 @@ def test_runner_api_transformation_with_subscription(
'projects/fakeprj/subscriptions/a_subscription',
transform_from_proto.source.full_subscription)

def test_read_from_pubsub_no_overwrite(self, unused_mock):
expected_elements = [
TestWindowedValue(
b'apache',
timestamp.Timestamp(1520861826.234567), [window.GlobalWindow()]),
TestWindowedValue(
b'beam',
timestamp.Timestamp(1520861824.234567), [window.GlobalWindow()])
]
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
for test_case in ('topic', 'subscription'):
with TestPipeline(options=options) as p:
# The direct runner currently overrides the whole ReadFromPubSub transform.
# This exercises the rest of the composite transform without that override.
pcoll = p | beam.Create([b'apache', b'beam']) | beam.Map(
lambda x: window.TimestampedValue(x, 1520861820.234567 + len(x)))
args = {test_case: f'projects/fakeprj/{test_case}s/topic_or_sub'}
pcoll = ReadFromPubSub(**args).expand_continued(pcoll)
assert_that(pcoll, equal_to(expected_elements), reify_windows=True)
self.assertSetEqual(
Lineage.query(p.result.metrics(), Lineage.SOURCE),
set([f"pubsub:{test_case}:fakeprj.topic_or_sub"]))


@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
@mock.patch('google.cloud.pubsub.PublisherClient')
@@ -974,6 +999,38 @@ def test_runner_api_transformation_properties_none(self, unused_mock_pubsub):
self.assertIsNone(transform_from_proto.sink.id_label)
self.assertIsNone(transform_from_proto.sink.timestamp_attribute)

def test_write_to_pubsub_no_overwrite(self, unused_mock):
data = 'data'
payloads = [data]

options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
with TestPipeline(options=options) as p:
pcoll = p | Create(payloads)
WriteToPubSub(
'projects/fakeprj/topics/a_topic',
with_attributes=False).expand(pcoll)
self.assertSetEqual(
Lineage.query(p.result.metrics(), Lineage.SINK),
set(["pubsub:topic:fakeprj.a_topic"]))

def test_write_to_pubsub_with_attributes_no_overwrite(self, unused_mock):
data = b'data'
attributes = {'key': 'value'}
payloads = [PubsubMessage(data, attributes)]

options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
with TestPipeline(options=options) as p:
pcoll = p | Create(payloads)
# Avoid the direct runner's override of WriteToPubSub.
WriteToPubSub(
'projects/fakeprj/topics/a_topic',
with_attributes=True).expand(pcoll)
self.assertSetEqual(
Lineage.query(p.result.metrics(), Lineage.SINK),
set(["pubsub:topic:fakeprj.a_topic"]))


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)