Skip to content

Commit

Permalink
Update streaming_load_jobs_bigquery.py (#43)
Browse files Browse the repository at this point in the history
Signed-off-by: Jeffrey Kinard <jeff@thekinards.com>
  • Loading branch information
Polber authored Jan 25, 2024
1 parent fe9c81c commit a7719f3
Showing 1 changed file with 17 additions and 6 deletions.
23 changes: 17 additions & 6 deletions Python/bigquery/streaming_load_jobs_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,15 @@
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions

# Default Pub/Sub topic for the pipeline; used as the default value of the
# --input_topic option, so it applies when that flag is not supplied.
INPUT_TOPIC = "projects/pubsub-public-data/topics/taxirides-realtime"


def make_row(element):
"""
The input elements are passed as a dictionary.
Select fields 'ride_status', 'passenger_count',
'meter_reading' and 'timestamp' from elements.
"""
return {
"ride_status": element["ride_status"],
"passenger_count": element["passenger_count"],
Expand All @@ -34,25 +41,29 @@ def make_row(element):


def run(argv=None):
topic = "projects/pubsub-public-data/topics/taxirides-realtime"

class StreamingLoadOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument("--output_table", help="BQ Table to write")

parser.add_argument(
"--output_table", help="BQ Table to write", required=True
)
parser.add_argument(
"--input_topic",
help='Input PubSub topic of the form "projects/<PROJECT>/topics/<TOPIC>."', # noqa:E501
default=INPUT_TOPIC,
)
parser.add_argument(
"--triggering_frequency",
default=10,
help="Frequency to trigger the load job",
)

table_schema = "ride_status:STRING, passenger_count:INTEGER, meter_reading:FLOAT, timestamp:STRING" # noqa:E501
options = StreamingLoadOptions()
options = StreamingLoadOptions(streaming=True)
with beam.Pipeline(options=options) as p:
output = (
p
| "ReadFromPubSub" >> ReadFromPubSub(topic=topic)
| "ReadFromPubSub" >> ReadFromPubSub(topic=options.input_topic)
| "Load Json" >> Map(json.loads)
| "Select Fields" >> Map(make_row)
| WriteToBigQuery(
Expand Down

0 comments on commit a7719f3

Please sign in to comment.