From a7719f386e93485d9c896eb4ed61273d2eb4b0e5 Mon Sep 17 00:00:00 2001
From: Jeff Kinard
Date: Thu, 25 Jan 2024 12:57:31 -0500
Subject: [PATCH] Update streaming_load_jobs_bigquery.py (#43)

Signed-off-by: Jeffrey Kinard
---
 .../bigquery/streaming_load_jobs_bigquery.py | 23 +++++++++++++++++------
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/Python/bigquery/streaming_load_jobs_bigquery.py b/Python/bigquery/streaming_load_jobs_bigquery.py
index 1c24808..86bde2a 100644
--- a/Python/bigquery/streaming_load_jobs_bigquery.py
+++ b/Python/bigquery/streaming_load_jobs_bigquery.py
@@ -23,8 +23,15 @@
 from apache_beam.io.gcp.pubsub import ReadFromPubSub
 from apache_beam.options.pipeline_options import PipelineOptions
 
+INPUT_TOPIC = "projects/pubsub-public-data/topics/taxirides-realtime"
+
 
 def make_row(element):
+    """
+    The input elements are passed as a dictionary.
+    Select fields 'ride_status', 'passenger_count',
+    'meter_reading' and 'timestamp' from elements.
+    """
     return {
         "ride_status": element["ride_status"],
         "passenger_count": element["passenger_count"],
@@ -34,13 +41,17 @@
 
 
 def run(argv=None):
-    topic = "projects/pubsub-public-data/topics/taxirides-realtime"
-
     class StreamingLoadOptions(PipelineOptions):
         @classmethod
         def _add_argparse_args(cls, parser):
-            parser.add_argument("--output_table", help="BQ Table to write")
-
+            parser.add_argument(
+                "--output_table", help="BQ Table to write", required=True
+            )
+            parser.add_argument(
+                "--input_topic",
+                help='Input PubSub topic of the form "projects//topics/."',  # noqa:E501
+                default=INPUT_TOPIC,
+            )
             parser.add_argument(
                 "--triggering_frequency",
                 default=10,
@@ -48,11 +59,11 @@ def _add_argparse_args(cls, parser):
             )
 
     table_schema = "ride_status:STRING, passenger_count:INTEGER, meter_reading:FLOAT, timestamp:STRING"  # noqa:E501
-    options = StreamingLoadOptions()
+    options = StreamingLoadOptions(streaming=True)
     with beam.Pipeline(options=options) as p:
         output = (
             p
-            | "ReadFromPubSub" >> ReadFromPubSub(topic=topic)
+            | "ReadFromPubSub" >> ReadFromPubSub(topic=options.input_topic)
            | "Load Json" >> Map(json.loads)
            | "Select Fields" >> Map(make_row)
            | WriteToBigQuery(