-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Lineage metrics to Python PubsubIO, BigtableIO, FileIO #32430
Conversation
6bc0b47
to
777d644
Compare
For PubsubIO and BigtableIO, unit test added. For FileIO, because it needs real gcs, and Dataflow runner currently does not return metrics back, tested locally with the following pipeline with beam.Pipeline("DirectRunner", options) as p:
p_read = p | ReadFromText(file_pattern='gs://dataflow-samples/shakespeare/kinghenry*') \
| WriteToText(file_path_prefix='gs://clouddfe-yihu-test/tmp/fileiolineage')
print(Lineage.query(p.result.metrics(), Lineage.SOURCE))
print(Lineage.query(p.result.metrics(), Lineage.SINK)) see output: {'gcs:dataflow-samples.`shakespeare/kinghenryviii.txt`', 'gcs:dataflow-samples.`shakespeare/kinghenryv.txt`'}
{'gcs:clouddfe-yihu-test.tmp/fileiolineage-00000-of-00001'} |
Assigning reviewers. If you would like to opt out of this review, comment R: @shunping for label python. Available commands:
The PR bot will only process comments in the main thread (not review comments). |
self.report_lineage_once() | ||
yield self.fn(element) | ||
|
||
def report_lineage_once(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to ensure read/write on report_lineage is atomic? In other words, is it possible that multiple threads read and write reported_lineage at the same time leading to a race condition?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It does not matter because Lineage is backed by a String Set, report once or multiple times has the same result (idempotent). Here I use a local variable to reduce the overhead a little bit (do not do set add operation on every element)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, i see.
python ml precommit and python test were breakages on base branch and fixed on latest master now, not related to this change, and merging for now |
Please add a meaningful description for your change here
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.